Skip to content

Commit

Permalink
Additional configs for warnings about new domains
Browse files Browse the repository at this point in the history
  • Loading branch information
kam193 committed May 29, 2024
1 parent 298bef7 commit 2202369
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
2 changes: 1 addition & 1 deletion network-information/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11
14
50 changes: 38 additions & 12 deletions network-information/service/al_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,26 @@ def _handle_ip(self, ip_address: str) -> ResultTableSection:
section.add_subsection(subsection)
return section if section.subsections else None

def _get_tag_values(self, tag_template: str, selected_type: SelectedTagType) -> set[str]:
selected_tags = set()
def _get_tag_values(
self, tag_template: str, selected_type: SelectedTagType
) -> tuple[set[str], set[str]]:
# selected_tags = set()
static_tags = set()
dynamic_tags = set()
if selected_type in (SelectedTagType.STATIC, SelectedTagType.BOTH):
selected_tags.update(self._request.task.tags.get(tag_template.format("static")) or [])
static_tags.update(self._request.task.tags.get(tag_template.format("static")) or [])
if selected_type in (SelectedTagType.DYNAMIC, SelectedTagType.BOTH):
selected_tags.update(self._request.task.tags.get(tag_template.format("dynamic")) or [])
return selected_tags
dynamic_tags.update(self._request.task.tags.get(tag_template.format("dynamic")) or [])
return static_tags, dynamic_tags

def _handle_ips(self) -> ResultSection | None:
if not self._mmdb_enabled:
return None

ips = self._get_tag_values("network.{}.ip", self._request.get_param("ip_mmdb_lookup"))
static_ips, dynamic_ips = self._get_tag_values(
"network.{}.ip", self._request.get_param("ip_mmdb_lookup")
)
ips = static_ips | dynamic_ips
if not ips:
return None

Expand Down Expand Up @@ -180,7 +187,7 @@ def _call_cached_whois(self, domain: str) -> dict | None:
self.redis_client.set(cache_key, json.dumps(domain_info or {}), ex=self._cache_ttl)
return domain_info or None

def _handle_domain(self, domain: str) -> ResultTableSection:
def _handle_domain(self, domain: str, warn_new_domain: bool) -> ResultTableSection:
try:
domain_info = self._call_cached_whois(domain)
except Exception as exc:
Expand All @@ -197,7 +204,7 @@ def _handle_domain(self, domain: str) -> ResultTableSection:
value = "; ".join(value)
section.add_row(TableRow({"Information": key.title(), "Value": value}))

if self._warn_newer_than:
if self._warn_newer_than and warn_new_domain:
if created_at := domain_info.get("creation_date"):
if isinstance(created_at, list):
try:
Expand All @@ -223,24 +230,43 @@ def _handle_domains(self) -> ResultSection | None:
if not self._whois_enabled:
return None

domains = self._get_tag_values(
warn_new_domain = self._request.get_param("warn_new_domain")

static_domains, dynamic_domains = self._get_tag_values(
"network.{}.domain", self._request.get_param("domain_whois_lookup")
)
uris = self._get_tag_values("network.{}.uri", self._request.get_param("uri_whois_lookup"))
static_uris, dynamic_uris = self._get_tag_values(
"network.{}.uri", self._request.get_param("uri_whois_lookup")
)
domains = static_domains | dynamic_domains
uris = static_uris | dynamic_uris
if not domains and not uris:
return None

domains_to_check_creation = set()

top_domains = set()
for path in itertools.chain(domains, uris):
try:
parsed = tldextract.extract(path)
top_domains.add(f"{parsed.domain}.{parsed.suffix}")
domain = f"{parsed.domain}.{parsed.suffix}"
top_domains.add(domain)
if warn_new_domain == SelectedTagType.BOTH:
domains_to_check_creation.add(domain)
elif warn_new_domain == SelectedTagType.STATIC:
if path in static_domains or path in static_uris:
domains_to_check_creation.add(domain)
elif warn_new_domain == SelectedTagType.DYNAMIC:
if path in dynamic_domains or path in dynamic_uris:
domains_to_check_creation.add(domain)
except Exception as exc:
self.log.warning("Error parsing URI/Domain %s: %s", path, exc, exc_info=True)

main_section = ResultTextSection("Domain Information")
for domain in sorted(top_domains):
domain_section = self._handle_domain(domain)
domain_section = self._handle_domain(
domain, warn_new_domain=domain in domains_to_check_creation
)
if domain_section:
main_section.add_subsection(domain_section)

Expand Down
7 changes: 6 additions & 1 deletion network-information/service_manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ submission_params:
value: "both"
type: list
list: ["dynamic", "static", "both", "none"]
- name: warn_new_domain
default: "both"
value: "both"
type: list
list: ["dynamic", "static", "both", "none"]

# -1000: safe
# 0 - 299: informative
Expand Down Expand Up @@ -81,7 +86,7 @@ dependencies:
run_as_core: true
netinfo_cache:
container:
ram_mb: 256
ram_mb: 64
ram_mb_min: 32
cpu_cores: 0.5
allow_internet_access: false
Expand Down

0 comments on commit 2202369

Please sign in to comment.