99import datetime
1010import json
1111import logging
12+ from collections .abc import Iterable
1213from enum import StrEnum
1314from importlib import metadata as importlib_metadata
1415from typing import TypedDict
1516
17+ import sqlalchemy
18+ from packageurl import PackageURL
19+ from sqlalchemy .orm import Session
20+
21+ from macaron .database .database_manager import get_db_manager
22+ from macaron .database .table_definitions import ProvenanceSubject
23+ from macaron .util import JsonType
24+
1625logger : logging .Logger = logging .getLogger (__name__ )
1726
1827# Note: The lint error "N815:mixedCase variable in class scope" is disabled for
@@ -135,11 +144,42 @@ class VerificationResult(StrEnum):
135144 PASSED = "PASSED"
136145
137146
147+ def get_common_purl_from_artifact_purls (purl_strs : Iterable [str ]) -> str | None :
148+ """Get a single common PackageURL given some artifact PackageURLs.
149+
150+ Assumption: A package may have more than one artifact. If each artifact is identified
151+ by a PackageURL, these PackageURLs still share the type, namespace, name, and
152+ version values. The common PackageURL contains these values.
153+ """
154+ try :
155+ purls = [PackageURL .from_string (_ ) for _ in purl_strs ]
156+ except ValueError :
157+ return None
158+
159+ purl_type = purls [0 ].type
160+ namespace = purls [0 ].namespace
161+ name = purls [0 ].name
162+ version = purls [0 ].version
163+
164+ for purl in purls :
165+ if any (
166+ [
167+ purl_type != purl .type ,
168+ namespace != purl .namespace ,
169+ name != purl .name ,
170+ version != purl .version ,
171+ ]
172+ ):
173+ return None
174+
175+ common_purl = PackageURL (type = purl_type , namespace = namespace , name = name , version = version )
176+ return str (common_purl )
177+
178+
138179def create_vsa_statement (
139- subject_purl : str ,
180+ passed_components : dict [ str , int ] ,
140181 policy_content : str ,
141- verification_result : VerificationResult ,
142- ) -> VsaStatement :
182+ ) -> VsaStatement | None :
143183 """Construct the Statement layer of the VSA.
144184
145185 Parameters
@@ -157,13 +197,49 @@ def create_vsa_statement(
157197 VsaStatement
158198 A Statement layer of the VSA.
159199 """
200+ subjects = []
201+
202+ try :
203+ with Session (get_db_manager ().engine ) as session , session .begin ():
204+ for purl , component_id in passed_components .items ():
205+ try :
206+ provenance_subject = (
207+ session .execute (
208+ sqlalchemy .select (ProvenanceSubject ).where (ProvenanceSubject .component_id == component_id )
209+ )
210+ .scalars ()
211+ .one ()
212+ )
213+ sha256 = provenance_subject .sha256
214+ except sqlalchemy .orm .exc .NoResultFound :
215+ sha256 = None
216+ logger .debug ("No digest stored for software component '%s'." , purl )
217+ except sqlalchemy .orm .exc .MultipleResultsFound as e :
218+ logger .debug (
219+ "Unexpected database query result. "
220+ "Expected no more than one result when retrieving SHA256 of a provenance subject. "
221+ "Error: %s" ,
222+ e ,
223+ )
224+ continue
225+
226+ subject : dict [str , JsonType ] = {
227+ "uri" : purl ,
228+ }
229+ if sha256 :
230+ subject ["digest" ] = {
231+ "sha256" : sha256 ,
232+ }
233+
234+ subjects .append (subject )
235+
236+ except sqlalchemy .exc .SQLAlchemyError as error :
237+ logger .debug ("Cannot retrieve hash digest of software components: %s." , error )
238+ return None
239+
160240 return VsaStatement (
161241 _type = "https://in-toto.io/Statement/v1" ,
162- subject = [
163- {
164- "uri" : subject_purl ,
165- }
166- ],
242+ subject = subjects ,
167243 predicateType = "https://slsa.dev/verification_summary/v1" ,
168244 predicate = VsaPredicate (
169245 verifier = Verifier (
@@ -173,34 +249,33 @@ def create_vsa_statement(
173249 },
174250 ),
175251 timeVerified = datetime .datetime .now (tz = datetime .UTC ).isoformat (),
176- resourceUri = subject_purl ,
252+ resourceUri = get_common_purl_from_artifact_purls ( passed_components . keys ()) or "" ,
177253 policy = {
178254 "content" : policy_content ,
179255 },
180- verificationResult = verification_result ,
256+ verificationResult = VerificationResult . PASSED ,
181257 verifiedLevels = [],
182258 ),
183259 )
184260
185261
186- def get_subject_verification_result (policy_result : dict ) -> tuple [str , VerificationResult ] | None :
187- """Get the PURL (string) and verification result of the single software component the policy applies to .
262+ def get_components_passing_policy (policy_result : dict ) -> dict [str , int ] | None :
263+ """Get the verification result in the form of PURLs and component ids of software artifacts passing the policy.
188264
189265 This is currently done by reading the facts of two relations:
190266 ``component_violates_policy``, and ``component_satisfies_policy``
191267 from the result of the policy engine.
192268
193- We define two PURLs to be different if the two PURL strings are different.
269+ The result of this function depends on the policy engine result.
270+
271+ If there exist any software component failing the policy, this function returns ``None``.
194272
195- The result of this function depends on the policy engine result:
273+ When all software components in the result pass the policy, if there exist multiple occurrences
274+ of the same PURL, this function returns the latest occurrence, which is the one with the highest
275+ component id, taking advantage of component ids being auto-incremented.
196276
197- - If there exist multiple different PURLs, this function returns ``None``.
198- - If there exist multiple occurrences of the same PURL and it is the only unique
199- PURL in the policy engine result, this function returns the latest occurrence,
200- which is the PURL that goes with the highest component ID, taking advantage of
201- component IDs being auto-incremented.
202- - If there is no PURL in the result, i.e. the policy applies to no software component
203- in the database, this function also returns ``None``.
277+ If there is no PURL in the result, i.e. the policy applies to no software component in the database,
278+ this function also returns ``None``.
204279
205280 Parameters
206281 ----------
@@ -210,53 +285,39 @@ def get_subject_verification_result(policy_result: dict) -> tuple[str, Verificat
210285
211286 Returns
212287 -------
213- tuple[str, VerificationResult] | None
214- A pair of PURL and verification result of the only software component that
215- the policy applies to, or ``None`` according to the aforementioned conditions.
288+ dict[str, int] | None
289+ A dictionary of software components passing the policy, or ``None`` if there is any
290+ component failing the policy or if there is no software component in the policy engine result.
291+ Each key is a PackageURL of the software component, and each value is the corresponding
292+ component id of that component.
216293 """
217294 component_violates_policy_facts = policy_result .get ("component_violates_policy" , [])
218295 component_satisfies_policy_facts = policy_result .get ("component_satisfies_policy" , [])
219296
297+ if len (component_violates_policy_facts ) > 0 :
298+ logger .info ("Encountered software component failing the policy. No VSA is generated." )
299+ return None
300+
220301 # key: PURL; value: result with the highest component id
221- component_results : dict [str , tuple [ int , VerificationResult ] ] = {}
302+ passed_components : dict [str , int ] = {}
222303
223- for component_id_string , purl , _ in component_violates_policy_facts :
224- try :
225- component_id = int (component_id_string )
226- except ValueError :
227- logger .error ("Expected component id %s to be an integer." , component_id_string )
228- return None
229- if purl not in component_results :
230- component_results [purl ] = (component_id , VerificationResult .FAILED )
231- else :
232- current_component_id , _ = component_results [purl ]
233- if component_id > current_component_id :
234- component_results [purl ] = (component_id , VerificationResult .FAILED )
235304 for component_id_string , purl , _ in component_satisfies_policy_facts :
236305 try :
237306 component_id = int (component_id_string )
238307 except ValueError :
239308 logger .error ("Expected component id %s to be an integer." , component_id_string )
240309 return None
241- if purl not in component_results :
242- component_results [purl ] = ( component_id , VerificationResult . PASSED )
310+ if purl not in passed_components :
311+ passed_components [purl ] = component_id
243312 else :
244- current_component_id , _ = component_results [purl ]
313+ current_component_id = passed_components [purl ]
245314 if component_id > current_component_id :
246- component_results [purl ] = (component_id , VerificationResult .PASSED )
247-
248- if len (component_results ) != 1 :
249- if len (component_results ) == 0 :
250- logger .info ("The policy applies to no software components." )
251- if len (component_results ) > 1 :
252- logger .info ("The policy applies to more than one software components." )
253- logger .info ("No VSA will be generated." )
254- return None
315+ passed_components [purl ] = component_id
255316
256- subject_purl = next ( iter ( component_results . keys ()))
257- _ , verification_result = component_results [ subject_purl ]
317+ if len ( passed_components ) == 0 :
318+ return None
258319
259- return subject_purl , verification_result
320+ return passed_components
260321
261322
262323def generate_vsa (policy_content : str , policy_result : dict ) -> Vsa | None :
@@ -275,17 +336,14 @@ def generate_vsa(policy_content: str, policy_result: dict) -> Vsa | None:
275336 The VSA, or ``None`` if generating a VSA is not appropriate according
276337 to the policy engine result.
277338 """
278- subject_verification_result = get_subject_verification_result (policy_result )
339+ passed_components = get_components_passing_policy (policy_result )
279340
280- if subject_verification_result is None :
341+ if passed_components is None :
281342 return None
282343
283- subject_purl , verification_result = subject_verification_result
284-
285344 unencoded_payload = create_vsa_statement (
286- subject_purl = subject_purl ,
345+ passed_components ,
287346 policy_content = policy_content ,
288- verification_result = verification_result ,
289347 )
290348
291349 try :
0 commit comments