def populate_column_headers(column_headers, header):
    """Collect the table's column titles, starting at the "CVE ID" cell.

    Nothing happens unless ``header`` is a cell whose text contains
    "CVE ID". In that case the text of ``header`` and of every following
    ``th`` sibling is appended, in order, to ``column_headers``.

    Args:
        column_headers: List that is extended in place with header texts.
        header: Header cell of a table row, or None when the row has no
            header cell. Presumably a BeautifulSoup tag (only ``.text`` and
            ``.find_next_sibling`` are used here) -- confirm against caller.

    Returns:
        None. The result is delivered through ``column_headers``.
    """
    # A row without a header cell has nothing to contribute; guard so the
    # attribute access below cannot fail on None.
    if header is None or "CVE ID" not in header.text:
        return
    cell = header
    while cell is not None:
        column_headers.append(cell.text)
        cell = cell.find_next_sibling("th")


def intersect_major_versions_with_extracted_affected(
    extracted_affected: list[str], affected_major_versions: list[int]
) -> list[str]:
    """Pick the reported versions that belong to the affected major releases.

    ``extracted_affected`` is the top-level list of versions affected by any
    CVE in an OJVG email. ``affected_major_versions`` holds the major Java
    versions affected by one particular CVE. This function figures out which
    of the reported versions belong to those major versions.

    This isn't a great heuristic (two CVEs might affect different minor
    versions of the same major version), but it's the best we can get from
    the OJVG email.

    Args:
        extracted_affected: Version strings such as "12", "11.0.2" or
            "8u202".
        affected_major_versions: Major version numbers, e.g. ``[8, 11]``.

    Returns:
        The entries of ``extracted_affected`` whose major version is listed
        in ``affected_major_versions``, in their original order. Entries
        whose major version cannot be determined are skipped rather than
        raising ``ValueError``.
    """
    # Build the lookup once instead of scanning the list for every version.
    majors = set(affected_major_versions)
    affected_versions = []
    for version in extracted_affected:
        # The major version is whatever precedes the first "." or "u":
        # "11.0.2" -> "11", "8u202" -> "8", "12" -> "12".
        major_text = version.partition(".")[0].partition("u")[0].strip()
        # isdecimal() guarantees int() cannot fail on the prefix.
        if major_text.isdecimal() and int(major_text) in majors:
            affected_versions.append(version)
    return affected_versions
