Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement canary file generation functionality from contract test PatchInputs #1074

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 99 additions & 16 deletions src/rpdk/core/project.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# pylint: disable=too-many-lines
import copy
import json
import logging
import os
Expand Down Expand Up @@ -76,7 +77,6 @@
FILE_GENERATION_ENABLED = "file_generation_enabled"
TYPE_NAME = "typeName"
CONTRACT_TEST_FILE_NAMES = "contract_test_file_names"
INPUT1_FILE_NAME = "inputs_1.json"
FN_SUB = "Fn::Sub"
FN_IMPORT_VALUE = "Fn::ImportValue"
UUID = "uuid"
Expand Down Expand Up @@ -1345,21 +1345,91 @@ def _generate_stack_template_files(self) -> None:
with ct_file.open("r") as f:
json_data = json.load(f)
resource_name = self.type_info[2]
stack_template_data = {
"Description": f"Template for {self.type_name}",
"Resources": {
f"{resource_name}": {
"Type": self.type_name,
"Properties": self._replace_dynamic_values(
json_data["CreateInputs"]
),
}
},
}
stack_template_file_name = f"{CANARY_FILE_PREFIX}{count}_001.yaml"
stack_template_file_path = stack_template_folder / stack_template_file_name
with stack_template_file_path.open("w") as stack_template_file:
yaml.dump(stack_template_data, stack_template_file, indent=2)

self._save_stack_template_data(
resource_name,
count,
stack_template_folder,
self._replace_dynamic_values(
json_data["CreateInputs"],
),
"001",
marc-1010 marked this conversation as resolved.
Show resolved Hide resolved
)
if "PatchInputs" in json_data:
deepcopy_create_input = copy.deepcopy(json_data["CreateInputs"])
self._save_stack_template_data(
resource_name,
count,
stack_template_folder,
self._apply_patch_inputs_to_create_inputs(
json_data["PatchInputs"],
deepcopy_create_input,
),
"002",
)
marc-1010 marked this conversation as resolved.
Show resolved Hide resolved

def _save_stack_template_data(
self,
resource_name,
contract_test_input_count,
stack_template_folder,
properties_data,
suffix,
):
stack_template_data = {
"Description": f"Template for {self.type_name}",
"Resources": {
f"{resource_name}": {
"Type": self.type_name,
"Properties": properties_data,
}
},
}
stack_template_file_name = (
f"{CANARY_FILE_PREFIX}{contract_test_input_count}_{suffix}.yaml"
marc-1010 marked this conversation as resolved.
Show resolved Hide resolved
)
stack_template_file_path = stack_template_folder / stack_template_file_name
with stack_template_file_path.open("w") as stack_template_file:
yaml.dump(stack_template_data, stack_template_file, indent=2)

def _apply_patch_inputs_to_create_inputs(
self, patch_inputs: Dict[str, Any], create_inputs: Dict[str, Any]
) -> Dict[str, Any]:
for patch_input in patch_inputs:
self._apply_patch_input_to_create_input(patch_input, create_inputs)
return create_inputs

def _apply_patch_input_to_create_input(
self, patch_input: Any, create_inputs: Dict[str, Any]
) -> Dict[str, Any]:
marc-1010 marked this conversation as resolved.
Show resolved Hide resolved
op = patch_input.get("op")
marc-1010 marked this conversation as resolved.
Show resolved Hide resolved
path = patch_input.get("path")
if op not in {"replace", "remove", "add"}:
return create_inputs
key_list = [self._translate_integer_key(key) for key in path.split("/") if key]
current_input = create_inputs
for key in key_list[:-1]:
try:
current_input = current_input[key]
except (KeyError, IndexError):
LOG.warning("Canary generation skipped for invalid path: %s", path)
return create_inputs
patch_key = key_list[-1]
if op == "remove":
del current_input[patch_key]
elif op == "add" and isinstance(current_input, list):
current_input.insert(patch_key, patch_input["value"])
self._replace_dynamic_values_with_root_key(current_input, patch_key)
else:
# remaining use cases for both "add" and "replace" operations
current_input[patch_key] = patch_input["value"]
self._replace_dynamic_values_with_root_key(current_input, patch_key)
return create_inputs

def _translate_integer_key(self, key: str) -> Any:
if key.isdigit():
key = int(key)
return key

def _replace_dynamic_values(self, properties: Dict[str, Any]) -> Dict[str, Any]:
for key, value in properties.items():
Expand All @@ -1372,6 +1442,19 @@ def _replace_dynamic_values(self, properties: Dict[str, Any]) -> Dict[str, Any]:
properties[key] = return_value
return properties

def _replace_dynamic_values_with_root_key(
self, properties: Dict[str, Any], root_key=None
) -> Dict[str, Any]:
value = properties[root_key]
if isinstance(value, dict):
properties[root_key] = self._replace_dynamic_values(value)
elif isinstance(value, list):
properties[root_key] = [self._replace_dynamic_value(item) for item in value]
else:
return_value = self._replace_dynamic_value(value)
properties[root_key] = return_value
return properties

def _replace_dynamic_value(self, original_value: Any) -> Any:
pattern = r"\{\{(.*?)\}\}"

Expand Down
Loading
Loading