diff --git a/egress_backend/components/amplify_waf_addon/amplify_waf_addon.py b/egress_backend/components/amplify_waf_addon/amplify_waf_addon.py index 573e6ea..6a1b502 100644 --- a/egress_backend/components/amplify_waf_addon/amplify_waf_addon.py +++ b/egress_backend/components/amplify_waf_addon/amplify_waf_addon.py @@ -16,6 +16,8 @@ from cdk_nag import NagSuppressions from constructs import Construct +from ..utils import convert_bool + dirname = os.path.dirname(__file__) @@ -210,7 +212,7 @@ def __init__( web_acl_id=web_acl_arn, ) - if custom_domain_config.get("is_enabled"): + if convert_bool(custom_domain_config.get("is_enabled"), bool): amplify_app_distribution.node.default_child.add_property_override( "DistributionConfig.ViewerCertificate.AcmCertificateArn", custom_domain_config.get("cert_arn"), diff --git a/egress_backend/components/utils.py b/egress_backend/components/utils.py new file mode 100644 index 0000000..9c546c0 --- /dev/null +++ b/egress_backend/components/utils.py @@ -0,0 +1,17 @@ +def convert_bool(s, rtype): + """ + Returns: + rtype=str: 'true' or 'false' + type=bool: true or false + """ + if rtype not in (str, bool): + raise ValueError(f"Invalid return type: {rtype}") + if (isinstance(s, str) and s.lower() == "true") or s is True: + if rtype == str: + return "true" + return True + if (isinstance(s, str) and s.lower() == "false") or s is False: + if rtype == str: + return "false" + return False + raise ValueError(f"Invalid boolean string: {s}") diff --git a/egress_backend/egress_backend_stack.py b/egress_backend/egress_backend_stack.py index fafeebf..26947f9 100644 --- a/egress_backend/egress_backend_stack.py +++ b/egress_backend/egress_backend_stack.py @@ -27,37 +27,17 @@ from cdk_nag import NagSuppressions from constructs import Construct -from egress_backend.components.amplify_waf_addon.amplify_waf_addon import ( - CustomAmplifyDistribution, -) -from egress_backend.components.email_configuration_set.email_configuration_set_cr import ( +from .components.amplify_waf_addon.amplify_waf_addon import CustomAmplifyDistribution +from .components.email_configuration_set.email_configuration_set_cr import ( EmailConfigurationSetCustomResource, ) -from egress_backend.components.email_configuration_set_event_dest.email_configuration_set_event_dest_cr import ( +from .components.email_configuration_set_event_dest.email_configuration_set_event_dest_cr import ( EmailConfigurationSetEventDestinationCustomResource, ) -from egress_backend.components.email_identity.email_identity_verification_cr import ( +from .components.email_identity.email_identity_verification_cr import ( EmailIdentityVerificationCustomResource, ) - - -def convert_bool(s, rtype): - """ - Returns: - rtype=str: 'true' or 'false' - type=bool: true or false - """ - if rtype not in (str, bool): - raise ValueError(f"Invalid return type: {rtype}") - if (isinstance(s, str) and s.lower() == "true") or s is True: - if rtype == str: - return "true" - return True - if (isinstance(s, str) and s.lower() == "false") or s is False: - if rtype == str: - return "false" - return False - raise ValueError(f"Invalid boolean string: {s}") +from .components.utils import convert_bool class EgressBackendStack(Stack): @@ -1221,7 +1201,9 @@ def __init__( # add additional Idp as per configuration supported_idps = ["COGNITO"] - if self.node.try_get_context(env_id).get("custom_idp").get("is_enabled"): + if convert_bool( + self.node.try_get_context(env_id).get("custom_idp").get("is_enabled"), bool + ): cognito.CfnUserPoolIdentityProvider( self, "CustomIdentityProvider", diff --git a/egress_backend/lambda/egress_api/main.py b/egress_backend/lambda/egress_api/main.py index b5fcceb..59d16d5 100644 --- a/egress_backend/lambda/egress_api/main.py +++ b/egress_backend/lambda/egress_api/main.py @@ -18,7 +18,7 @@ @logger.inject_lambda_context(log_event=True) def handler(event, context): field = event["field"] - usergroup = event["usergroup"][0] + usergroups = event["usergroup"] arguments = event["arguments"] if "arguments" in event else False logger.debug("Invoking API: %s", field) @@ -28,7 +28,7 @@ def handler(event, context): if field == "updateRequest": if arguments: - arguments["request"]["usergroup"] = usergroup + arguments["request"]["usergroup"] = usergroups return update_request(arguments, context) response = "Arguments not supplied" diff --git a/egress_backend/lambda/egress_api/update_request.py b/egress_backend/lambda/egress_api/update_request.py index 2606da6..10daa4a 100644 --- a/egress_backend/lambda/egress_api/update_request.py +++ b/egress_backend/lambda/egress_api/update_request.py @@ -5,7 +5,7 @@ import json import os -from typing import Any +from typing import Any, List import boto3 from aws_lambda_powertools import Logger, Tracer @@ -28,7 +28,7 @@ def update_request(arguments: str, context: Any): # Get the task token and id from the request inbound_egress_request_id = arguments["request"]["egress_request_id"] inbound_task_token = arguments["request"]["task_token"] - usergroup = arguments["request"]["usergroup"] + usergroups = arguments["request"]["usergroup"] logger.info( "Update Request API invoked with Egress Request ID: %s", @@ -41,7 +41,7 @@ def update_request(arguments: str, context: Any): # Check if reviewer is valid reviewer_valid = is_reviewer_valid( request_id=inbound_egress_request_id, - reviewer_usergroup=usergroup, + reviewer_usergroups=usergroups, egress_request=egress_details, ) @@ -55,7 +55,7 @@ def update_request(arguments: str, context: Any): # Determine egress request status and SWB status statuses = determine_status( egress_arguments=arguments, - reviewer_usergroup=usergroup, + reviewer_usergroups=usergroups, is_single_approval_enabled=egress_details["Items"][0][ "is_single_approval_enabled" ], @@ -81,13 +81,15 @@ def update_request(arguments: str, context: Any): # TO-DO: Inject Environment variables for reviewer group names def determine_status( - egress_arguments: Any, reviewer_usergroup: str, is_single_approval_enabled: str + egress_arguments: Any, + reviewer_usergroups: List[str], + is_single_approval_enabled: str, ): global egress_status global swb_status reviewer_list_groups = json.loads(reviewer_list) - if reviewer_usergroup == reviewer_list_groups[0]: + if reviewer_list_groups[0] in reviewer_usergroups: inbound_reviewer_1_decision = egress_arguments["request"][ "ig_reviewer_1_decision" ] @@ -108,7 +110,7 @@ def determine_status( egress_status = "REJECTED" swb_status = "PENDING" - elif reviewer_usergroup == reviewer_list_groups[1]: + elif reviewer_list_groups[1] in reviewer_usergroups: inbound_reviewer_1_decision = egress_arguments["request"][ "ig_reviewer_1_decision" ] @@ -133,7 +135,7 @@ def determine_status( swb_status = "REJECTED" else: - logger.error("Status mapping error with usergroup %s", reviewer_usergroup) + logger.error("Status mapping error with usergroup %s", reviewer_usergroups) raise Exception( "Unable to determine the status of the request. Please refresh and retry" ) @@ -156,9 +158,11 @@ def retrieve_request_details(request_id: str): # Check if reviewer is valid by matching the current reviewer group field in the DB to the incoming usergroup -def is_reviewer_valid(request_id: str, reviewer_usergroup: str, egress_request: Any): +def is_reviewer_valid( + request_id: str, reviewer_usergroups: List[str], egress_request: Any +): current_reviewer_group = egress_request["Items"][0]["current_reviewer_group"] - if current_reviewer_group != reviewer_usergroup: + if current_reviewer_group not in reviewer_usergroups: logger.error( "Egress request: %s found but reviewer is not valid and not found in the current reviewer group: %s", request_id,