if versionCheck.text == "•": - affected_versions.append(int(version)) - parsed_data = {} parsed_data["id"] = id parsed_data["url"] = link @@ -130,10 +162,11 @@ def dict_to_vulns(dicts: list[dict]) -> list[Vulnerability]: vulnerabilities = [] for parsed_data in dicts: affects = BomTarget(ref=parsed_data["component"]) - # for v in parsed_data["affected"]: - # todo: this is not actually true - the affected versions are just for the whole report - # we need to extract affected versions on a per cve basis, not a per ojvg report basis - # affects.versions.add(v) + for v in parsed_data["affected"]: + # todo: we assume that the affected versions are an intersection between the dots on the grid + # and the list of all affected versions. This may not necessarily be true, if there are multiple cves + # one that affects one minor version and another that affects another, within the same major version + affects.versions.add(v) vuln = Vulnerability( id=parsed_data["id"], source=VulnerabilitySource( @@ -147,13 +180,21 @@ def dict_to_vulns(dicts: list[dict]) -> list[Vulnerability]: ) vuln.affects.add(affects) vulnerabilities.append(vuln) - # print(vuln) return vulnerabilities +""" +We assume the text for the affected versions is in a block like: + +"The following vulnerabilities in OpenJDK source code were fixed in this release. +The affected versions are 12, 11.0.2, 8u202, 7u211, and earlier. +We recommend that you upgrade as soon as possible." 
+ +""" + + def extract_affected(header_string: str) -> list[str]: header_string = header_string.replace("\r", "").replace("\n", " ") - # print(header_string) affected = [] start_vulns = "The affected versions are " end_vulns = "Please note that defense-in-depth issues" @@ -163,13 +204,8 @@ def extract_affected(header_string: str) -> list[str]: header_string.index(start_vulns) + len(start_vulns) : header_string.index(end_vulns) ] - # print(vulns_sub) for ver in vulns_sub.split(","): ver = ver.strip() if "earlier" not in ver: affected.append(ver) - # print(affected) return affected - - -# fetch_cves('2023-01-17') diff --git a/cvereporter/nist_enhance.py b/cvereporter/nist_enhance.py index 5990a50..c283276 100644 --- a/cvereporter/nist_enhance.py +++ b/cvereporter/nist_enhance.py @@ -102,7 +102,12 @@ def enhance(vulns: list[Vulnerability]): ) vuln.ratings.add(vr) vuln.description = relevant["description"] - for affects in vuln.affects: - for ver in relevant["versions"]: - affects.versions.add(ver) + # for now - we use versions we extract when we download from OpenJDK Vulnerability group + # this version extraction is tied to the Oracle JDKs which might not map directly to openjdk versions + # that approach also has limitations: we have to do a bit of guesswork mapping cves to versions + extract_versions_from_nist = False + if extract_versions_from_nist: + for affects in vuln.affects: + for ver in relevant["versions"]: + affects.versions.add(ver) # print(vuln) diff --git a/ojvg_download.py b/ojvg_download.py index fd6558f..e9d5bce 100644 --- a/ojvg_download.py +++ b/ojvg_download.py @@ -7,11 +7,10 @@ It downloads all the vulnerability reports as html files to the `data` directory and saves the relevant data in `data/ojvg_summary.json` """ start_date = date(2019, 1, 1) -end_date = date(2024, 2, 4) +end_date = date.today() current_date = start_date responses = [] while current_date < end_date: - date_str = current_date.strftime("%Y-%m-%d") print(date_str) resp = 
# To run a single test: python3 -m pytest -v -k test_fetch -s (in this case, runs "test_fetch")
def test_fetch():
    """Parse the saved 2023-01-17 OJVG page and check the vulnerabilities."""
    with open("tests/data/open_jvg_dump_2023-01-17.html", "r") as data:
        vulns = fetch_vulnerabilities.parse_to_cyclone(data, "2023-01-17")

    # Visible with `-s`, and in pytest's captured output when a check fails.
    print(vulns)
    assert len(vulns) == 3
    # todo: do some better assertions on the actual vulnerability contents here
    assert vulns[0].id == "CVE-2023-21835"
    assert list(vulns[0].affects)[0].ref == "security-libs/javax.net.ssl"
    assert vulns[1].id == "CVE-2023-21830"
    assert list(vulns[1].affects)[0].ref == "other-libs"
    assert vulns[2].id == "CVE-2023-21843"
    assert list(vulns[2].affects)[0].ref == "client-libs/javax.sound"
    assert len(list(vulns[1].affects)[0].versions) == 2


def test_parse_to_dict():
    """Check the affected versions parsed for CVE-2023-21830."""
    with open("tests/data/open_jvg_dump_2023-01-17.html", "r") as data:
        vulns = fetch_vulnerabilities.parse_to_dict(data, "2023-01-17")

    print(vulns)
    matching = [cve for cve in vulns if cve["id"] == "CVE-2023-21830"]
    # Require the entry to exist: asserting only inside a loop would let the
    # test pass silently if the CVE were missing from the parsed output.
    assert len(matching) == 1
    assert len(matching[0]["affected"]) == 2
relevant_parts["ratings"][0] desc = relevant_parts["description"] - assert(rtg["source"] == 'secalert_us@oracle.com') - assert(rtg["score"] == 5.3) - assert(rtg["severity"] == "MEDIUM") - assert(rtg["vector"] == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:L/A:N") - assert(len(relevant_parts["versions"])==4) \ No newline at end of file + assert rtg["source"] == "secalert_us@oracle.com" + assert rtg["score"] == 5.3 + assert rtg["severity"] == "MEDIUM" + assert rtg["vector"] == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:L/A:N" + assert len(relevant_parts["versions"]) == 4