Skip to content

Commit f88c543

Browse files
committed
Add PySec Live V2 Importer Pipeline #1981
* Add PySec Live V2 Importer * Add tests for the PySec Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail <[email protected]>
1 parent dcb0511 commit f88c543

File tree

3 files changed

+270
-0
lines changed

3 files changed

+270
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
5858
from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2
5959
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
60+
from vulnerabilities.pipelines.v2_importers import pysec_live_importer as pysec_live_importer_v2
6061
from vulnerabilities.pipelines.v2_importers import redhat_importer as redhat_importer_v2
6162
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
6263
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
@@ -117,3 +118,9 @@
117118
oss_fuzz.OSSFuzzImporter,
118119
]
119120
)
121+
122+
# Registry of the "live" importer pipelines; at present it holds only the
# PySec live importer.
LIVE_IMPORTERS_REGISTRY = create_registry([pysec_live_importer_v2.PySecLiveImporterPipeline])
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
#
6+
7+
import json
import re
from io import BytesIO
from typing import Iterable
from zipfile import ZipFile

from packageurl import PackageURL
from univers.versions import PypiVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.pysec_importer import PyPIImporterPipeline
16+
17+
18+
class PySecLiveImporterPipeline(PyPIImporterPipeline):
    """
    PySec Live Importer Pipeline

    Collect advisories from OSV PyPI zip for a single PURL.

    The pipeline expects a ``purl`` input (a ``PackageURL`` or its string
    form) of type ``pypi`` with a version. Only ``PYSEC-*`` advisories whose
    ``affected`` entries name that package and cover that version are yielded.
    """

    pipeline_id = "pysec_live_importer_v2"
    supported_types = ["pypi"]

    @classmethod
    def steps(cls):
        """Return the ordered pipeline steps: validate input, fetch zip, collect and store."""
        return (
            cls.get_purl_inputs,
            cls.fetch_zip,
            cls.collect_and_store_advisories,
        )

    def get_purl_inputs(self):
        """
        Validate the ``purl`` pipeline input and store it as ``self.purl``.

        Raise ValueError when the input is missing or empty, is not (or cannot
        be parsed as) a PackageURL, has an unsupported type, or has no version.
        """
        # A missing "purl" key is reported the same way as an empty value,
        # rather than leaking a bare KeyError to the caller.
        try:
            purl = self.inputs["purl"]
        except KeyError:
            purl = None
        if not purl:
            raise ValueError("PURL is required for PySecLiveImporterPipeline")

        if isinstance(purl, str):
            purl = PackageURL.from_string(purl)

        if not isinstance(purl, PackageURL):
            raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

        if purl.type not in self.supported_types:
            raise ValueError(
                f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
            )

        if not purl.version:
            raise ValueError(f"PURL: {purl!s} is expected to have a version")

        self.purl = purl

    @staticmethod
    def _normalize_name(name):
        """Return ``name`` normalized per PEP 503 (lowercase, runs of ``-_.`` become ``-``)."""
        return re.sub(r"[-_.]+", "-", name or "").lower()

    @staticmethod
    def _parse_version(value):
        """Return ``value`` parsed as a PypiVersion, or None when it cannot be parsed."""
        try:
            return PypiVersion(value)
        except Exception:
            # Best effort on purpose: advisory data may carry non-PyPI version
            # strings, and one bad value must not abort the whole import.
            return None

    def _in_versions(self, target, versions):
        """Return True if ``target`` equals any entry of the explicit ``versions`` list."""
        for raw_version in versions:
            parsed = self._parse_version(raw_version)
            if parsed is not None and parsed == target:
                return True
        return False

    def _in_range(self, target, version_range):
        """
        Return True if ``target`` falls inside the OSV ``version_range``.

        Events are replayed in version order as described by the OSV schema:
        ``introduced`` switches the vulnerable state on, ``fixed`` (inclusive)
        and ``last_affected`` (exclusive) switch it off. A range holding any
        version that cannot be parsed is treated as not matching.
        """
        # GIT ranges carry commit hashes, not package versions.
        if version_range.get("type") == "GIT":
            return False

        vulnerable = False
        bounds = []
        for event in version_range.get("events") or []:
            for kind in ("introduced", "fixed", "last_affected"):
                if kind not in event:
                    continue
                raw_version = event[kind]
                if kind == "introduced" and raw_version == "0":
                    # "0" is the OSV marker for "from the very first version".
                    vulnerable = True
                    continue
                parsed = self._parse_version(raw_version)
                if parsed is None:
                    return False
                bounds.append((parsed, kind))

        # The sort is stable, so events at the same version keep their listed order.
        bounds.sort(key=lambda bound: bound[0])
        for bound_version, kind in bounds:
            if kind == "introduced":
                if target >= bound_version:
                    vulnerable = True
            elif kind == "fixed":
                if target >= bound_version:
                    vulnerable = False
            elif target > bound_version:  # last_affected
                vulnerable = False
        return vulnerable

    def _is_version_affected(self, advisory_dict, version, package_name=None):
        """
        Return True if ``version`` is affected according to ``advisory_dict``.

        ``advisory_dict`` is an OSV advisory mapping. When ``package_name`` is
        given, only ``affected`` entries for that package (names compared after
        PEP 503 normalization) are considered; otherwise every entry is
        considered. Return False when ``version`` cannot be parsed.
        """
        target = self._parse_version(version)
        if target is None:
            return False

        wanted_name = self._normalize_name(package_name) if package_name else None
        for entry in advisory_dict.get("affected") or []:
            if wanted_name is not None:
                entry_name = (entry.get("package") or {}).get("name")
                if self._normalize_name(entry_name) != wanted_name:
                    continue
            if self._in_versions(target, entry.get("versions") or []):
                return True
            if any(self._in_range(target, rng) for rng in entry.get("ranges") or []):
                return True
        return False

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield AdvisoryData for each ``PYSEC-*`` advisory in the fetched zip
        that affects ``self.purl`` (same package name, version in range).
        """
        # Imported at call time so the name is looked up on the osv module
        # for every run.
        from vulnerabilities.importers.osv import parse_advisory_data_v2

        with ZipFile(BytesIO(self.advisory_zip)) as zip_file:
            for file_name in zip_file.namelist():
                if not file_name.startswith("PYSEC-"):
                    continue

                # Read each member once; the same text is parsed and passed on.
                advisory_text = zip_file.read(file_name).decode("utf-8")
                advisory_dict = json.loads(advisory_text)

                if not self._is_version_affected(
                    advisory_dict,
                    self.purl.version,
                    package_name=self.purl.name,
                ):
                    continue

                yield parse_advisory_data_v2(
                    raw_data=advisory_dict,
                    supported_ecosystems=["pypi"],
                    advisory_url=self.url,
                    advisory_text=advisory_text,
                )
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import json
2+
from io import BytesIO
3+
from unittest.mock import patch
4+
from zipfile import ZipFile
5+
6+
import pytest
7+
from packageurl import PackageURL
8+
9+
from vulnerabilities.importer import AdvisoryData
10+
11+
12+
@pytest.fixture
def mock_zip_data():
    """
    Return a rewound BytesIO holding a zip of three advisories: two for
    package "foo" (different version ranges) and one for package "bar".
    """

    def build_advisory(advisory_id, summary, package_name, introduced, fixed):
        # Key order matches the hand-written originals so the JSON is identical.
        return {
            "advisory_id": advisory_id,
            "summary": summary,
            "affected": [
                {
                    "package": {"name": package_name, "ecosystem": "PyPI"},
                    "ranges": [
                        {
                            "type": "ECOSYSTEM",
                            "events": [{"introduced": introduced}, {"fixed": fixed}],
                        }
                    ],
                }
            ],
        }

    advisories = (
        build_advisory("PYSEC-1001", "Vuln in foo", "foo", "1.0.0", "2.0.0"),
        build_advisory("PYSEC-1002", "Vuln in foo, later version", "foo", "2.5.0", "3.0.0"),
        build_advisory("PYSEC-2000", "Vuln in bar", "bar", "0.1.0", "0.2.0"),
    )

    archive = BytesIO()
    with ZipFile(archive, mode="w") as zip_file:
        for advisory in advisories:
            zip_file.writestr(f"{advisory['advisory_id']}.json", json.dumps(advisory))
    archive.seek(0)
    return archive
67+
68+
69+
def test_package_with_version_affected(mock_zip_data):
    """foo@1.5.0 lies inside the 1.0.0..2.0.0 range, so only PYSEC-1001 is collected."""
    from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline

    def fake_parse(raw_data, supported_ecosystems, advisory_url, advisory_text):
        # Stand-in for the OSV parser: echo the id and summary of the raw advisory.
        return AdvisoryData(
            advisory_id=raw_data["advisory_id"],
            summary=raw_data["summary"],
            references_v2=[{"url": advisory_url}],
            affected_packages=[],
            weaknesses=[],
            url=advisory_url,
        )

    target_purl = PackageURL(type="pypi", name="foo", version="1.5.0")

    with patch("requests.get") as fake_get:
        fake_get.return_value.content = mock_zip_data.read()

        with patch(
            "vulnerabilities.importers.osv.parse_advisory_data_v2",
            side_effect=fake_parse,
        ):
            importer = PySecLiveImporterPipeline(purl=target_purl)
            importer.get_purl_inputs()
            importer.fetch_zip()
            collected = list(importer.collect_advisories())

    # Only PYSEC-1001 should match
    assert len(collected) == 1
    assert collected[0].advisory_id == "PYSEC-1001"
99+
100+
101+
def test_package_with_version_not_affected(mock_zip_data):
    """foo@2.2.0 falls between the two affected ranges, so nothing is collected."""
    from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline

    target_purl = PackageURL(type="pypi", name="foo", version="2.2.0")
    canned_advisory = AdvisoryData(
        advisory_id="PYSEC-1002",
        summary="Vuln in foo, later version",
        references_v2=[{"url": "dummy"}],
        affected_packages=[],
        weaknesses=[],
        url="dummy",
    )

    with patch("requests.get") as fake_get:
        fake_get.return_value.content = mock_zip_data.read()

        with patch(
            "vulnerabilities.importers.osv.parse_advisory_data_v2",
            return_value=canned_advisory,
        ):
            importer = PySecLiveImporterPipeline(purl=target_purl)
            importer.get_purl_inputs()
            importer.fetch_zip()
            collected = list(importer.collect_advisories())

    # No advisories should match
    assert len(collected) == 0
126+
127+
128+
def test_nonexistent_package(mock_zip_data):
    """A package absent from every advisory in the zip yields no advisories."""
    from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline

    target_purl = PackageURL(type="pypi", name="baz", version="1.0.0")

    with patch("requests.get") as fake_get:
        fake_get.return_value.content = mock_zip_data.read()

        importer = PySecLiveImporterPipeline(purl=target_purl)
        importer.get_purl_inputs()
        importer.fetch_zip()
        collected = list(importer.collect_advisories())

    assert len(collected) == 0

0 commit comments

Comments
 (0)