From 396bfd67e16fda76c598043a30e05711923203c2 Mon Sep 17 00:00:00 2001 From: alufers Date: Sat, 18 Feb 2023 19:40:35 +0100 Subject: [PATCH] Black everything --- mitmproxy2swagger/console_util.py | 16 +- mitmproxy2swagger/har_capture_reader.py | 58 ++-- mitmproxy2swagger/mitmproxy2swagger.py | 251 +++++++++++------- mitmproxy2swagger/mitmproxy_capture_reader.py | 19 +- mitmproxy2swagger/swagger_util.py | 107 ++++---- 5 files changed, 255 insertions(+), 196 deletions(-) diff --git a/mitmproxy2swagger/console_util.py b/mitmproxy2swagger/console_util.py index f1dda76..b1a174a 100644 --- a/mitmproxy2swagger/console_util.py +++ b/mitmproxy2swagger/console_util.py @@ -29,20 +29,24 @@ def rgb_interpolate(start, end, progress): def rainbow_at_position(progress): idx_a = int(progress * float(len(RAINBOW_COLORS) - 1)) idx_b = idx_a + 1 - return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], progress * float(len(RAINBOW_COLORS) - 1) - idx_a) + return rgb_interpolate( + RAINBOW_COLORS[idx_a], + RAINBOW_COLORS[idx_b], + progress * float(len(RAINBOW_COLORS) - 1) - idx_a, + ) def print_progress_bar(progress=0.0): sys.stdout.write("\r") progress_bar_contents = "" PROGRESS_LENGTH = 30 - blocks = ['▉', '▊', '▋', '▌', '▍', '▎', '▏'] + blocks = ["▉", "▊", "▋", "▌", "▍", "▎", "▏"] for i in range(PROGRESS_LENGTH): interpolated = rainbow_at_position(i / PROGRESS_LENGTH) # check if should print a full block if i < int(progress * PROGRESS_LENGTH): - interpolated_2nd_half = rainbow_at_position((i + 0.5) / PROGRESS_LENGTH) + interpolated_2nd_half = rainbow_at_position((i + 0.5) / PROGRESS_LENGTH) progress_bar_contents += ANSI_RGB.format(*interpolated) progress_bar_contents += ANSI_RGB_BG.format(*interpolated_2nd_half) progress_bar_contents += "▌" @@ -50,11 +54,13 @@ def print_progress_bar(progress=0.0): elif i < int((progress * PROGRESS_LENGTH) + 0.5): progress_bar_contents += ANSI_RESET progress_bar_contents += ANSI_RGB.format(*interpolated) - progress_bar_contents += blocks[int((progress * PROGRESS_LENGTH) + 0.5) - i - 1] + progress_bar_contents += blocks[ + int((progress * PROGRESS_LENGTH) + 0.5) - i - 1 + ] # otherwise, print a space else: progress_bar_contents += ANSI_RESET - progress_bar_contents += ' ' + progress_bar_contents += " " progress_bar_contents += ANSI_RESET sys.stdout.write("[{}] {:.1f}%".format(progress_bar_contents, progress * 100)) diff --git a/mitmproxy2swagger/har_capture_reader.py b/mitmproxy2swagger/har_capture_reader.py index 6f00dd6..b46f9d6 100644 --- a/mitmproxy2swagger/har_capture_reader.py +++ b/mitmproxy2swagger/har_capture_reader.py @@ -8,16 +8,16 @@ def har_archive_heuristic(file_path: str) -> int: val = 0 # if has the har extension - if file_path.endswith('.har'): + if file_path.endswith(".har"): val += 15 # read the first 2048 bytes - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: data = f.read(2048) # if file contains only ascii characters - if data.decode('utf-8', 'ignore').isprintable() is True: + if data.decode("utf-8", "ignore").isprintable() is True: val += 25 # if first character is a '{' - if data[0] == '{': + if data[0] == "{": val += 23 # if it contains the word '"WebInspector"' if b'"WebInspector"' in data: @@ -36,46 +36,57 @@ def __init__(self, flow: dict): self.flow = flow def get_url(self): - return self.flow['request']['url'] + return self.flow["request"]["url"] def get_method(self): - return self.flow['request']['method'] + return self.flow["request"]["method"] def get_request_headers(self): headers = {} - for kv in self.flow['request']['headers']: - k = kv['name'] - v = kv['value'] + for kv in self.flow["request"]["headers"]: + k = kv["name"] + v = kv["value"] # create list on key if it does not exist headers[k] = headers.get(k, []) headers[k].append(v) def get_request_body(self): - if 'request' in self.flow and 'postData' in self.flow['request'] and 'text' in self.flow['request']['postData']: - return self.flow['request']['postData']['text'] + if ( + "request" in self.flow + and "postData" in self.flow["request"] + and "text" in self.flow["request"]["postData"] + ): + return self.flow["request"]["postData"]["text"] return None def get_response_status_code(self): - return self.flow['response']['status'] + return self.flow["response"]["status"] def get_response_reason(self): - return self.flow['response']['statusText'] + return self.flow["response"]["statusText"] def get_response_headers(self): headers = {} - for kv in self.flow['response']['headers']: - k = kv['name'] - v = kv['value'] + for kv in self.flow["response"]["headers"]: + k = kv["name"] + v = kv["value"] # create list on key if it does not exist headers[k] = headers.get(k, []) headers[k].append(v) return headers def get_response_body(self): - if 'response' in self.flow and 'content' in self.flow['response'] and 'text' in self.flow['response']['content']: - if 'encoding' in self.flow['response']['content'] and self.flow['response']['content']['encoding'] == 'base64': - return b64decode(self.flow['response']['content']['text']).decode() - return self.flow['response']['content']['text'] + if ( + "response" in self.flow + and "content" in self.flow["response"] + and "text" in self.flow["response"]["content"] + ): + if ( + "encoding" in self.flow["response"]["content"] + and self.flow["response"]["content"]["encoding"] == "base64" + ): + return b64decode(self.flow["response"]["content"]["text"]).decode() + return self.flow["response"]["content"]["text"] return None @@ -86,11 +97,12 @@ def __init__(self, file_path: str, progress_callback=None): def captured_requests(self) -> Iterator[HarFlowWrapper]: har_file_size = os.path.getsize(self.file_path) - with open(self.file_path, 'r', encoding='utf-8') as f: + with open(self.file_path, "r", encoding="utf-8") as f: data = json_stream.load(f) - for entry in data['log']['entries'].persistent(): + for entry in data["log"]["entries"].persistent(): if self.progress_callback: self.progress_callback(f.tell() / har_file_size) yield HarFlowWrapper(entry) + def name(self): - return 'har' + return "har" diff --git a/mitmproxy2swagger/mitmproxy2swagger.py b/mitmproxy2swagger/mitmproxy2swagger.py index 2c6d8ab..9c22bd3 100755 --- a/mitmproxy2swagger/mitmproxy2swagger.py +++ b/mitmproxy2swagger/mitmproxy2swagger.py @@ -12,30 +12,34 @@ import re from . import swagger_util from .har_capture_reader import HarCaptureReader, har_archive_heuristic -from .mitmproxy_capture_reader import MitmproxyCaptureReader, mitmproxy_dump_file_huristic +from .mitmproxy_capture_reader import ( + MitmproxyCaptureReader, + mitmproxy_dump_file_huristic, +) from . import console_util import urllib + def path_to_regex(path): # replace the path template with a regex - path = path.replace('\\', '\\\\') - path = path.replace('{', '(?P<') - path = path.replace('}', '>[^/]+)') - path = path.replace('*', '.*') - path = path.replace('/', '\\/') - path = path.replace('?', '\\?') - path = path.replace('(', '\\(') - path = path.replace(')', '\\)') - path = path.replace('.', '\\.') - path = path.replace('+', '\\+') - path = path.replace('[', '\\[') - path = path.replace(']', '\\]') + path = path.replace("\\", "\\\\") + path = path.replace("{", "(?P<") + path = path.replace("}", ">[^/]+)") + path = path.replace("*", ".*") + path = path.replace("/", "\\/") + path = path.replace("?", "\\?") + path = path.replace("(", "\\(") + path = path.replace(")", "\\)") + path = path.replace(".", "\\.") + path = path.replace("+", "\\+") + path = path.replace("[", "\\[") + path = path.replace("]", "\\]") return "^" + path + "$" def strip_query_string(path): # remove the query string from the path - return path.split('?')[0] + return path.split("?")[0] def set_key_if_not_exists(dict, key, value): @@ -49,10 +53,10 @@ def progress_callback(progress): def detect_input_format(file_path): har_score = har_archive_heuristic(file_path) - mitmproxy_score = mitmproxy_dump_file_huristic(file_path) - if 'MITMPROXY2SWAGGER_DEBUG' in os.environ: - print('har score: ' + str(har_score)) - print('mitmproxy score: ' + str(mitmproxy_score)) + mitmproxy_score = mitmproxy_dump_file_huristic(file_path) + if "MITMPROXY2SWAGGER_DEBUG" in os.environ: + print("har score: " + str(har_score)) + print("mitmproxy score: " + str(mitmproxy_score)) if har_score > mitmproxy_score: return HarCaptureReader(file_path, progress_callback) return MitmproxyCaptureReader(file_path, progress_callback) @@ -60,24 +64,41 @@ def detect_input_format(file_path): def main(): parser = argparse.ArgumentParser( - description='Converts a mitmproxy dump file or HAR to a swagger schema.') + description="Converts a mitmproxy dump file or HAR to a swagger schema." + ) + parser.add_argument( + "-i", + "--input", + help="The input mitmproxy dump file or HAR dump file (from DevTools)", + required=True, + ) parser.add_argument( - '-i', '--input', help='The input mitmproxy dump file or HAR dump file (from DevTools)', required=True) + "-o", + "--output", + help="The output swagger schema file (yaml). If it exists, new endpoints will be added", + required=True, + ) + parser.add_argument("-p", "--api-prefix", help="The api prefix", required=True) parser.add_argument( - '-o', '--output', help='The output swagger schema file (yaml). If it exists, new endpoints will be added', required=True) - parser.add_argument('-p', '--api-prefix', help='The api prefix', required=True) - parser.add_argument('-e', '--examples', action='store_true', - help='Include examples in the schema. This might expose sensitive information.') - parser.add_argument('-f', '--format', choices=['flow', 'har'], - help='Override the input file format auto-detection.') + "-e", + "--examples", + action="store_true", + help="Include examples in the schema. This might expose sensitive information.", + ) + parser.add_argument( + "-f", + "--format", + choices=["flow", "har"], + help="Override the input file format auto-detection.", + ) args = parser.parse_args() yaml = ruamel.yaml.YAML() capture_reader = None - if args.format == 'flow' or args.format == 'mitmproxy': + if args.format == "flow" or args.format == "mitmproxy": capture_reader = MitmproxyCaptureReader(args.input, progress_callback) - elif args.format == 'har': + elif args.format == "har": capture_reader = HarCaptureReader(args.input, progress_callback) else: capture_reader = detect_input_format(args.input) @@ -86,45 +107,46 @@ def main(): # try loading the existing swagger file try: - with open(args.output, 'r') as f: + with open(args.output, "r") as f: swagger = yaml.load(f) except FileNotFoundError: print("No existing swagger file found. Creating new one.") pass if swagger is None: - swagger = ruamel.yaml.comments.CommentedMap({ - "openapi": "3.0.0", - "info": { - "title": args.input + " Mitmproxy2Swagger", - "version": "1.0.0" - }, - }) + swagger = ruamel.yaml.comments.CommentedMap( + { + "openapi": "3.0.0", + "info": { + "title": args.input + " Mitmproxy2Swagger", + "version": "1.0.0", + }, + } + ) # strip the trailing slash from the api prefix - args.api_prefix = args.api_prefix.rstrip('/') + args.api_prefix = args.api_prefix.rstrip("/") - if 'servers' not in swagger or swagger['servers'] is None: - swagger['servers'] = [] + if "servers" not in swagger or swagger["servers"] is None: + swagger["servers"] = [] # add the server if it doesn't exist - if not any(server['url'] == args.api_prefix for server in swagger['servers']): - swagger['servers'].append({ - "url": args.api_prefix, - "description": "The default server" - }) + if not any(server["url"] == args.api_prefix for server in swagger["servers"]): + swagger["servers"].append( + {"url": args.api_prefix, "description": "The default server"} + ) - if 'paths' not in swagger or swagger['paths'] is None: - swagger['paths'] = {} + if "paths" not in swagger or swagger["paths"] is None: + swagger["paths"] = {} - if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None: - swagger['x-path-templates'] = [] + if "x-path-templates" not in swagger or swagger["x-path-templates"] is None: + swagger["x-path-templates"] = [] path_templates = [] - for path in swagger['paths']: + for path in swagger["paths"]: path_templates.append(path) # also add paths from the the x-path-templates array - if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None: - for path in swagger['x-path-templates']: + if "x-path-templates" in swagger and swagger["x-path-templates"] is not None: + for path in swagger["x-path-templates"]: path_templates.append(path) # new endpoints will be added here so that they can be added as comments in the swagger file @@ -133,8 +155,7 @@ def main(): print("Compiling path " + path) print("Compiled to regex: " + path_to_regex(path)) re.compile(path_to_regex(path)) - path_template_regexes = [re.compile(path_to_regex(path)) - for path in path_templates] + path_template_regexes = [re.compile(path_to_regex(path)) for path in path_templates] try: for f in capture_reader.captured_requests(): @@ -159,20 +180,27 @@ def main(): continue path_template_to_set = path_templates[path_template_index] - set_key_if_not_exists( - swagger['paths'], path_template_to_set, {}) + set_key_if_not_exists(swagger["paths"], path_template_to_set, {}) - set_key_if_not_exists(swagger['paths'][path_template_to_set], method, { - 'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set), - 'responses': {} - }) + set_key_if_not_exists( + swagger["paths"][path_template_to_set], + method, + { + "summary": swagger_util.path_template_to_endpoint_name( + method, path_template_to_set + ), + "responses": {}, + }, + ) params = swagger_util.url_to_params(url, path_template_to_set) if params is not None and len(params) > 0: - set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) + set_key_if_not_exists( + swagger["paths"][path_template_to_set][method], "parameters", params + ) - if method not in ['get', 'head']: + if method not in ["get", "head"]: body = f.get_request_body() if body is not None: body_val = None @@ -180,7 +208,7 @@ def main(): # try to parse the body as json try: body_val = json.loads(f.get_request_body()) - content_type = 'application/json' + content_type = "application/json" except UnicodeDecodeError: pass except json.decoder.JSONDecodeError: @@ -188,32 +216,40 @@ def main(): if content_type is None: # try to parse the body as form data try: - body_val_bytes = dict(urllib.parse.parse_qsl(body, encoding='utf-8', keep_blank_values=True)) + body_val_bytes = dict( + urllib.parse.parse_qsl( + body, encoding="utf-8", keep_blank_values=True + ) + ) body_val = {} did_find_anything = False for key, value in body_val_bytes.items(): did_find_anything = True - body_val[key.decode('utf-8')] = value.decode('utf-8') + body_val[key.decode("utf-8")] = value.decode("utf-8") if did_find_anything: - content_type = 'application/x-www-form-urlencoded' + content_type = "application/x-www-form-urlencoded" else: body_val = None except UnicodeDecodeError: pass - + if body_val is not None: content_to_set = { - 'content': { + "content": { content_type: { - 'schema': swagger_util.value_to_schema(body_val) + "schema": swagger_util.value_to_schema(body_val) } } } if args.examples: - content_to_set['content'][content_type]['example'] = swagger_util.limit_example_size( - body_val) + content_to_set["content"][content_type][ + "example" + ] = swagger_util.limit_example_size(body_val) set_key_if_not_exists( - swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) + swagger["paths"][path_template_to_set][method], + "requestBody", + content_to_set, + ) # try parsing the response as json response_body = f.get_response_body() @@ -226,33 +262,45 @@ def main(): response_json = None if response_json is not None: resp_data_to_set = { - 'description': f.get_response_reason(), - 'content': { - 'application/json': { - 'schema': swagger_util.value_to_schema(response_json) + "description": f.get_response_reason(), + "content": { + "application/json": { + "schema": swagger_util.value_to_schema(response_json) } - } + }, } if args.examples: - resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( - response_json) - set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str( - status), resp_data_to_set) + resp_data_to_set["content"]["application/json"][ + "example" + ] = swagger_util.limit_example_size(response_json) + set_key_if_not_exists( + swagger["paths"][path_template_to_set][method]["responses"], + str(status), + resp_data_to_set, + ) except FlowReadException as e: print(f"Flow file corrupted: {e}") traceback.print_exception(*sys.exc_info()) - print(f"{console_util.ANSI_RED}Failed to parse the input file as '{capture_reader.name()}'. ") + print( + f"{console_util.ANSI_RED}Failed to parse the input file as '{capture_reader.name()}'. " + ) if not args.format: - print(f"It might happen that the input format as incorrectly detected. Please try using '--format flow' or '--format har' to specify the input format.{console_util.ANSI_RESET}") + print( + f"It might happen that the input format as incorrectly detected. Please try using '--format flow' or '--format har' to specify the input format.{console_util.ANSI_RESET}" + ) sys.exit(1) except ValueError as e: print(f"ValueError: {e}") # print stack trace traceback.print_exception(*sys.exc_info()) - print(f"{console_util.ANSI_RED}Failed to parse the input file as '{capture_reader.name()}'. ") + print( + f"{console_util.ANSI_RED}Failed to parse the input file as '{capture_reader.name()}'. " + ) if not args.format: - print(f"It might happen that the input format as incorrectly detected. Please try using '--format flow' or '--format har' to specify the input format.{console_util.ANSI_RESET}") + print( + f"It might happen that the input format as incorrectly detected. Please try using '--format flow' or '--format har' to specify the input format.{console_util.ANSI_RESET}" + ) sys.exit(1) new_path_templates.sort() @@ -262,50 +310,53 @@ def main(): new_path_templates_with_suggestions = [] for idx, path in enumerate(new_path_templates): # check if path contains number-only segments - segments = path.split('/') + segments = path.split("/") if any(segment.isdigit() for segment in segments): # replace digit segments with {id}, {id1}, {id2} etc new_segments = [] param_id = 0 for segment in segments: if segment.isdigit(): - param_name = 'id' + str(param_id) + param_name = "id" + str(param_id) if param_id == 0: - param_name = 'id' - new_segments.append('{' + param_name + '}') + param_name = "id" + new_segments.append("{" + param_name + "}") param_id += 1 else: new_segments.append(segment) - suggested_path = '/'.join(new_segments) + suggested_path = "/".join(new_segments) # prepend the suggested path to the new_path_templates list if suggested_path not in new_path_templates_with_suggestions: - new_path_templates_with_suggestions.append( - "ignore:" + suggested_path) + new_path_templates_with_suggestions.append("ignore:" + suggested_path) new_path_templates_with_suggestions.append("ignore:" + path) # remove the ending comments not to add them twice # append the contents of new_path_templates_with_suggestions to swagger['x-path-templates'] for path in new_path_templates_with_suggestions: - swagger['x-path-templates'].append(path) + swagger["x-path-templates"].append(path) # remove elements already generated - swagger['x-path-templates'] = [ - path for path in swagger['x-path-templates'] if path not in swagger['paths']] + swagger["x-path-templates"] = [ + path for path in swagger["x-path-templates"] if path not in swagger["paths"] + ] # remove duplicates while preserving order def f7(seq): seen = set() seen_add = seen.add return [x for x in seq if not (x in seen or seen_add(x))] - swagger['x-path-templates'] = f7(swagger['x-path-templates']) - swagger['x-path-templates'] = ruamel.yaml.comments.CommentedSeq( - swagger['x-path-templates']) - swagger['x-path-templates'].yaml_set_start_comment( - 'Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy') + swagger["x-path-templates"] = f7(swagger["x-path-templates"]) + + swagger["x-path-templates"] = ruamel.yaml.comments.CommentedSeq( + swagger["x-path-templates"] + ) + swagger["x-path-templates"].yaml_set_start_comment( + "Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy" + ) # save the swagger file - with open(args.output, 'w') as f: + with open(args.output, "w") as f: yaml.dump(swagger, f) print("Done!") diff --git a/mitmproxy2swagger/mitmproxy_capture_reader.py b/mitmproxy2swagger/mitmproxy_capture_reader.py index a356d45..6fb9c31 100644 --- a/mitmproxy2swagger/mitmproxy_capture_reader.py +++ b/mitmproxy2swagger/mitmproxy_capture_reader.py @@ -6,23 +6,23 @@ def mitmproxy_dump_file_huristic(file_path: str) -> int: val = 0 - if 'flow' in file_path: + if "flow" in file_path: val += 1 - if 'mitmproxy' in file_path: + if "mitmproxy" in file_path: val += 1 # read the first 2048 bytes - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: data = f.read(2048) # if file contains non-ascii characters - if data.decode('utf-8', 'ignore').isprintable() is False: + if data.decode("utf-8", "ignore").isprintable() is False: val += 50 # if first character of the byte array is a digit if str(data[0]).isdigit() is True: val += 5 # if it contains the word status_code - if b'status_code' in data: + if b"status_code" in data: val += 5 - if b'regular' in data: + if b"regular" in data: val += 10 return val @@ -72,7 +72,7 @@ def __init__(self, file_path, progress_callback=None): self.progress_callback = progress_callback def captured_requests(self) -> Iterator[MitmproxyFlowWrapper]: - with open(self.file_path, 'rb') as logfile: + with open(self.file_path, "rb") as logfile: logfile_size = os.path.getsize(self.file_path) freader = iom.FlowReader(logfile) try: @@ -81,10 +81,13 @@ def captured_requests(self) -> Iterator[MitmproxyFlowWrapper]: self.progress_callback(logfile.tell() / logfile_size) if isinstance(f, http.HTTPFlow): if f.response is None: - print("[warn] flow without response: {}".format(f.request.url)) + print( + "[warn] flow without response: {}".format(f.request.url) + ) continue yield MitmproxyFlowWrapper(f) except FlowReadException as e: print(f"Flow file corrupted: {e}") + def name(self): return "flow" diff --git a/mitmproxy2swagger/swagger_util.py b/mitmproxy2swagger/swagger_util.py index 55f5d1c..fef2d7d 100644 --- a/mitmproxy2swagger/swagger_util.py +++ b/mitmproxy2swagger/swagger_util.py @@ -1,32 +1,32 @@ import urllib VERBS = [ - 'add', - 'create', - 'delete', - 'get', - 'attach', - 'detach', - 'update', - 'push', - 'extendedcreate', - 'activate' + "add", + "create", + "delete", + "get", + "attach", + "detach", + "update", + "push", + "extendedcreate", + "activate", ] # generate a name for the endpoint from the path template # POST /api/v1/things/{id}/create -> POST create thing by id def path_template_to_endpoint_name(method, path_template): - path_template = path_template.strip('/') - segments = path_template.split('/') + path_template = path_template.strip("/") + segments = path_template.split("/") # remove params to a separate array params = [] for idx, segment in enumerate(segments): - if segment.startswith('{') and segment.endswith('}'): + if segment.startswith("{") and segment.endswith("}"): params.append(segment) - segments[idx] = '{}' + segments[idx] = "{}" # remove them from the segments - segments = [segment for segment in segments if segment != '{}'] + segments = [segment for segment in segments if segment != "{}"] # reverse the segments segments.reverse() name_parts = [] @@ -38,84 +38,71 @@ def path_template_to_endpoint_name(method, path_template): name_parts.insert(0, segment.lower()) break for param in params: - name_parts.append("by " + param.replace('{', '').replace('}', '')) + name_parts.append("by " + param.replace("{", "").replace("}", "")) break - return method.upper() + ' ' + ' '.join(name_parts) + return method.upper() + " " + " ".join(name_parts) # when given an url and its path template, generates the parameters section of the request def url_to_params(url, path_template): - path_template = path_template.strip('/') - segments = path_template.split('/') - url_segments = url.split('?')[0].strip('/').split('/') + path_template = path_template.strip("/") + segments = path_template.split("/") + url_segments = url.split("?")[0].strip("/").split("/") params = [] for idx, segment in enumerate(segments): - if segment.startswith('{') and segment.endswith('}'): - params.append({ - 'name': segment.replace('{', '').replace('}', ''), - 'in': 'path', - 'required': True, - 'schema': { - 'type': 'number' if url_segments[idx].isdigit() else 'string' + if segment.startswith("{") and segment.endswith("}"): + params.append( + { + "name": segment.replace("{", "").replace("}", ""), + "in": "path", + "required": True, + "schema": { + "type": "number" if url_segments[idx].isdigit() else "string" + }, } - }) + ) query_string = urllib.parse.urlparse(url).query if query_string: query_params = urllib.parse.parse_qs(query_string) for key in query_params: - params.append({ - 'name': key, - 'in': 'query', - 'required': False, - 'schema': { - 'type': 'number' if query_params[key][0].isdigit() else 'string' + params.append( + { + "name": key, + "in": "query", + "required": False, + "schema": { + "type": "number" if query_params[key][0].isdigit() else "string" + }, } - }) + ) return params def value_to_schema(value): # check if value is a number if type(value) == int or type(value) == float: - return { - 'type': 'number' - } + return {"type": "number"} # check if value is a boolean elif type(value) == bool: - return { - 'type': 'boolean' - } + return {"type": "boolean"} # check if value is a string elif type(value) == str: - return { - 'type': 'string' - } + return {"type": "string"} # check if value is a list elif type(value) == list: if len(value) == 0: - return { - 'type': 'array', - 'items': {} - } + return {"type": "array", "items": {}} - return { - 'type': 'array', - 'items': value_to_schema(value[0]) - } + return {"type": "array", "items": value_to_schema(value[0])} # check if value is a dict elif type(value) == dict: return { - 'type': 'object', - 'properties': { - key: value_to_schema(value[key]) - for key in value - } + "type": "object", + "properties": {key: value_to_schema(value[key]) for key in value}, } # if it is none, return null elif value is None: - return { - 'type': 'object' - } + return {"type": "object"} MAX_EXAMPLE_ARRAY_ELEMENTS = 10