Skip to content

Commit 759ab97

Browse files
committed
refactor(fake-email): replace dns_resolver with email-validator for email domain validation
Signed-off-by: Amine <[email protected]>
1 parent a7103e4 commit 759ab97

File tree

4 files changed

+107
-121
lines changed

4 files changed

+107
-121
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ dependencies = [
3838
"problog >= 2.2.6,<3.0.0",
3939
"cryptography >=44.0.0,<45.0.0",
4040
"semgrep == 1.113.0",
41-
"dnspython >=2.7.0,<3.0.0",
41+
"email_validator >=2.2.0",
4242
]
4343
keywords = []
4444
# https://pypi.org/classifiers/

src/macaron/malware_analyzer/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b
5858
- **Dependency**: None.
5959

6060
11. **Fake Email**
61-
- **Description**: Checks if the package maintainer or author has a suspicious or invalid email .
62-
- **Rule**: Return `HeuristicResult.FAIL` if the email format is invalid or the email domain has no MX records ; otherwise, return `HeuristicResult.PASS`.
61+
- **Description**: Checks if the package maintainer or author has a suspicious or invalid email.
62+
- **Rule**: Return `HeuristicResult.FAIL` if the email address is syntactically invalid or its domain fails the DNS deliverability check; otherwise, return `HeuristicResult.PASS`.
6363
- **Dependency**: None.
6464
### Source Code Analysis with Semgrep
6565
**PyPI Source Code Analyzer**

src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@
44
"""The heuristic analyzer to check the email address of the package maintainers."""
55

66
import logging
7-
import re
87

9-
import dns.resolver as dns_resolver
8+
from email_validator import EmailNotValidError, ValidatedEmail, validate_email
109

11-
from macaron.errors import HeuristicAnalyzerValueError
12-
from macaron.json_tools import JsonType
10+
from macaron.json_tools import JsonType, json_extract
1311
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
1412
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
1513
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
@@ -27,7 +25,7 @@ def __init__(self) -> None:
2725
depends_on=None,
2826
)
2927

30-
def is_valid_email(self, email: str) -> bool:
28+
def is_valid_email(self, email: str) -> ValidatedEmail | None:
3129
"""Check if the email format is valid and the domain has MX records.
3230
3331
Parameters
@@ -37,26 +35,21 @@ def is_valid_email(self, email: str) -> bool:
3735
3836
Returns
3937
-------
40-
bool:
41-
``True`` if the email address is valid, ``False`` otherwise.
38+
ValidatedEmail | None
39+
The validated email object if the email is valid, otherwise None.
4240
4341
Raises
4442
------
4543
HeuristicAnalyzerValueError
4644
if the failure is due to DNS resolution.
4745
"""
48-
if not re.match(r"[^@]+@[^@]+\.[^@]+", email):
49-
return False
50-
51-
domain = email.split("@")[1]
46+
emailinfo = None
5247
try:
53-
records = dns_resolver.resolve(domain, "MX")
54-
if not records:
55-
return False
56-
except Exception as err:
57-
err_message = f"Failed to resolve domain {domain}: {err}"
58-
raise HeuristicAnalyzerValueError(err_message) from err
59-
return True
48+
emailinfo = validate_email(email, check_deliverability=True)
49+
except EmailNotValidError as err:
50+
err_message = f"Invalid email address: {email}. Error: {err}"
51+
logger.warning(err_message)
52+
return emailinfo
6053

6154
def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
6255
"""Analyze the package.
@@ -76,16 +69,22 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
7669
HeuristicAnalyzerValueError
7770
if the analysis fails.
7871
"""
79-
data = pypi_package_json.package_json
80-
author_email = data.get("info", {}).get("author_email", None)
81-
maintainer_email = data.get("info", {}).get("maintainer_email", None)
82-
if maintainer_email is None and author_email is None:
83-
message = "No maintainers are available"
84-
return HeuristicResult.SKIP, {"message": message}
85-
86-
if author_email is not None and not self.is_valid_email(author_email):
87-
return HeuristicResult.FAIL, {"email": author_email}
88-
if maintainer_email is not None and not self.is_valid_email(maintainer_email):
89-
return HeuristicResult.FAIL, {"email": maintainer_email}
90-
91-
return HeuristicResult.PASS, {}
72+
package_json = pypi_package_json.package_json
73+
author_email = json_extract(package_json, ["info", "author_email"], str)
74+
maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)
75+
76+
if not author_email and not maintainer_email:
77+
return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}
78+
79+
validated_emails: list[JsonType] = []
80+
details = ["normalized", "local_part", "domain"]
81+
82+
for email in [author_email, maintainer_email]:
83+
if email:
84+
email_info = self.is_valid_email(email)
85+
if not email_info:
86+
return HeuristicResult.FAIL, {"email": email}
87+
88+
validated_emails.append({key: getattr(email_info, key) for key in details})
89+
90+
return HeuristicResult.PASS, {"validated_emails": validated_emails}

tests/malware_analyzer/pypi/test_fake_email.py

