Skip to content

Commit

Permalink
Black everything
Browse files Browse the repository at this point in the history
  • Loading branch information
alufers committed Feb 18, 2023
1 parent e98bf13 commit 396bfd6
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 196 deletions.
16 changes: 11 additions & 5 deletions mitmproxy2swagger/console_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,38 @@ def rgb_interpolate(start, end, progress):
def rainbow_at_position(progress):
idx_a = int(progress * float(len(RAINBOW_COLORS) - 1))
idx_b = idx_a + 1
return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], progress * float(len(RAINBOW_COLORS) - 1) - idx_a)
return rgb_interpolate(
RAINBOW_COLORS[idx_a],
RAINBOW_COLORS[idx_b],
progress * float(len(RAINBOW_COLORS) - 1) - idx_a,
)


def print_progress_bar(progress=0.0):
sys.stdout.write("\r")
progress_bar_contents = ""
PROGRESS_LENGTH = 30
blocks = ['▉', '▊', '▋', '▌', '▍', '▎', '▏']
blocks = ["▉", "▊", "▋", "▌", "▍", "▎", "▏"]

for i in range(PROGRESS_LENGTH):
interpolated = rainbow_at_position(i / PROGRESS_LENGTH)
# check if should print a full block
if i < int(progress * PROGRESS_LENGTH):
interpolated_2nd_half = rainbow_at_position((i + 0.5) / PROGRESS_LENGTH)
interpolated_2nd_half = rainbow_at_position((i + 0.5) / PROGRESS_LENGTH)
progress_bar_contents += ANSI_RGB.format(*interpolated)
progress_bar_contents += ANSI_RGB_BG.format(*interpolated_2nd_half)
progress_bar_contents += "▌"
# check if should print a non-full block
elif i < int((progress * PROGRESS_LENGTH) + 0.5):
progress_bar_contents += ANSI_RESET
progress_bar_contents += ANSI_RGB.format(*interpolated)
progress_bar_contents += blocks[int((progress * PROGRESS_LENGTH) + 0.5) - i - 1]
progress_bar_contents += blocks[
int((progress * PROGRESS_LENGTH) + 0.5) - i - 1
]
# otherwise, print a space
else:
progress_bar_contents += ANSI_RESET
progress_bar_contents += ' '
progress_bar_contents += " "

progress_bar_contents += ANSI_RESET
sys.stdout.write("[{}] {:.1f}%".format(progress_bar_contents, progress * 100))
Expand Down
58 changes: 35 additions & 23 deletions mitmproxy2swagger/har_capture_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
def har_archive_heuristic(file_path: str) -> int:
val = 0
# if has the har extension
if file_path.endswith('.har'):
if file_path.endswith(".har"):
val += 15
# read the first 2048 bytes
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
data = f.read(2048)
# if file contains only ascii characters
if data.decode('utf-8', 'ignore').isprintable() is True:
if data.decode("utf-8", "ignore").isprintable() is True:
val += 25
# if first character is a '{'
if data[0] == '{':
if data[0] == "{":
val += 23
# if it contains the word '"WebInspector"'
if b'"WebInspector"' in data:
Expand All @@ -36,46 +36,57 @@ def __init__(self, flow: dict):
self.flow = flow

def get_url(self):
return self.flow['request']['url']
return self.flow["request"]["url"]

def get_method(self):
return self.flow['request']['method']
return self.flow["request"]["method"]

def get_request_headers(self):
headers = {}
for kv in self.flow['request']['headers']:
k = kv['name']
v = kv['value']
for kv in self.flow["request"]["headers"]:
k = kv["name"]
v = kv["value"]
# create list on key if it does not exist
headers[k] = headers.get(k, [])
headers[k].append(v)

def get_request_body(self):
if 'request' in self.flow and 'postData' in self.flow['request'] and 'text' in self.flow['request']['postData']:
return self.flow['request']['postData']['text']
if (
"request" in self.flow
and "postData" in self.flow["request"]
and "text" in self.flow["request"]["postData"]
):
return self.flow["request"]["postData"]["text"]
return None

def get_response_status_code(self):
return self.flow['response']['status']
return self.flow["response"]["status"]

def get_response_reason(self):
return self.flow['response']['statusText']
return self.flow["response"]["statusText"]

def get_response_headers(self):
headers = {}
for kv in self.flow['response']['headers']:
k = kv['name']
v = kv['value']
for kv in self.flow["response"]["headers"]:
k = kv["name"]
v = kv["value"]
# create list on key if it does not exist
headers[k] = headers.get(k, [])
headers[k].append(v)
return headers

def get_response_body(self):
if 'response' in self.flow and 'content' in self.flow['response'] and 'text' in self.flow['response']['content']:
if 'encoding' in self.flow['response']['content'] and self.flow['response']['content']['encoding'] == 'base64':
return b64decode(self.flow['response']['content']['text']).decode()
return self.flow['response']['content']['text']
if (
"response" in self.flow
and "content" in self.flow["response"]
and "text" in self.flow["response"]["content"]
):
if (
"encoding" in self.flow["response"]["content"]
and self.flow["response"]["content"]["encoding"] == "base64"
):
return b64decode(self.flow["response"]["content"]["text"]).decode()
return self.flow["response"]["content"]["text"]
return None


Expand All @@ -86,11 +97,12 @@ def __init__(self, file_path: str, progress_callback=None):

def captured_requests(self) -> Iterator[HarFlowWrapper]:
har_file_size = os.path.getsize(self.file_path)
with open(self.file_path, 'r', encoding='utf-8') as f:
with open(self.file_path, "r", encoding="utf-8") as f:
data = json_stream.load(f)
for entry in data['log']['entries'].persistent():
for entry in data["log"]["entries"].persistent():
if self.progress_callback:
self.progress_callback(f.tell() / har_file_size)
yield HarFlowWrapper(entry)

def name(self):
return 'har'
return "har"
Loading

0 comments on commit 396bfd6

Please sign in to comment.