Skip to content

Commit

Permalink
Extract jvm version from openjdk vulnerabilities group (#3)
Browse files Browse the repository at this point in the history
* extract affected from the ojvg site properly

* fix test

* delete unneeded code

* clean up prints

* reformat

* karianna comments

* forgot to save before last commit

* add comments and move version intersection to separate function
  • Loading branch information
Scanteianu authored Apr 25, 2024
1 parent c5b4ccc commit dce886d
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 44 deletions.
86 changes: 61 additions & 25 deletions cvereporter/fetch_vulnerabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def retrieve_cves_from_internet(date: str) -> str:
"Referer": "http://www.google.com/",
},
)
print(r)
except requests.exceptions.ReadTimeout:
return None
if r.status_code == 404:
Expand All @@ -64,6 +63,40 @@ def parse_to_cyclone(resp_text: str, date: str) -> list[Vulnerability]:
return dict_to_vulns(dicts)


def populate_column_headers(column_headers, header):
    """Append the table's column titles to *column_headers* in place.

    Nothing happens unless the text of *header* contains "CVE ID". In that
    case the text of *header* and of every following "th" sibling is
    appended, in document order.

    *header* is presumably a BeautifulSoup tag; only its ``text`` attribute
    and ``find_next_sibling`` method are used here.
    """
    if "CVE ID" not in header.text:
        return
    cell = header
    while cell is not None:
        column_headers.append(cell.text)
        # Walk right along the header row until there are no more cells.
        cell = cell.find_next_sibling("th")


def _candidate_major_versions(version):
    """Yield each major version number that can be read off the front of *version*.

    Candidates are the text before the first "." (e.g. "11.0.2"), the text
    before the first "u" (e.g. "8u202"), and the whole string when it is
    purely numeric (e.g. "12"). Candidates that are not valid integers are
    skipped instead of raising.
    """
    prefixes = [
        version.partition(separator)[0]
        for separator in (".", "u")
        if separator in version
    ]
    if version.isnumeric():
        prefixes.append(version)
    for prefix in prefixes:
        try:
            yield int(prefix)
        except ValueError:
            # Not a version token (e.g. "and 8u202", "unknown"): ignore it
            # rather than aborting the whole report.
            continue


def intersect_major_versions_with_extracted_affected(
    extracted_affected: list[str], affected_major_versions: list[int]
) -> list[str]:
    """Return the entries of *extracted_affected* whose major version is affected.

    *extracted_affected* is the top level list of versions affected by any
    cve in this OJVG email. *affected_major_versions* is the major java
    versions affected by one particular cve. This function figures out which
    minor versions belong to the affected major versions.

    This isn't a great heuristic (two cves might affect different minor
    versions of the same major version), but it's the best we can get from
    the OJVG email.

    Entries whose major version cannot be parsed are left out of the result.
    Input order is preserved.
    """
    return [
        version
        for version in extracted_affected
        if any(
            major in affected_major_versions
            for major in _candidate_major_versions(version)
        )
    ]


def parse_to_dict(resp_text: str, date: str) -> list[dict]:
if resp_text is None:
return None
Expand All @@ -79,9 +112,9 @@ def parse_to_dict(resp_text: str, date: str) -> list[dict]:
# find all the rows in the table
rows = table.find_all("tr")
dicts = []
column_headers = []
# fetch CVE data from first td in each row
for row in rows:

# find the versions in the first row
header = row.find("th")
versions = []
Expand All @@ -92,28 +125,27 @@ def parse_to_dict(resp_text: str, date: str) -> list[dict]:
while score.find_next_sibling("th") is not None:
versions.append(score.find_next_sibling("th").text)
score = score.find_next_sibling("th")
# extract table column headers
populate_column_headers(column_headers, header)
print(column_headers)

cve = row.find("td")
affected_major_versions = []
index = 0
for column in row.find_all("td"):
if column.text == "•":
affected_major_versions.append(int(column_headers[index]))
index += 1
if cve is not None:
id = cve.text
if cve.text == "None":
continue
link = cve.find("a")["href"]
componentsTD = cve.find_next_sibling("td")
component = componentsTD.text.replace("\n", "")
scoreTD = componentsTD.find_next_sibling("td")
score = scoreTD.text