Lines changed: 74 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@
88
from unittest.mock import MagicMock, patch
99

1010
import pytest
11+
from email_validator import EmailNotValidError
1112

1213
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
1314
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
1415
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
1516

1617

1718
@pytest.fixture(name="analyzer")
18-
def analyzer_fixture() -> FakeEmailAnalyzer:
19+
def analyzer_() -> FakeEmailAnalyzer:
1920
"""Pytest fixture to create a FakeEmailAnalyzer instance."""
2021
return FakeEmailAnalyzer()
2122

@@ -24,132 +25,118 @@ def analyzer_fixture() -> FakeEmailAnalyzer:
2425
def pypi_package_json_asset_mock_fixture() -> MagicMock:
2526
"""Pytest fixture for a mock PyPIPackageJsonAsset."""
2627
mock_asset = MagicMock(spec=PyPIPackageJsonAsset)
27-
# Default to successful download, tests can override
28-
mock_asset.download = MagicMock(return_value=True)
29-
# package_json should be set by each test to simulate different PyPI responses
3028
mock_asset.package_json = {}
3129
return mock_asset
3230

3331

34-
@pytest.fixture(name="mock_dns_resolve")
35-
def mock_dns_resolve_fixture() -> Generator[MagicMock]:
36-
"""General purpose mock for dns.resolver.resolve.
32+
@pytest.fixture(name="mock_validate_email")
33+
def mock_validate_email_fixture() -> Generator[MagicMock]:
34+
"""Patch validate_email and mock its behavior."""
35+
with patch("macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.validate_email") as mock:
36+
yield mock
3737

38-
Patches where dns_resolver is imported in the module under test.
39-
"""
40-
with patch("macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.dns_resolver.resolve") as mock_resolve:
41-
# Default behavior: simulate successful MX record lookup.
42-
mock_mx_record = MagicMock()
43-
mock_mx_record.exchange = "mail.default-domain.com"
44-
mock_resolve.return_value = [mock_mx_record]
45-
yield mock_resolve
4638

47-
48-
# Tests for the analyze method
4939
def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
5040
"""Test the analyzer skips if no author_email or maintainer_email is present."""
5141
pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": None}}
5242
result, info = analyzer.analyze(pypi_package_json_asset_mock)
5343
assert result == HeuristicResult.SKIP
54-
assert info["message"] == "No maintainers are available"
44+
assert info["message"] == "No author or maintainer email available."
5545

5646

5747
def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
5848
"""Test the analyzer skips if 'info' key is missing in PyPI data."""
5949
pypi_package_json_asset_mock.package_json = {} # No 'info' key
6050
result, info = analyzer.analyze(pypi_package_json_asset_mock)
6151
assert result == HeuristicResult.SKIP
62-
assert info["message"] == "No maintainers are available"
52+
assert info["message"] == "No author or maintainer email available."
53+
6354

55+
def test_analyze_fail_invalid_email(
56+
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
57+
) -> None:
58+
"""Test analyzer fails for an invalid email format."""
59+
invalid_email = "invalid-email"
60+
pypi_package_json_asset_mock.package_json = {"info": {"author_email": invalid_email, "maintainer_email": None}}
61+
mock_validate_email.side_effect = EmailNotValidError("Invalid email.")
6462

65-
def test_analyze_fail_empty_author_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
66-
"""Test analyzer fails for empty author_email string (maintainer_email is None)."""
67-
pypi_package_json_asset_mock.package_json = {"info": {"author_email": "", "maintainer_email": None}}
6863
result, info = analyzer.analyze(pypi_package_json_asset_mock)
64+
6965
assert result == HeuristicResult.FAIL
70-
assert info["email"] == ""
66+
assert info == {"email": invalid_email}
67+
mock_validate_email.assert_called_once_with(invalid_email, check_deliverability=True)
7168

7269

7370
def test_analyze_pass_only_maintainer_email_valid(
74-
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
71+
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
7572
) -> None:
7673
"""Test analyzer passes when only maintainer_email is present and valid."""
77-
mock_mx_record = MagicMock()
78-
mock_mx_record.exchange = "mail.example.net"
79-
mock_dns_resolve.return_value = [mock_mx_record]
74+
75+
pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}}
76+
77+
mock_email_info = MagicMock()
78+
mock_email_info.normalized = "[email protected]"
79+
mock_email_info.local_part = "maintainer"
80+
mock_email_info.domain = "example.net"
81+
mock_validate_email.return_value = mock_email_info
8082

81-
pypi_package_json_asset_mock.package_json = {
82-
"info": {"author_email": None, "maintainer_email": "[email protected]"}
83-
}
8483
result, info = analyzer.analyze(pypi_package_json_asset_mock)
8584
assert result == HeuristicResult.PASS
86-
assert info == {}
87-
mock_dns_resolve.assert_called_once_with("example.net", "MX")
85+
assert info["validated_emails"] == [
86+
{"normalized": "[email protected]", "local_part": "maintainer", "domain": "example.net"}
87+
]
88+
mock_validate_email.assert_called_once_with(email, check_deliverability=True)
8889

