Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions jira/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,65 @@
from __future__ import annotations

import json
import os
import re
import tempfile
from typing import Any

from requests import Response


def _sanitize_headers(headers: dict[str, Any] | Any) -> dict[str, Any] | Any:
"""Mask sensitive headers."""
if not isinstance(headers, dict) and not hasattr(headers, "items"):
return headers

sensitive_headers = {
"authorization",
"cookie",
"set-cookie",
"x-atlassian-token",
"proxy-authorization",
}
sanitized = dict(headers)
for key in sanitized:
if key.lower() in sensitive_headers:
sanitized[key] = "********"
return sanitized


def _sanitize_body(body: str | Any) -> str | Any:
"""Mask sensitive information in the body (e.g. passwords in JSON)."""
if not isinstance(body, str):
return body

try:
data = json.loads(body)
if isinstance(data, dict):
sensitive_keys = {"password", "token", "secret", "access_token"}

def scrub(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key.lower() in sensitive_keys:
obj[key] = "********"
else:
scrub(value)
elif isinstance(obj, list):
for item in obj:
scrub(item)

scrub(data)
return json.dumps(data)
except (json.JSONDecodeError, TypeError):
# If it's not JSON, we can use some regex for common patterns if needed,
# but JSON is the most common for Jira APIs.
body = re.sub(r'("password"\s*:\s*")[^"]+(")', r'\1********\2', body, flags=re.I)
body = re.sub(r'(password=[^&\s]+)', r'password=********', body, flags=re.I)

return body


class JIRAError(Exception):
"""General error raised for all problems in operation of the client."""

Expand Down Expand Up @@ -46,16 +99,16 @@ def __str__(self) -> str:
details = ""
if self.request is not None:
if hasattr(self.request, "headers"):
details += f"\n\trequest headers = {self.request.headers}"
details += f"\n\trequest headers = {_sanitize_headers(self.request.headers)}"

if hasattr(self.request, "text"):
details += f"\n\trequest text = {self.request.text}"
details += f"\n\trequest text = {_sanitize_body(self.request.text)}"
if self.response is not None:
if hasattr(self.response, "headers"):
details += f"\n\tresponse headers = {self.response.headers}"
details += f"\n\tresponse headers = {_sanitize_headers(self.response.headers)}"

if hasattr(self.response, "text"):
details += f"\n\tresponse text = {self.response.text}"
details += f"\n\tresponse text = {_sanitize_body(self.response.text)}"

if self.log_to_tempfile:
# Only log to tempfile if the option is set.
Expand Down
Loading