versionCheck = scoreTD
affected_versions = []
affected_versions += (
extracted_affected # todo - maybe just the extracted ones
affected_versions = intersect_major_versions_with_extracted_affected(
extracted_affected, affected_major_versions
)
for version in versions:
versionCheck = versionCheck.find_next_sibling("td")
if versionCheck.text == "•":
affected_versions.append(int(version))

parsed_data = {}
parsed_data["id"] = id
parsed_data["url"] = link
Expand All @@ -130,10 +162,11 @@ def dict_to_vulns(dicts: list[dict]) -> list[Vulnerability]:
vulnerabilities = []
for parsed_data in dicts:
affects = BomTarget(ref=parsed_data["component"])
# for v in parsed_data["affected"]:
# todo: this is not actually true - the affected versions are just for the whole report
# we need to extract affected versions on a per cve basis, not a per ojvg report basis
# affects.versions.add(v)
for v in parsed_data["affected"]:
# todo: we assume the affected versions are the intersection of the dots on the grid
# and the list of all affected versions. This is not necessarily true: two cves may
# affect different minor versions within the same major version.
affects.versions.add(v)
vuln = Vulnerability(
id=parsed_data["id"],
source=VulnerabilitySource(
Expand All @@ -147,13 +180,21 @@ def dict_to_vulns(dicts: list[dict]) -> list[Vulnerability]:
)
vuln.affects.add(affects)
vulnerabilities.append(vuln)
# print(vuln)
return vulnerabilities


"""
We assume the text for the affected versions is in a block like:
"The following vulnerabilities in OpenJDK source code were fixed in this release.
The affected versions are 12, 11.0.2, 8u202, 7u211, and earlier.
We recommend that you upgrade as soon as possible."
"""


def extract_affected(header_string: str) -> list[str]:
header_string = header_string.replace("\r", "").replace("\n", " ")
# print(header_string)
affected = []
start_vulns = "The affected versions are "
end_vulns = "Please note that defense-in-depth issues"
Expand All @@ -163,13 +204,8 @@ def extract_affected(header_string: str) -> list[str]:
header_string.index(start_vulns)
+ len(start_vulns) : header_string.index(end_vulns)
]
# print(vulns_sub)
for ver in vulns_sub.split(","):
ver = ver.strip()
if "earlier" not in ver:
affected.append(ver)
# print(affected)
return affected


# fetch_cves('2023-01-17')
11 changes: 8 additions & 3 deletions cvereporter/nist_enhance.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ def enhance(vulns: list[Vulnerability]):
)
vuln.ratings.add(vr)
vuln.description = relevant["description"]
for affects in vuln.affects:
for ver in relevant["versions"]:
affects.versions.add(ver)
# For now we use the versions extracted when we download from the OpenJDK Vulnerability Group.
# The NIST version extraction below is tied to the Oracle JDKs, which might not map directly to OpenJDK versions.
# The OJVG approach also has limitations: we have to do a bit of guesswork mapping cves to versions.
extract_versions_from_nist = False
if extract_versions_from_nist:
for affects in vuln.affects:
for ver in relevant["versions"]:
affects.versions.add(ver)
# print(vuln)
3 changes: 1 addition & 2 deletions ojvg_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
It downloads all the vulnerability reports as html files to the `data` directory and saves the relevant data in `data/ojvg_summary.json`
"""
start_date = date(2019, 1, 1)
end_date = date(2024, 2, 4)
end_date = date.today()
current_date = start_date
responses = []
while current_date < end_date:

date_str = current_date.strftime("%Y-%m-%d")
print(date_str)
resp = fetch_vulnerabilities.fetch_dicts(date_str)
Expand Down
43 changes: 29 additions & 14 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
from cvereporter import fetch_vulnerabilities, nist_enhance
import json


# To run a single test: python3 -m pytest -v -k test_fetch -s (in this case, runs "test_fetch")
def test_fetch():
with open("tests/data/open_jvg_dump_2023-01-17.html","r") as data:
with open("tests/data/open_jvg_dump_2023-01-17.html", "r") as data:
vulns = fetch_vulnerabilities.parse_to_cyclone(data, "2023-01-17")
assert(len(vulns)==3)
#todo: do some better assertions on the actual vulnerability contents here
assert(vulns[0].id == "CVE-2023-21835")
assert(list(vulns[0].affects)[0].ref == "security-libs/javax.net.ssl")
assert(vulns[1].id == "CVE-2023-21830")
assert(list(vulns[1].affects)[0].ref == "other-libs")
assert(vulns[2].id == "CVE-2023-21843")
assert(list(vulns[2].affects)[0].ref == "client-libs/javax.sound")

print(vulns)
assert len(vulns) == 3
# todo: do some better assertions on the actual vulnerability contents here
assert vulns[0].id == "CVE-2023-21835"
assert list(vulns[0].affects)[0].ref == "security-libs/javax.net.ssl"
assert vulns[1].id == "CVE-2023-21830"
assert list(vulns[1].affects)[0].ref == "other-libs"
assert vulns[2].id == "CVE-2023-21843"
assert list(vulns[2].affects)[0].ref == "client-libs/javax.sound"
assert len(list(vulns[1].affects)[0].versions) == 2


def test_parse_to_dict():
    """parse_to_dict should report two affected versions for CVE-2023-21830."""
    with open("tests/data/open_jvg_dump_2023-01-17.html", "r") as data:
        vulns = fetch_vulnerabilities.parse_to_dict(data, "2023-01-17")
        print(vulns)
        matches = [cve for cve in vulns if cve["id"] == "CVE-2023-21830"]
        # Without this check the test would pass vacuously if the parser
        # dropped the CVE entirely.
        assert matches, "CVE-2023-21830 missing from parsed output"
        for cve in matches:
            assert len(cve["affected"]) == 2


def test_nist_parse():
with open("tests/data/nist_CVE-2023-21830.json", "r") as file_data:
nist_data = json.load(file_data)["data"]
relevant_parts = nist_enhance.extract_relevant_parts(nist_data)
rtg = relevant_parts["ratings"][0]
desc = relevant_parts["description"]
assert(rtg["source"] == '[email protected]')
assert(rtg["score"] == 5.3)
assert(rtg["severity"] == "MEDIUM")
assert(rtg["vector"] == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:L/A:N")
assert(len(relevant_parts["versions"])==4)
assert rtg["source"] == "[email protected]"
assert rtg["score"] == 5.3
assert rtg["severity"] == "MEDIUM"
assert rtg["vector"] == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:L/A:N"
assert len(relevant_parts["versions"]) == 4

0 comments on commit dce886d

Please sign in to comment.