diff --git a/codeplain_REST_api.py b/codeplain_REST_api.py index f1c1ae8..702b70f 100644 --- a/codeplain_REST_api.py +++ b/codeplain_REST_api.py @@ -1,4 +1,3 @@ -import json import requests @@ -23,7 +22,7 @@ class MissingResource(Exception): class PlainSyntaxError(Exception): - pass + pass class OnlyRelativeLinksAllowed(Exception): @@ -35,66 +34,62 @@ class LinkMustHaveTextSpecified(Exception): class CodeplainAPI: - + def __init__(self, api_key): self.api_key = api_key - @property def api_url(self): return self._api_url - @api_url.setter def api_url(self, value): self._api_url = value - def post_request(self, endpoint_url, headers, payload): response = requests.post(endpoint_url, headers=headers, json=payload) try: response_json = response.json() except requests.exceptions.JSONDecodeError as e: - print(f"Failed to decode JSON response. Response text: {response.text}") + print(f"Failed to decode JSON response: {e}. Response text: {response.text}") raise if response.status_code == requests.codes.bad_request and "error_code" in response_json: - if response_json["error_code"] == 'FunctionalRequirementTooComplex': - raise FunctionalRequirementTooComplex(response_json['message']) - - if response_json["error_code"] == 'ConflictingRequirements': - raise ConflictingRequirements(response_json['message']) - - if response_json["error_code"] == 'CreditBalanceTooLow': - raise CreditBalanceTooLow(response_json['message']) - - if response_json["error_code"] == 'LLMOverloadedError': - raise LLMOverloadedError(response_json['message']) - - if response_json["error_code"] == 'MissingResource': - raise MissingResource(response_json['message']) - - if response_json["error_code"] == 'PlainSyntaxError': - raise PlainSyntaxError(response_json['message']) - - if response_json["error_code"] == 'OnlyRelativeLinksAllowed': - raise OnlyRelativeLinksAllowed(response_json['message']) - - if response_json["error_code"] == 'LinkMustHaveTextSpecified': - raise LinkMustHaveTextSpecified(response_json['message']) + if response_json["error_code"] == "FunctionalRequirementTooComplex": + raise FunctionalRequirementTooComplex(response_json["message"]) + + if response_json["error_code"] == "ConflictingRequirements": + raise ConflictingRequirements(response_json["message"]) + + if response_json["error_code"] == "CreditBalanceTooLow": + raise CreditBalanceTooLow(response_json["message"]) + + if response_json["error_code"] == "LLMOverloadedError": + raise LLMOverloadedError(response_json["message"]) + + if response_json["error_code"] == "MissingResource": + raise MissingResource(response_json["message"]) + + if response_json["error_code"] == "PlainSyntaxError": + raise PlainSyntaxError(response_json["message"]) + + if response_json["error_code"] == "OnlyRelativeLinksAllowed": + raise OnlyRelativeLinksAllowed(response_json["message"]) + + if response_json["error_code"] == "LinkMustHaveTextSpecified": + raise LinkMustHaveTextSpecified(response_json["message"]) response.raise_for_status() return response_json - def get_plain_source_tree(self, plain_source, loaded_templates): """ Builds plain source tree from the given plain text source in Markdown format. Args: - plain_source (str): A string containing the plain text source to be parsed. + plain_source (str): A string containing the plain text source to be parsed. loaded_templates (dict): A dictionary containing the loaded templates. Returns: @@ -104,128 +99,113 @@ def get_plain_source_tree(self, plain_source, loaded_templates): Exception: If parsing of plain_source fails. """ endpoint_url = f"{self.api_url}/plain_source_tree" - headers = { - "X-API-Key": self.api_key, - "Content-Type": "application/json" - } - - payload = { - "plain_source": plain_source, - "loaded_templates": loaded_templates - } - - return self.post_request(endpoint_url, headers, payload) + headers = {"X-API-Key": self.api_key, "Content-Type": "application/json"} + payload = {"plain_source": plain_source, "loaded_templates": loaded_templates} + + return self.post_request(endpoint_url, headers, payload) def render_functional_requirement(self, frid, plain_source_tree, linked_resources, existing_files_content): """ - Renders the content of a functional requirement based on the provided ID, + Renders the content of a functional requirement based on the provided ID, plain source tree, and existing files' content. Args: frid (str): The unique identifier for the functional requirement to be rendered. plain_source_tree (dict): A dictionary containing the plain source tree. - linked_resources (dict): A dictionary where the keys represent resource names + linked_resources (dict): A dictionary where the keys represent resource names and the values are the content of those resources. existing_files_content (dict): A dictionary where the keys represent code base filenames and the values are the content of those files. Returns: - str: A string containing the rendered functional requirement, formatted + str: A string containing the rendered functional requirement, formatted appropriately based on the inputs. Raises: ValueError: If the frid is invalid or the necessary plain source tree is not valid. """ endpoint_url = f"{self.api_url}/render_functional_requirement" - headers = { - "X-API-Key": self.api_key, - "Content-Type": "application/json" - } - + headers = {"X-API-Key": self.api_key, "Content-Type": "application/json"} + payload = { "frid": frid, "plain_source_tree": plain_source_tree, "linked_resources": linked_resources, - "existing_files_content": existing_files_content + "existing_files_content": existing_files_content, } return self.post_request(endpoint_url, headers, payload) - def fix_unittests_issue(self, frid, plain_source_tree, linked_resources, existing_files_content, unittests_issue): endpoint_url = f"{self.api_url}/fix_unittests_issue" - headers = { - "X-API-Key": self.api_key, - "Content-Type": "application/json" - } - + headers = {"X-API-Key": self.api_key, "Content-Type": "application/json"} + payload = { "frid": frid, "plain_source_tree": plain_source_tree, "linked_resources": linked_resources, "existing_files_content": existing_files_content, - "unittests_issue": unittests_issue + "unittests_issue": unittests_issue, } - - return self.post_request(endpoint_url, headers, payload) + return self.post_request(endpoint_url, headers, payload) - def refactor_source_files_if_needed(self, files_to_check, existing_files_content): + def refactor_source_files_if_needed(self, frid, files_to_check, existing_files_content): endpoint_url = f"{self.api_url}/refactor_source_files_if_needed" - headers = { - "X-API-Key": self.api_key, - "Content-Type": "application/json" - } - + headers = {"X-API-Key": self.api_key, "Content-Type": "application/json"} + payload = { + "frid": frid, "files_to_check": list(files_to_check), - "existing_files_content": existing_files_content + "existing_files_content": existing_files_content, } return self.post_request(endpoint_url, headers, payload) - - def render_conformance_tests(self, frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content): + def render_conformance_tests( + self, frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content + ): endpoint_url = f"{self.api_url}/render_conformance_tests" - headers = { - "X-API-Key": self.api_key, - "Content-Type": "application/json" - } - + headers = {"X-API-Key": self.api_key, "Content-Type": "application/json"} + payload = { "frid": frid, "functional_requirement_id": functional_requirement_id, "plain_source_tree": plain_source_tree, "linked_resources": linked_resources, - "existing_files_content": existing_files_content + "existing_files_content": existing_files_content, } - - return self.post_request(endpoint_url, headers, payload) + return self.post_request(endpoint_url, headers, payload) - def generate_folder_name_from_functional_requirement(self, functional_requirement, existing_folder_names): + def generate_folder_name_from_functional_requirement(self, frid, functional_requirement, existing_folder_names): endpoint_url = f"{self.api_url}/generate_folder_name_from_functional_requirement" - headers = { - "X-API-Key": self.api_key, - "Content-Type": "application/json" - } - + headers = {"X-API-Key": self.api_key, "Content-Type": "application/json"} + payload = { + "frid": frid, "functional_requirement": functional_requirement, - "existing_folder_names": existing_folder_names + "existing_folder_names": existing_folder_names, } - - return self.post_request(endpoint_url, headers, payload) + return self.post_request(endpoint_url, headers, payload) - def fix_conformance_tests_issue(self, frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content, code_diff, conformance_tests_files, conformance_tests_issue, implementation_fix_count): + def fix_conformance_tests_issue( + self, + frid, + functional_requirement_id, + plain_source_tree, + linked_resources, + existing_files_content, + code_diff, + conformance_tests_files, + conformance_tests_issue, + implementation_fix_count, + ): endpoint_url = f"{self.api_url}/fix_conformance_tests_issue" - headers = { - "X-API-Key": self.api_key, - "Content-Type": "application/json" - } - + headers = {"X-API-Key": self.api_key, "Content-Type": "application/json"} + payload = { "frid": frid, "functional_requirement_id": functional_requirement_id, @@ -235,7 +215,7 @@ def fix_conformance_tests_issue(self, frid, functional_requirement_id, plain_sou "code_diff": code_diff, "conformance_tests_files": conformance_tests_files, "conformance_tests_issue": conformance_tests_issue, - "implementation_fix_count": implementation_fix_count + "implementation_fix_count": implementation_fix_count, } - + return self.post_request(endpoint_url, headers, payload) diff --git a/examples/example_hello_world_python/harness_tests/hello_world_display/test_hello_world.py b/examples/example_hello_world_python/harness_tests/hello_world_display/test_hello_world.py index d6d886e..2bca3be 100644 --- a/examples/example_hello_world_python/harness_tests/hello_world_display/test_hello_world.py +++ b/examples/example_hello_world_python/harness_tests/hello_world_display/test_hello_world.py @@ -1,15 +1,17 @@ #!/usr/bin/env python3 -import unittest import subprocess +import unittest + class TestHelloWorld(unittest.TestCase): def test_hello_world_output(self): # Run the hello_world.py script and capture its output - result = subprocess.run(['python3', 'hello_world.py'], capture_output=True, text=True) - + result = subprocess.run(["python3", "hello_world.py"], capture_output=True, text=True) + # Check if the output matches the expected string self.assertEqual(result.stdout.strip(), "hello, world") -if __name__ == '__main__': - unittest.main() \ No newline at end of file + +if __name__ == "__main__": + unittest.main() diff --git a/file_utils.py b/file_utils.py index 35a5b83..c8bb817 100644 --- a/file_utils.py +++ b/file_utils.py @@ -1,43 +1,42 @@ -import os - import difflib +import os from liquid2 import Environment, FileSystemLoader, StrictUndefined from liquid2.exceptions import UndefinedError import plain_spec -BINARY_FILE_EXTENSIONS = ['.pyc'] +BINARY_FILE_EXTENSIONS = [".pyc"] def list_all_text_files(directory): all_files = [] for root, dirs, files in os.walk(directory, topdown=False): modified_root = os.path.relpath(root, directory) - if modified_root == '.': - modified_root = '' + if modified_root == ".": + modified_root = "" for filename in files: if not any(filename.endswith(ending) for ending in BINARY_FILE_EXTENSIONS): try: - with open(os.path.join(root, filename), 'rb') as f: - f.read().decode('utf-8') + with open(os.path.join(root, filename), "rb") as f: + f.read().decode("utf-8") except UnicodeDecodeError: print(f"WARNING! Not listing {filename} in {root}. File is not a text file. Skipping it.") continue all_files.append(os.path.join(modified_root, filename)) - + return all_files def list_folders_in_directory(directory): # List all items in the directory items = os.listdir(directory) - + # Filter out the folders folders = [item for item in items if os.path.isdir(os.path.join(directory, item))] - + return folders @@ -45,7 +44,7 @@ def delete_files_and_subfolders(directory, verbose=False): total_files_deleted = 0 total_folders_deleted = 0 items_deleted = [] - + # Walk the directory in reverse order (bottom-up) for root, dirs, files in os.walk(directory, topdown=False): # Delete files @@ -55,7 +54,7 @@ def delete_files_and_subfolders(directory, verbose=False): total_files_deleted += 1 if verbose and len(items_deleted) < 10: items_deleted.append(f"Deleted file: {file_path}") - + # Delete directories for dir_ in dirs: dir_path = os.path.join(root, dir_) @@ -63,7 +62,7 @@ def delete_files_and_subfolders(directory, verbose=False): total_folders_deleted += 1 if verbose and len(items_deleted) < 10: items_deleted.append(f"Deleted folder: {dir_path}") - + # Print the results if verbose: if total_files_deleted + total_folders_deleted > 10: @@ -79,9 +78,9 @@ def copy_file(source_path, destination_path): os.makedirs(os.path.dirname(destination_path), exist_ok=True) # Open the source file in read-binary ('rb') mode - with open(source_path, 'rb') as source_file: + with open(source_path, "rb") as source_file: # Open the destination file in write-binary ('wb') mode - with open(destination_path, 'wb') as destination_file: + with open(destination_path, "wb") as destination_file: # Copy the content from source to destination while True: # Read a chunk of the source file @@ -95,7 +94,7 @@ def copy_file(source_path, destination_path): def add_current_path_if_no_path(filename): # Extract the base name of the file (ignoring any path information) basename = os.path.basename(filename) - + # Compare the basename to the original filename # If they are the same, there was no path information in the filename if basename == filename: @@ -115,20 +114,20 @@ def get_folders_diff(orig_folder, new_folder): diff = {} for file_name in new_files: - with open(os.path.join(new_folder, file_name), 'r') as f: + with open(os.path.join(new_folder, file_name), "r") as f: new_file = f.read().splitlines() - + if file_name in orig_files: - with open(os.path.join(orig_folder, file_name), 'r') as f: - orig_file = f.read().splitlines() + with open(os.path.join(orig_folder, file_name), "r") as f: + orig_file = f.read().splitlines() orig_file_name = file_name else: orig_file = [] - orig_file_name = '/dev/null' + orig_file_name = "/dev/null" - file_diff = difflib.unified_diff(orig_file, new_file, fromfile=orig_file_name, tofile=file_name, lineterm='') - file_diff_str = '\n'.join(file_diff) + file_diff = difflib.unified_diff(orig_file, new_file, fromfile=orig_file_name, tofile=file_name, lineterm="") + file_diff_str = "\n".join(file_diff) if file_diff_str: diff[file_name] = file_diff_str @@ -138,10 +137,10 @@ def get_folders_diff(orig_folder, new_folder): def get_existing_files_content(build_folder, existing_files): existing_files_content = {} for file_name in existing_files: - with open(os.path.join(build_folder, file_name), 'rb') as f: + with open(os.path.join(build_folder, file_name), "rb") as f: content = f.read() try: - existing_files_content[file_name] = content.decode('utf-8') + existing_files_content[file_name] = content.decode("utf-8") except UnicodeDecodeError: print(f"WARNING! Error loading {file_name}. File is not a text file. Skipping it.") @@ -176,21 +175,22 @@ def store_response_files(target_folder, response_files, existing_files): def load_linked_resources(folder_name, resources_list): linked_resources = {} for resource in resources_list: - file_name = resource['target'] + file_name = resource["target"] if file_name in linked_resources: continue full_file_name = os.path.join(folder_name, file_name) if not os.path.isfile(full_file_name): raise FileNotFoundError(f"The file '{full_file_name}' does not exist.") - - with open(full_file_name, 'rb') as f: + with open(full_file_name, "rb") as f: content = f.read() try: - linked_resources[file_name] = content.decode('utf-8') + linked_resources[file_name] = content.decode("utf-8") except UnicodeDecodeError: - print(f"WARNING! Error loading {resource['text']} ({resource['target']}). File is not a text file. Skipping it.") + print( + f"WARNING! Error loading {resource['text']} ({resource['target']}). File is not a text file. Skipping it." + ) return linked_resources @@ -210,10 +210,7 @@ def get_loaded_templates(source_path, plain_source): # Render the plain source with Liquid templating engine # to identify the templates that are being loaded liquid_loader = TrackingFileSystemLoader(source_path) - liquid_env = Environment( - loader=liquid_loader, - undefined=StrictUndefined - ) + liquid_env = Environment(loader=liquid_loader, undefined=StrictUndefined) liquid_env.filters["code_variable"] = plain_spec.code_variable_liquid_filter liquid_env.filters["prohibited_chars"] = plain_spec.prohibited_chars_liquid_filter diff --git a/plain2code.py b/plain2code.py index 2017805..5294908 100644 --- a/plain2code.py +++ b/plain2code.py @@ -1,20 +1,21 @@ -import sys -import os -import shutil -import subprocess import argparse -import json import importlib.util +import json import logging +import os +import shutil +import subprocess +import sys import traceback -import plain_spec import file_utils +import plain_spec +from codeplain_REST_api import CodeplainAPI -TEST_SCRIPT_EXECUTION_TIMEOUT = 120 # 120 seconds +TEST_SCRIPT_EXECUTION_TIMEOUT = 120 # 120 seconds -CLAUDE_API_KEY = os.getenv('CLAUDE_API_KEY') -DEFAULT_BUILD_FOLDER = 'build' +CLAUDE_API_KEY = os.getenv("CLAUDE_API_KEY") +DEFAULT_BUILD_FOLDER = "build" DEFAULT_CONFORMANCE_TESTS_FOLDER = "conformance_tests" CONFORMANCE_TESTS_BACKUP_FOLDER_SUFFIX = ".backup" CONFORMANCE_TESTS_DEFINITION_FILE_NAME = "conformance_tests.json" @@ -40,7 +41,7 @@ def get_render_range(render_range, plain_source_tree): if render_range is None: raise InvalidRenderRange("Invalid render range.") - render_range = render_range.split(',') + render_range = render_range.split(",") if len(render_range) < 1 or len(render_range) > 2: raise InvalidRenderRange("Invalid render range.") @@ -76,7 +77,7 @@ def execute_test_script(test_script, scripts_args, verbose): stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, - timeout=TEST_SCRIPT_EXECUTION_TIMEOUT + timeout=TEST_SCRIPT_EXECUTION_TIMEOUT, ) if verbose: @@ -100,21 +101,21 @@ def execute_test_script(test_script, scripts_args, verbose): print(f"Test script {test_script} timed out after {TEST_SCRIPT_EXECUTION_TIMEOUT} seconds.") if e.stdout: - decoded_output = e.stdout.decode('utf-8') if isinstance(e.stdout, bytes) else e.stdout + decoded_output = e.stdout.decode("utf-8") if isinstance(e.stdout, bytes) else e.stdout print(f"Test script partial output before the timeout:\n{decoded_output}") else: - print(f"Test script did not produce any output before the timeout.") - + print("Test script did not produce any output before the timeout.") return f"Tests did not finish in {TEST_SCRIPT_EXECUTION_TIMEOUT} seconds." + def run_unittests(args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files): changed_files = set() - + if not args.unittests_script: return existing_files, changed_files - + if args.verbose: print("Running unit tests script:", args.unittests_script) @@ -131,12 +132,16 @@ def run_unittests(args, codeplainAPI, frid, plain_source_tree, linked_resources, break if unit_test_run_count > MAX_UNITTEST_FIX_ATTEMPTS: - print(f"Unit tests still failed after {unit_test_run_count - 1} attemps at fixing issues. Please fix the issues manually.") + print( + f"Unit tests still failed after {unit_test_run_count - 1} attemps at fixing issues. Please fix the issues manually." + ) sys.exit(1) existing_files_content = file_utils.get_existing_files_content(args.build_folder, existing_files) - response_files = codeplainAPI.fix_unittests_issue(frid, plain_source_tree, linked_resources, existing_files_content, unittests_issue) + response_files = codeplainAPI.fix_unittests_issue( + frid, plain_source_tree, linked_resources, existing_files_content, unittests_issue + ) changed_files.update(response_files.keys()) @@ -147,11 +152,20 @@ def run_unittests(args, codeplainAPI, frid, plain_source_tree, linked_resources, return existing_files, changed_files -def generate_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content, conformance_tests_folder_name): +def generate_conformance_tests( + args, + codeplainAPI: CodeplainAPI, + frid, + functional_requirement_id, + plain_source_tree, + linked_resources, + existing_files_content, + conformance_tests_folder_name, +): specifications, _ = plain_spec.get_specifications_for_frid(plain_source_tree, functional_requirement_id) if args.verbose: # TODO: Print the definitions. - print(f"\nImplementing test requirements:") + print("\nImplementing test requirements:") print("\n".join(specifications[plain_spec.TEST_REQUIREMENTS])) print() @@ -160,10 +174,11 @@ def generate_conformance_tests(args, codeplainAPI, frid, functional_requirement_ existing_folder_names = file_utils.list_folders_in_directory(args.conformance_tests_folder) except FileNotFoundError: existing_folder_names = [] - + fr_subfolder_name = codeplainAPI.generate_folder_name_from_functional_requirement( - specifications[plain_spec.FUNCTIONAL_REQUIREMENTS][-1], - existing_folder_names + frid=frid, + functional_requirement=specifications[plain_spec.FUNCTIONAL_REQUIREMENTS][-1], + existing_folder_names=existing_folder_names, ) if args.verbose: @@ -173,20 +188,33 @@ def generate_conformance_tests(args, codeplainAPI, frid, functional_requirement_ file_utils.delete_files_and_subfolders(conformance_tests_folder_name, args.verbose) - response_files = codeplainAPI.render_conformance_tests(frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content) + response_files = codeplainAPI.render_conformance_tests( + frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content + ) file_utils.store_response_files(conformance_tests_folder_name, response_files, []) print("\nConformance test files generated:") - print('\n'.join(["- " + file_name for file_name in response_files.keys()]) + '\n') + print("\n".join(["- " + file_name for file_name in response_files.keys()]) + "\n") return { - 'functional_requirement': specifications[plain_spec.FUNCTIONAL_REQUIREMENTS][-1], - 'folder_name' : conformance_tests_folder_name + "functional_requirement": specifications[plain_spec.FUNCTIONAL_REQUIREMENTS][-1], + "folder_name": conformance_tests_folder_name, } -def run_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files, existing_files_content, code_diff, conformance_tests_folder_name): +def run_conformance_tests( + args, + codeplainAPI, + frid, + functional_requirement_id, + plain_source_tree, + linked_resources, + existing_files, + existing_files_content, + code_diff, + conformance_tests_folder_name, +): recreated_conformance_tests = False conformance_test_fix_count = 0 implementation_fix_count = 1 @@ -195,22 +223,39 @@ def run_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, p conformance_test_fix_count += 1 if args.verbose: - print(f"\nRunning conformance tests script {args.conformance_tests_script} for {conformance_tests_folder_name} (functional requirement {functional_requirement_id}, attempt: {conformance_test_fix_count}).") + print( + f"\nRunning conformance tests script {args.conformance_tests_script} for {conformance_tests_folder_name} (functional requirement {functional_requirement_id}, attempt: {conformance_test_fix_count})." + ) - conformance_tests_issue = execute_test_script(args.conformance_tests_script, [args.build_folder, conformance_tests_folder_name], args.verbose) + conformance_tests_issue = execute_test_script( + args.conformance_tests_script, [args.build_folder, conformance_tests_folder_name], args.verbose + ) if not conformance_tests_issue: break - + if conformance_test_fix_count > MAX_CONFORMANCE_TEST_FIX_ATTEMPTS: - print(f"Conformance tests script {args.conformance_tests_script} for {conformance_tests_folder_name} still failed after {conformance_test_fix_count - 1} attemps at fixing issues.") + print( + f"Conformance tests script {args.conformance_tests_script} for {conformance_tests_folder_name} still failed after {conformance_test_fix_count - 1} attemps at fixing issues." + ) if recreated_conformance_tests: - print("We've already tried to fix the issue by recreating the conformance tests but tests still fail. Please fix the issues manually.") + print( + "We've already tried to fix the issue by recreating the conformance tests but tests still fail. Please fix the issues manually." + ) sys.exit(1) print("Recreating conformance tests.") - generate_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content, conformance_tests_folder_name) + generate_conformance_tests( + args, + codeplainAPI, + frid, + functional_requirement_id, + plain_source_tree, + linked_resources, + existing_files_content, + conformance_tests_folder_name, + ) recreated_conformance_tests = True conformance_test_fix_count = 0 @@ -218,15 +263,27 @@ def run_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, p conformance_tests_files = file_utils.list_all_text_files(conformance_tests_folder_name) continue - conformance_tests_files_content = file_utils.get_existing_files_content(conformance_tests_folder_name, conformance_tests_files) + conformance_tests_files_content = file_utils.get_existing_files_content( + conformance_tests_folder_name, conformance_tests_files + ) try: [conformance_tests_fixed, response_files] = codeplainAPI.fix_conformance_tests_issue( - frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files_content, code_diff, conformance_tests_files_content, conformance_tests_issue, implementation_fix_count + frid, + functional_requirement_id, + plain_source_tree, + linked_resources, + existing_files_content, + code_diff, + conformance_tests_files_content, + conformance_tests_issue, + implementation_fix_count, ) if conformance_tests_fixed: - conformance_tests_files = file_utils.store_response_files(conformance_tests_folder_name, response_files, conformance_tests_files) + conformance_tests_files = file_utils.store_response_files( + conformance_tests_folder_name, response_files, conformance_tests_files + ) print(f"\nConformance test files in folder {conformance_tests_folder_name} fixed:") print_response_files_summary(response_files) @@ -237,11 +294,15 @@ def run_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, p print("Files fixed:") print_response_files_summary(response_files) - [existing_files, _] = run_unittests(args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files) + [existing_files, _] = run_unittests( + args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files + ) return [True, existing_files] - print(f"Couldn't fix conformance tests issue in folder {conformance_tests_folder_name} for functional requirement {functional_requirement_id}. Trying one more time.") + print( + f"Couldn't fix conformance tests issue in folder {conformance_tests_folder_name} for functional requirement {functional_requirement_id}. Trying one more time." + ) implementation_fix_count += 1 except codeplain_api.ConflictingRequirements as e: print(f"Conflicting requirements. {str(e)}.") @@ -249,11 +310,13 @@ def run_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, p except Exception as e: print(f"Error fixing conformance tests issue: {str(e)}") sys.exit(1) - + return [False, existing_files] -def conformance_testing(args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files, conformance_tests): +def conformance_testing( + args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files, conformance_tests +): conformance_tests_run_count = 0 specifications, _ = plain_spec.get_specifications_for_frid(plain_source_tree, frid) while conformance_tests_run_count < MAX_CONFORMANCE_TEST_RUNS: @@ -270,25 +333,47 @@ def conformance_testing(args, codeplainAPI, frid, plain_source_tree, linked_reso previous_build_folder = args.build_folder + "." + plain_spec.get_previous_frid(plain_source_tree, frid) if not os.path.exists(previous_build_folder): raise Exception(f"Build folder {previous_build_folder} not found: ") - + code_diff = file_utils.get_folders_diff(previous_build_folder, args.build_folder) functional_requirement_id = plain_spec.get_first_frid(plain_source_tree) while functional_requirement_id is not None and not implementation_code_has_changed: - if (functional_requirement_id == frid) and \ - (frid not in conformance_tests or \ - conformance_tests[frid]['functional_requirement'] != specifications[plain_spec.FUNCTIONAL_REQUIREMENTS][-1]): + if (functional_requirement_id == frid) and ( + frid not in conformance_tests + or conformance_tests[frid]["functional_requirement"] + != specifications[plain_spec.FUNCTIONAL_REQUIREMENTS][-1] + ): if frid in conformance_tests: - conformance_tests_folder_name = conformance_tests[frid]['folder_name'] + conformance_tests_folder_name = conformance_tests[frid]["folder_name"] else: conformance_tests_folder_name = None - conformance_tests[frid] = generate_conformance_tests(args, codeplainAPI, frid, frid, plain_source_tree, linked_resources, existing_files_content, conformance_tests_folder_name) - - conformance_tests_folder_name = conformance_tests[functional_requirement_id]['folder_name'] - - [implementation_code_has_changed, existing_files] = run_conformance_tests(args, codeplainAPI, frid, functional_requirement_id, plain_source_tree, linked_resources, existing_files, existing_files_content, code_diff, conformance_tests_folder_name) + conformance_tests[frid] = generate_conformance_tests( + args, + codeplainAPI, + frid, + frid, + plain_source_tree, + linked_resources, + existing_files_content, + conformance_tests_folder_name, + ) + + conformance_tests_folder_name = conformance_tests[functional_requirement_id]["folder_name"] + + [implementation_code_has_changed, existing_files] = run_conformance_tests( + args, + codeplainAPI, + frid, + functional_requirement_id, + plain_source_tree, + linked_resources, + existing_files, + existing_files_content, + code_diff, + conformance_tests_folder_name, + ) if functional_requirement_id == frid: break @@ -300,7 +385,9 @@ def conformance_testing(args, codeplainAPI, frid, plain_source_tree, linked_reso return conformance_tests - print(f"Conformance tests still failed after {conformance_tests_run_count} attemps at fixing issues. Please fix the issues manually.") + print( + f"Conformance tests still failed after {conformance_tests_run_count} attemps at fixing issues. Please fix the issues manually." + ) sys.exit(1) @@ -308,44 +395,52 @@ class IndentedFormatter(logging.Formatter): def format(self, record): original_message = record.getMessage() - - modified_message = original_message.replace('\n', '\n ') + + modified_message = original_message.replace("\n", "\n ") record.msg = modified_message return super().format(record) - + def print_linked_resources(plain_section): - if 'linked_resources' in plain_section: - linked_resources_str = "\n".join([f"- {resource_name['text']} ({resource_name['target']})" for resource_name in plain_section['linked_resources']]) + if "linked_resources" in plain_section: + linked_resources_str = "\n".join( + [ + f"- {resource_name['text']} ({resource_name['target']})" + for resource_name in plain_section["linked_resources"] + ] + ) print(f"Linked resources:\n{linked_resources_str}\n\n") -def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, all_linked_resources): +# TODO: Once we'll be refactoring and working with this method, we should also adjust cognitive complexity. +def render_functional_requirement( # noqa: C901 + args, codeplainAPI: CodeplainAPI, plain_source_tree, frid, all_linked_resources +): if args.render_range is not None and frid not in args.render_range: if args.verbose: - print(f"\n-------------------------------------") + print("\n-------------------------------------") print(f"Skipping rendering iteration: {frid}\n") return specifications, _ = plain_spec.get_specifications_for_frid(plain_source_tree, frid) - + if args.verbose: - print(f"\n-------------------------------------") + print("\n-------------------------------------") print(f"Rendering functional requirement {frid}") if len(specifications[plain_spec.DEFINITIONS]) > 0: print("\nDefinitions:") - print('\n'.join(specifications[plain_spec.DEFINITIONS])) + print("\n".join(specifications[plain_spec.DEFINITIONS])) print("\nNon-Functional Requirements:") - print('\n'.join(specifications[plain_spec.NON_FUNCTIONAL_REQUIREMENTS])) + print("\n".join(specifications[plain_spec.NON_FUNCTIONAL_REQUIREMENTS])) if len(specifications[plain_spec.TEST_REQUIREMENTS]) > 0: print("\nTest Requirements:") - print('\n'.join(specifications[plain_spec.TEST_REQUIREMENTS])) + print("\n".join(specifications[plain_spec.TEST_REQUIREMENTS])) print("\nFunctional Requirement:") print(specifications[plain_spec.FUNCTIONAL_REQUIREMENTS][-1]) @@ -383,19 +478,19 @@ def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, a conformance_tests_backup_folder = args.conformance_tests_folder + CONFORMANCE_TESTS_BACKUP_FOLDER_SUFFIX if os.path.exists(conformance_tests_backup_folder): shutil.rmtree(conformance_tests_backup_folder) - + # Copy the entire directory tree shutil.copytree(args.conformance_tests_folder, conformance_tests_backup_folder) - + if args.verbose: - print(f"Conformance tests folder successfully backed up.") + print("Conformance tests folder successfully backed up.") resources_list = [] plain_spec.collect_linked_resources(plain_source_tree, resources_list, frid) linked_resources = {} for resource in resources_list: - linked_resources[resource['target']] = all_linked_resources[resource['target']] + linked_resources[resource["target"]] = all_linked_resources[resource["target"]] if previous_build_folder: existing_files = file_utils.list_all_text_files(previous_build_folder) @@ -404,26 +499,32 @@ def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, a existing_files = [] existing_files_content = {} - conformance_tests_definition_file_name = os.path.join(args.conformance_tests_folder, CONFORMANCE_TESTS_DEFINITION_FILE_NAME) + conformance_tests_definition_file_name = os.path.join( + args.conformance_tests_folder, CONFORMANCE_TESTS_DEFINITION_FILE_NAME + ) try: - with open(conformance_tests_definition_file_name, 'r') as f: + with open(conformance_tests_definition_file_name, "r") as f: conformance_tests = json.load(f) except FileNotFoundError: conformance_tests = {} if args.dry_run: if args.verbose: - print(f"\n== Dry run: not actually rendering the functional requirement. ==\n") + print("\n== Dry run: not actually rendering the functional requirement. ==\n") return try: - response_files = codeplainAPI.render_functional_requirement(frid, plain_source_tree, linked_resources, existing_files_content) + response_files = codeplainAPI.render_functional_requirement( + frid, plain_source_tree, linked_resources, existing_files_content + ) except codeplain_api.FunctionalRequirementTooComplex as e: # TODO: Suggest how to break down the functional requirement. Identified options are: # - Split the functional requirement into smaller parts. # - If the functional requirement changes multiple entities, first limit the changes to a single representative entity and then to all entities. # - Move the functional requirement higher up, that is, to come earlier in the rendering order. - print(f"Too many files or code lines generated. You should break down the functional requirement into smaller parts ({str(e)}).") + print( + f"Too many files or code lines generated. You should break down the functional requirement into smaller parts ({str(e)})." + ) sys.exit(1) if previous_build_folder: @@ -432,16 +533,20 @@ def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, a if args.verbose: print("Copying file: ", file_name) - file_utils.copy_file(os.path.join(previous_build_folder, file_name), os.path.join(args.build_folder, file_name)) + file_utils.copy_file( + os.path.join(previous_build_folder, file_name), os.path.join(args.build_folder, file_name) + ) changed_files = set() changed_files.update(response_files.keys()) existing_files = file_utils.store_response_files(args.build_folder, response_files, existing_files) print("Files generated:") - print('\n'.join(["- " + file_name for file_name in response_files.keys()])) + print("\n".join(["- " + file_name for file_name in response_files.keys()])) - [existing_files, tmp_changed_files] = run_unittests(args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files) + [existing_files, tmp_changed_files] = run_unittests( + args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files + ) for file_name in tmp_changed_files: if file_name not in existing_files: @@ -457,7 +562,11 @@ def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, a print(f"\nRefactoring iteration {num_refactoring_iterations}.") existing_files_content = file_utils.get_existing_files_content(args.build_folder, existing_files) - response_files = codeplainAPI.refactor_source_files_if_needed(changed_files, existing_files_content) + response_files = codeplainAPI.refactor_source_files_if_needed( + frid=frid, + files_to_check=changed_files, + existing_files_content=existing_files_content, + ) if len(response_files) == 0: if args.verbose: @@ -470,15 +579,17 @@ def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, a if os.path.exists(build_folder_copy) and os.path.isdir(build_folder_copy): file_utils.delete_files_and_subfolders(build_folder_copy) - + for file_name in existing_files: file_utils.copy_file(os.path.join(args.build_folder, file_name), os.path.join(build_folder_copy, file_name)) existing_files = file_utils.store_response_files(args.build_folder, response_files, existing_files) print("Files refactored:") - print('\n'.join(response_files.keys())) + print("\n".join(response_files.keys())) - [existing_files, tmp_changed_files] = run_unittests(args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files) + [existing_files, tmp_changed_files] = run_unittests( + args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files + ) for file_name in tmp_changed_files: if file_name not in existing_files: @@ -487,8 +598,14 @@ def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, a else: changed_files.add(file_name) - if args.conformance_tests_script and plain_spec.TEST_REQUIREMENTS in specifications and specifications[plain_spec.TEST_REQUIREMENTS]: - conformance_tests = conformance_testing(args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files, conformance_tests) + if ( + args.conformance_tests_script + and plain_spec.TEST_REQUIREMENTS in specifications + and specifications[plain_spec.TEST_REQUIREMENTS] + ): + conformance_tests = conformance_testing( + args, codeplainAPI, frid, plain_source_tree, linked_resources, existing_files, conformance_tests + ) if os.path.exists(args.conformance_tests_folder): if args.verbose: @@ -497,8 +614,9 @@ def render_functional_requirement(args, codeplainAPI, plain_source_tree, frid, a with open(conformance_tests_definition_file_name, "w") as f: json.dump(conformance_tests, f, indent=4) - if plain_spec.get_next_frid(plain_source_tree, frid) is not None and \ - (args.render_range is None or frid in args.render_range): + if plain_spec.get_next_frid(plain_source_tree, frid) is not None and ( + args.render_range is None or frid in args.render_range + ): previous_build_folder = args.build_folder + "." + frid if args.verbose: @@ -521,15 +639,15 @@ def render(args): logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("anthropic").setLevel(logging.WARNING) - formatter = IndentedFormatter('%(levelname)s:%(name)s:%(message)s') + formatter = IndentedFormatter("%(levelname)s:%(name)s:%(message)s") console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) - codeplain_logger = logging.getLogger('codeplain') + codeplain_logger = logging.getLogger("codeplain") codeplain_logger.addHandler(console_handler) codeplain_logger.propagate = False - llm_logger = logging.getLogger('llm') + llm_logger = logging.getLogger("llm") llm_logger.addHandler(console_handler) llm_logger.propagate = False @@ -538,7 +656,7 @@ def render(args): template_dirs = [ os.path.dirname(args.filename), - os.path.join(os.path.dirname( os.path.abspath(__file__)), DEFAULT_TEMPLATE_DIRS) + os.path.join(os.path.dirname(os.path.abspath(__file__)), DEFAULT_TEMPLATE_DIRS), ] [full_plain_source, loaded_templates] = file_utils.get_loaded_templates(template_dirs, plain_source) @@ -558,7 +676,7 @@ def render(args): codeplainAPI.api_url = args.api print(f"Rendering {args.filename} to target code.") - + plain_source_tree = codeplainAPI.get_plain_source_tree(plain_source, loaded_templates) if args.render_range is not None: @@ -577,45 +695,63 @@ def render(args): if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Render plain code to target code.') - parser.add_argument('filename', type=str, help='plain file to render') - parser.add_argument('--verbose', '-v', action='store_true', help='enable verbose output') - parser.add_argument('--debug', action='store_true', help='enable debug information') - parser.add_argument('--base-folder', type=str, help='base folder for the build files') - parser.add_argument("--build-folder", type=non_empty_string, default=DEFAULT_BUILD_FOLDER, help="folder for build files") - parser.add_argument('--render-range', type=str, help='which functional requirements should be generated') - parser.add_argument('--unittests-script', type=str, help='a script to run unit tests') - parser.add_argument('--conformance-tests-folder', type=non_empty_string, default=DEFAULT_CONFORMANCE_TESTS_FOLDER, help='folder for conformance test files') - parser.add_argument('--conformance-tests-script', type=str, help='a script to run conformance tests') - parser.add_argument('--api', type=str, nargs='?', const="https://api.codeplain.ai", help='force using the API (for internal use)') - parser.add_argument('--api-key', type=str, default=CLAUDE_API_KEY, help='API key used to access the API. If not provided, the CLAUDE_API_KEY environment variable is used.') - parser.add_argument('--full-plain', action='store_true', help='emit full plain text to render') - parser.add_argument('--dry-run', action='store_true', help='preview what plain2code would do without actually making any changes') + parser = argparse.ArgumentParser(description="Render plain code to target code.") + parser.add_argument("filename", type=str, help="plain file to render") + parser.add_argument("--verbose", "-v", action="store_true", help="enable verbose output") + parser.add_argument("--debug", action="store_true", help="enable debug information") + parser.add_argument("--base-folder", type=str, help="base folder for the build files") + parser.add_argument( + "--build-folder", type=non_empty_string, default=DEFAULT_BUILD_FOLDER, help="folder for build files" + ) + parser.add_argument("--render-range", type=str, help="which functional requirements should be generated") + parser.add_argument("--unittests-script", type=str, help="a script to run unit tests") + parser.add_argument( + "--conformance-tests-folder", + type=non_empty_string, + default=DEFAULT_CONFORMANCE_TESTS_FOLDER, + help="folder for conformance test files", + ) + parser.add_argument("--conformance-tests-script", type=str, help="a script to run conformance tests") + parser.add_argument( + "--api", type=str, nargs="?", const="https://api.codeplain.ai", help="force using the API (for internal use)" + ) + parser.add_argument( + "--api-key", + type=str, + default=CLAUDE_API_KEY, + help="API key used to access the API. If not provided, the CLAUDE_API_KEY environment variable is used.", + ) + parser.add_argument("--full-plain", action="store_true", help="emit full plain text to render") + parser.add_argument( + "--dry-run", action="store_true", help="preview what plain2code would do without actually making any changes" + ) args = parser.parse_args() - codeplain_api_module_name = 'codeplain_local_api' + codeplain_api_module_name = "codeplain_local_api" codeplain_api_spec = importlib.util.find_spec(codeplain_api_module_name) if args.api or codeplain_api_spec is None: if not args.api: args.api = "https://api.codeplain.ai" - print(f"Running plain2code using REST API at {args.api }\n") + print(f"Running plain2code using REST API at {args.api}.") import codeplain_REST_api as codeplain_api else: if args.verbose or not args.full_plain: - print(f"Running plain2code using local API.\n") + print("Running plain2code using local API.\n") codeplain_api = importlib.import_module(codeplain_api_module_name) if not args.api_key or args.api_key == "": - print("Error: API key is not provided. Please provide an API key using the --api-key flag or by setting the CLAUDE_API_KEY environment variable.") + print( + "Error: API key is not provided. Please provide an API key using the --api-key flag or by setting the CLAUDE_API_KEY environment variable." + ) sys.exit(1) try: render(args) except InvalidRenderRange as e: - print(f"Error rendering plain code. Invalid render range: {args.render_range}\n") + print(f"Error rendering plain code: {str(e)}. Invalid render range: {args.render_range}\n") except FileNotFoundError as e: print(f"Error rendering plain code: {str(e)}\n") except Exception as e: diff --git a/plain_spec.py b/plain_spec.py index 0d5003b..221068f 100644 --- a/plain_spec.py +++ b/plain_spec.py @@ -1,14 +1,13 @@ -import copy -import uuid import hashlib import json -from liquid2.filter import with_context +import uuid +from liquid2.filter import with_context -DEFINITIONS = 'Definitions:' -NON_FUNCTIONAL_REQUIREMENTS = 'Non-Functional Requirements:' -TEST_REQUIREMENTS = 'Test Requirements:' -FUNCTIONAL_REQUIREMENTS = 'Functional Requirements:' +DEFINITIONS = "Definitions:" +NON_FUNCTIONAL_REQUIREMENTS = "Non-Functional Requirements:" +TEST_REQUIREMENTS = "Test Requirements:" +FUNCTIONAL_REQUIREMENTS = "Functional Requirements:" ALLOWED_SPECIFICATION_HEADINGS = [DEFINITIONS, NON_FUNCTIONAL_REQUIREMENTS, TEST_REQUIREMENTS, FUNCTIONAL_REQUIREMENTS] @@ -19,33 +18,37 @@ class InvalidLiquidVariableName(Exception): def collect_specification_linked_resources(specification, specification_heading, linked_resources_list): linked_resources = [] - if 'linked_resources' in specification: - linked_resources.extend(specification['linked_resources']) + if "linked_resources" in specification: + linked_resources.extend(specification["linked_resources"]) for resource in linked_resources: resource_found = False for resource_map in linked_resources_list: - if resource['text'] == resource_map['text']: - if resource['target'] != resource_map['target']: - raise Exception(f"The file {resource['target']} is linked to multiple linked resources with the same text: {resource['text']}") - elif resource['target'] == resource_map['target']: - if resource['text'] != resource_map['text']: - raise Exception(f"The linked resource {resource['text']} is linked to multiple files: {resource_map['target']}") + if resource["text"] == resource_map["text"]: + if resource["target"] != resource_map["target"]: + raise Exception( + f"The file {resource['target']} is linked to multiple linked resources with the same text: {resource['text']}" + ) + elif resource["target"] == resource_map["target"]: + if resource["text"] != resource_map["text"]: + raise Exception( + f"The linked resource {resource['text']} is linked to multiple files: {resource_map['target']}" + ) else: continue if resource_found: - raise Exception("Duplicate linked resource found: " + resource['text'] + " (" + resource['target'] + ")") + raise Exception( + "Duplicate linked resource found: " + resource["text"] + " (" + resource["target"] + ")" + ) resource_found = True - resource_map['sections'].append(specification_heading) + resource_map["sections"].append(specification_heading) if not resource_found: - linked_resources_list.append({ - 'text': resource['text'], - 'target': resource['target'], - 'sections': [specification_heading] - }) + linked_resources_list.append( + {"text": resource["text"], "target": resource["target"], "sections": [specification_heading]} + ) def collect_linked_resources_in_section(section, linked_resources_list, frid=None): @@ -60,16 +63,16 @@ def collect_linked_resources_in_section(section, linked_resources_list, frid=Non collect_specification_linked_resources(requirement, FUNCTIONAL_REQUIREMENTS, linked_resources_list) functional_requirement_count += 1 - if 'ID' in section: - current_frid = section['ID'] + "." + str(functional_requirement_count) + if "ID" in section: + current_frid = section["ID"] + "." + str(functional_requirement_count) else: current_frid = str(functional_requirement_count) if current_frid == frid: return True - if 'sections' in section: - for subsection in section['sections']: + if "sections" in section: + for subsection in section["sections"]: if collect_linked_resources_in_section(subsection, linked_resources_list, frid): return True @@ -92,13 +95,13 @@ def collect_linked_resources(plain_source_tree, linked_resources_list, frid=None def get_frids(plain_source_tree): if FUNCTIONAL_REQUIREMENTS in plain_source_tree: for functional_requirement_id in range(1, len(plain_source_tree[FUNCTIONAL_REQUIREMENTS]) + 1): - if 'ID' in plain_source_tree: - yield plain_source_tree['ID'] + "." + str(functional_requirement_id) + if "ID" in plain_source_tree: + yield plain_source_tree["ID"] + "." + str(functional_requirement_id) else: yield str(functional_requirement_id) - if 'sections' in plain_source_tree: - for section in plain_source_tree['sections']: + if "sections" in plain_source_tree: + for section in plain_source_tree["sections"]: yield from get_frids(section) @@ -127,22 +130,34 @@ def get_previous_frid(plain_source_tree, frid): def get_specification_item_markdown(specification_item, code_variables, replace_code_variables): - markdown = specification_item['markdown'] - if 'code_variables' in specification_item: - for code_variable in specification_item['code_variables']: - if code_variable['name'] in code_variables: - if code_variables[code_variable['name']] != code_variable['value']: - raise Exception(f"Code variable {code_variable['name']} has multiple values: {code_variables[code_variable['name']]} and {code_variable['value']}") + markdown = specification_item["markdown"] + if "code_variables" in specification_item: + for code_variable in specification_item["code_variables"]: + if code_variable["name"] in code_variables: + if code_variables[code_variable["name"]] != code_variable["value"]: + raise Exception( + f"Code variable {code_variable['name']} has multiple values: {code_variables[code_variable['name']]} and {code_variable['value']}" + ) else: - code_variables[code_variable['name']] = code_variable['value'] + code_variables[code_variable["name"]] = code_variable["value"] if replace_code_variables: - markdown = markdown.replace(f"{{{{ {code_variable['name']} }}}}", code_variable['value']) + markdown = markdown.replace(f"{{{{ {code_variable['name']} }}}}", code_variable["value"]) return markdown -def get_specifications_from_plain_source_tree(frid, plain_source_tree, definitions, non_functional_requirements, test_requirements, functional_requirements, code_variables, replace_code_variables, section_id=None): +def get_specifications_from_plain_source_tree( + frid, + plain_source_tree, + definitions, + non_functional_requirements, + test_requirements, + functional_requirements, + code_variables, + replace_code_variables, + section_id=None, +): return_frid = None if FUNCTIONAL_REQUIREMENTS in plain_source_tree and len(plain_source_tree[FUNCTIONAL_REQUIREMENTS]) > 0: functional_requirement_count = 0 @@ -153,26 +168,50 @@ def get_specifications_from_plain_source_tree(frid, plain_source_tree, definitio else: current_frid = section_id + "." + str(functional_requirement_count) - functional_requirements.append(get_specification_item_markdown(functional_requirement, code_variables, replace_code_variables)) + functional_requirements.append( + get_specification_item_markdown(functional_requirement, code_variables, replace_code_variables) + ) if current_frid == frid: return_frid = current_frid break - if 'sections' in plain_source_tree: - for section in plain_source_tree['sections']: - sub_frid = get_specifications_from_plain_source_tree(frid, section, definitions, non_functional_requirements, test_requirements, functional_requirements, code_variables, replace_code_variables, section['ID']) + if "sections" in plain_source_tree: + for section in plain_source_tree["sections"]: + sub_frid = get_specifications_from_plain_source_tree( + frid, + section, + definitions, + non_functional_requirements, + test_requirements, + functional_requirements, + code_variables, + replace_code_variables, + section["ID"], + ) if sub_frid is not None: return_frid = sub_frid break if return_frid is not None: if DEFINITIONS in plain_source_tree and plain_source_tree[DEFINITIONS] is not None: - definitions[0:0] = [get_specification_item_markdown(specification, code_variables, replace_code_variables) for specification in plain_source_tree[DEFINITIONS]] - if NON_FUNCTIONAL_REQUIREMENTS in plain_source_tree and plain_source_tree[NON_FUNCTIONAL_REQUIREMENTS] is not None: - non_functional_requirements[0:0] = [get_specification_item_markdown(specification, code_variables, replace_code_variables) for specification in plain_source_tree[NON_FUNCTIONAL_REQUIREMENTS]] + definitions[0:0] = [ + get_specification_item_markdown(specification, code_variables, replace_code_variables) + for specification in plain_source_tree[DEFINITIONS] + ] + if ( + NON_FUNCTIONAL_REQUIREMENTS in plain_source_tree + and plain_source_tree[NON_FUNCTIONAL_REQUIREMENTS] is not None + ): + non_functional_requirements[0:0] = [ + get_specification_item_markdown(specification, code_variables, replace_code_variables) + for specification in plain_source_tree[NON_FUNCTIONAL_REQUIREMENTS] + ] if TEST_REQUIREMENTS in plain_source_tree and plain_source_tree[TEST_REQUIREMENTS] is not None: - test_requirements[0:0] = [get_specification_item_markdown(specification, code_variables, replace_code_variables) for specification in plain_source_tree[TEST_REQUIREMENTS]] + test_requirements[0:0] = [ + get_specification_item_markdown(specification, code_variables, replace_code_variables) + for specification in plain_source_tree[TEST_REQUIREMENTS] + ] return return_frid @@ -185,7 +224,16 @@ def get_specifications_for_frid(plain_source_tree, frid, replace_code_variables= code_variables = {} - result = get_specifications_from_plain_source_tree(frid, plain_source_tree, definitions, non_functional_requirements, test_requirements, functional_requirements, code_variables, replace_code_variables) + result = get_specifications_from_plain_source_tree( + frid, + plain_source_tree, + definitions, + non_functional_requirements, + test_requirements, + functional_requirements, + code_variables, + replace_code_variables, + ) if result is None: raise Exception(f"Functional requirement {frid} does not exist.") @@ -193,7 +241,7 @@ def get_specifications_for_frid(plain_source_tree, frid, replace_code_variables= DEFINITIONS: definitions, NON_FUNCTIONAL_REQUIREMENTS: non_functional_requirements, TEST_REQUIREMENTS: test_requirements, - FUNCTIONAL_REQUIREMENTS: functional_requirements + FUNCTIONAL_REQUIREMENTS: functional_requirements, } if code_variables: @@ -207,8 +255,8 @@ def code_variable_liquid_filter(value, *, context): if len(context.scope) == 0: raise Exception("Invalid use of code_variable filter!") - if 'code_variables' in context.globals: - code_variables = context.globals['code_variables'] + if "code_variables" in context.globals: + code_variables = context.globals["code_variables"] variable = next(iter(context.scope.items())) @@ -225,17 +273,19 @@ def code_variable_liquid_filter(value, *, context): def prohibited_chars_liquid_filter(value, prohibited_chars, *, context): if not isinstance(value, str): value = str(value) - + if len(context.scope) == 0: raise Exception("Invalid use of prohibited_chars filter!") variable = next(iter(context.scope.items())) variable_name = variable[0] - + for char in prohibited_chars: if char in value: - raise InvalidLiquidVariableName(f"'{char}' is not a valid character for variable '{variable_name}' (value: '{value}').") - + raise InvalidLiquidVariableName( + f"'{char}' is not a valid character for variable '{variable_name}' (value: '{value}')." + ) + return value