|
1 | | -# Copyright (c) 2022 - 2023, Oracle and/or its affiliates. All rights reserved. |
| 1 | +# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. |
2 | 2 | # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. |
3 | 3 |
|
4 | 4 | """This module contains the loaders for SLSA provenances.""" |
5 | 5 |
|
6 | 6 | import base64 |
| 7 | +import configparser |
7 | 8 | import gzip |
8 | 9 | import json |
9 | 10 | import zlib |
| 11 | +from urllib.parse import urlparse |
10 | 12 |
|
| 13 | +from macaron.config.defaults import defaults |
11 | 14 | from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, validate_intoto_payload |
12 | 15 | from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError, ValidateInTotoPayloadError |
13 | | -from macaron.util import JsonType |
| 16 | +from macaron.util import JsonType, send_get_http_raw |
14 | 17 |
|
15 | 18 |
|
16 | | -def load_provenance_file(filepath: str) -> dict[str, JsonType]: |
17 | | - """Load a provenance file and obtain the payload. |
| 19 | +def _try_read_url_link_file(file_content: bytes) -> str | None: |
| 20 | + parser = configparser.ConfigParser() |
| 21 | + try: |
| 22 | + parser.read_string(file_content.decode()) |
| 23 | + return parser.get("InternetShortcut", "url", fallback=None) |
| 24 | + except (configparser.Error, UnicodeDecodeError): |
| 25 | + return None |
18 | 26 |
|
19 | | - Inside a provenance file is a DSSE envelope containing a base64-encoded |
20 | | - provenance JSON payload. See: https://github.com/secure-systems-lab/dsse. |
21 | | - If the file is gzipped, it will be transparently decompressed. |
22 | 27 |
|
23 | | - Parameters |
24 | | - ---------- |
25 | | - filepath : str |
26 | | - Path to the provenance file. |
| 28 | +def _download_url_file_content(url: str, url_link_hostname_allowlist: list[str]) -> bytes: |
| 29 | + hostname = urlparse(url).hostname |
| 30 | + if hostname is None or hostname == "": |
| 31 | + raise LoadIntotoAttestationError("Cannot resolve URL link file: invalid URL") |
| 32 | + if hostname not in url_link_hostname_allowlist: |
| 33 | + raise LoadIntotoAttestationError( |
| 34 | + "Cannot resolve URL link file: target hostname '" + hostname + "' is not in allowed hostnames." |
| 35 | + ) |
27 | 36 |
|
28 | | - Returns |
29 | | - ------- |
30 | | - dict[str, JsonType] |
31 | | - The provenance JSON payload. |
| 37 | + # TODO download size limit? |
| 38 | + timeout = defaults.getint("downloads", "timeout", fallback=120) |
| 39 | + response = send_get_http_raw(url=url, headers=None, timeout=timeout) |
| 40 | + if response is None: |
| 41 | + raise LoadIntotoAttestationError("Cannot resolve URL link file: Failed to download file") |
| 42 | + if response.status_code != 200: |
| 43 | + raise LoadIntotoAttestationError( |
| 44 | + "Cannot resolve URL link file: Failed to download file, error " + str(response.status_code) |
| 45 | + ) |
| 46 | + return response.content |
| 47 | + |
| 48 | + |
| 49 | +def _load_provenance_file_content( |
| 50 | + file_content: bytes, url_link_hostname_allowlist: list[str], url_link_depth_limit: int = 5 |
| 51 | +) -> dict[str, JsonType]: |
| 52 | + url_link_depth = 0 |
| 53 | + while url_link_depth <= url_link_depth_limit: |
| 54 | + url = _try_read_url_link_file(file_content) |
| 55 | + if url is None: |
| 56 | + break |
| 57 | + if url_link_depth == url_link_depth_limit: |
| 58 | + raise LoadIntotoAttestationError("Cannot resolve URL link file: depth limit exceeded") |
| 59 | + file_content = _download_url_file_content(url, url_link_hostname_allowlist) |
| 60 | + url_link_depth = url_link_depth + 1 |
32 | 61 |
|
33 | | - Raises |
34 | | - ------ |
35 | | - LoadIntotoAttestationError |
36 | | - If there is an error loading the provenance JSON payload. |
37 | | - """ |
38 | 62 | try: |
39 | 63 | try: |
40 | | - with gzip.open(filepath, mode="rt", encoding="utf-8") as file: |
41 | | - provenance = json.load(file) |
| 64 | + decompressed_file_content = gzip.decompress(file_content) |
| 65 | + provenance = json.loads(decompressed_file_content.decode()) |
42 | 66 | except (gzip.BadGzipFile, EOFError, zlib.error): |
43 | | - with open(filepath, encoding="utf-8") as file: |
44 | | - provenance = json.load(file) |
45 | | - except (OSError, json.JSONDecodeError, TypeError) as error: |
| 67 | + provenance = json.loads(file_content.decode()) |
| 68 | + except (json.JSONDecodeError, TypeError, UnicodeDecodeError) as error: |
46 | 69 | raise LoadIntotoAttestationError( |
47 | 70 | "Cannot deserialize the file content as JSON.", |
48 | 71 | ) from error |
@@ -71,6 +94,41 @@ def load_provenance_file(filepath: str) -> dict[str, JsonType]: |
71 | 94 | return json_payload |
72 | 95 |
|
73 | 96 |
|
| 97 | +def load_provenance_file(filepath: str) -> dict[str, JsonType]: |
| 98 | + """Load a provenance file and obtain the payload. |
| 99 | +
|
| 100 | + Inside a provenance file is a DSSE envelope containing a base64-encoded |
| 101 | + provenance JSON payload. See: https://github.com/secure-systems-lab/dsse. |
| 102 | + If the file is gzipped, it will be transparently decompressed. |
| 103 | + If the file is a URL file (Windows .url file format, i.e. an ini file with |
| 104 | + a "URL" field inside an "InternetShortcut" section), it will be transparently |
| 105 | + downloaded. |
| 106 | +
|
| 107 | + Parameters |
| 108 | + ---------- |
| 109 | + filepath : str |
| 110 | + Path to the provenance file. |
| 111 | +
|
| 112 | + Returns |
| 113 | + ------- |
| 114 | + dict[str, JsonType] |
| 115 | + The provenance JSON payload. |
| 116 | +
|
| 117 | + Raises |
| 118 | + ------ |
| 119 | + LoadIntotoAttestationError |
| 120 | + If there is an error loading the provenance JSON payload. |
| 121 | + """ |
| 122 | + try: |
| 123 | + with open(filepath, mode="rb") as file: |
| 124 | + file_content = file.read() |
| 125 | + return _load_provenance_file_content( |
| 126 | + file_content, defaults.get_list("slsa.verifier", "url_link_hostname_allowlist", fallback=[]) |
| 127 | + ) |
| 128 | + except OSError as error: |
| 129 | + raise LoadIntotoAttestationError("Cannot open file.") from error |
| 130 | + |
| 131 | + |
74 | 132 | def load_provenance_payload(filepath: str) -> InTotoPayload: |
75 | 133 | """Load, verify, and construct an in-toto payload. |
76 | 134 |
|
|
0 commit comments