8990

9091
def test_analyze_pass_both_emails_valid(
91-
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
92+
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
9293
) -> None:
9394
"""Test the analyzer passes when both emails are present and valid."""
9495

95-
def side_effect_dns_resolve(domain: str, record_type: str = "MX") -> list[MagicMock]:
96-
mock_mx = MagicMock()
97-
domains = {
98-
"MX": {"example.com", "example.net"},
99-
}
100-
if domain not in domains.get(record_type, set()):
101-
pytest.fail(f"Unexpected domain for DNS resolve: {domain}")
102-
mock_mx.exchange = f"mail.{domain}"
103-
return [mock_mx]
96+
def side_effect(email: str, check_deliverability: bool) -> MagicMock: # pylint: disable=unused-argument
97+
local_part, domain = email.split("@")
98+
mock_email_info = MagicMock()
99+
mock_email_info.normalized = email
100+
mock_email_info.local_part = local_part
101+
mock_email_info.domain = domain
102+
return mock_email_info
104103

105-
mock_dns_resolve.side_effect = side_effect_dns_resolve
104+
mock_validate_email.side_effect = side_effect
106105

107106
pypi_package_json_asset_mock.package_json = {
108107
"info": {"author_email": "[email protected]", "maintainer_email": "[email protected]"}
109108
}
110109
result, info = analyzer.analyze(pypi_package_json_asset_mock)
111110
assert result == HeuristicResult.PASS
112-
assert info == {}
113-
assert mock_dns_resolve.call_count == 2
114-
mock_dns_resolve.assert_any_call("example.com", "MX")
115-
mock_dns_resolve.assert_any_call("example.net", "MX")
116-
117-
118-
def test_analyze_fail_author_email_invalid_format(
119-
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
120-
) -> None:
121-
"""Test analyzer fails when author_email has an invalid format."""
122-
pypi_package_json_asset_mock.package_json = {
123-
"info": {"author_email": "bad_email_format", "maintainer_email": "[email protected]"}
124-
}
125-
result, info = analyzer.analyze(pypi_package_json_asset_mock)
126-
assert result == HeuristicResult.FAIL
127-
assert info["email"] == "bad_email_format"
128-
mock_dns_resolve.assert_not_called() # Regex check fails before DNS lookup
129-
130-
131-
# Tests for the is_valid_email method
132-
def test_is_valid_email_valid_email_with_mx(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
133-
"""Test is_valid_email returns True for a valid email with MX records."""
134-
mock_mx_record = MagicMock()
135-
mock_mx_record.exchange = "mail.example.com"
136-
mock_dns_resolve.return_value = [mock_mx_record]
137-
assert analyzer.is_valid_email("[email protected]") is True
138-
mock_dns_resolve.assert_called_once_with("example.com", "MX")
139-
140-
141-
def test_is_valid_email_invalid_format(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
142-
"""Test is_valid_email method with various invalid email formats."""
143-
assert not analyzer.is_valid_email("not_an_email")
144-
assert not analyzer.is_valid_email("test@")
145-
assert not analyzer.is_valid_email("@example.com")
146-
assert not analyzer.is_valid_email("test@example")
147-
assert not analyzer.is_valid_email("")
148-
mock_dns_resolve.assert_not_called()
149-
150-
151-
def test_is_valid_email_no_mx_records_returned(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
152-
"""Test is_valid_email returns False if DNS resolve returns no MX records."""
153-
mock_dns_resolve.return_value = [] # Simulate no MX records found
154-
assert analyzer.is_valid_email("[email protected]") is False
155-
mock_dns_resolve.assert_called_once_with("no-mx-domain.com", "MX")
111+
assert mock_validate_email.call_count == 2
112+
113+
validated_emails = info.get("validated_emails")
114+
assert isinstance(validated_emails, list)
115+
assert len(validated_emails) == 2
116+
assert {"normalized": "[email protected]", "local_part": "author", "domain": "example.com"} in validated_emails
117+
assert {
118+
"normalized": "[email protected]",
119+
"local_part": "maintainer",
120+
"domain": "example.net",
121+
} in validated_emails
122+
123+
124+
def test_is_valid_email_success(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
125+
"""Test is_valid_email returns the validation object on success."""
126+
mock_validated_email = MagicMock()
127+
mock_validated_email.normalized = "[email protected]"
128+
mock_validated_email.local_part = "test"
129+
mock_validated_email.domain = "example.com"
130+
131+
mock_validate_email.return_value = mock_validated_email
132+
result = analyzer.is_valid_email("[email protected]")
133+
assert result == mock_validated_email
134+
mock_validate_email.assert_called_once_with("[email protected]", check_deliverability=True)
135+
136+
137+
def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
138+
"""Test is_valid_email returns None on failure."""
139+
mock_validate_email.side_effect = EmailNotValidError("The email address is not valid.")
140+
result = analyzer.is_valid_email("invalid-email")
141+
assert result is None
142+
mock_validate_email.assert_called_once_with("invalid-email", check_deliverability=True)

0 commit comments

Comments
 (0)