Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 74 additions & 20 deletions exifread_case/exifread_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import hashlib
import argparse
import logging
import typing
import uuid
import warnings

import case_utils.inherent_uuid
import case_utils.local_uuid
import exifread
import rdflib
Expand All @@ -23,8 +27,31 @@
ns_kb = rdflib.Namespace("http://example.org/kb/")


def get_node_iri(ns: rdflib.Namespace, prefix: str) -> rdflib.URIRef:
node_id = rdflib.URIRef(f"{prefix}{case_utils.local_uuid.demo_uuid()}", ns)
def get_node_iri(
ns: rdflib.Namespace,
prefix: str,
*args: typing.Any,
facet_class: typing.Optional[rdflib.URIRef] = None,
uco_object_node: typing.Optional[rdflib.URIRef] = None,
use_deterministic_uuids: bool = False,
**kwargs: typing.Any
) -> rdflib.URIRef:
node_id: typing.Optional[rdflib.URIRef] = None
if use_deterministic_uuids:
if uco_object_node is None:
warnings.warn("get_node_iri() called requesting deterministic UUIDs, but no UcoObject node was provided.")
else:
if uco_object_node is None:
warnings.warn("get_node_iri() called requesting deterministic UUIDs, but no Facet class node was provided.")
else:
_node_id = case_utils.inherent_id.get_facet_uriref(uco_object_node, facet_class, namespace=ns)
# Swap in the requested prefix value.
_node_uuid = str(_node_id)[-36:]
node_id = ns[f"{prefix}{_node_uuid}"]

if node_id is None:
node_id = ns[f"{prefix}{case_utils.local_uuid.demo_uuid()}"]

return node_id


Expand Down Expand Up @@ -72,17 +99,18 @@ def create_exif_dict(tags):
return exif


def n_cyber_object_to_node(graph):
def n_cyber_object_to_node(graph, *args: typing.Any, use_deterministic_uuids: bool = False, **kwargs: typing.Any):
"""
Initial function to create nodes for each of the file's facet nodes
:param graph: rdflib graph object for adding nodes to
:return: The four nodes for each fo the other functions to fill
"""
cyber_object = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="observableobject-"))
n_raster_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="rasterpicture-"))
n_file_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="filefacet-"))
n_content_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="contentfacet-"))
n_exif_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="exiffacet-"))

n_raster_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="rasterpicture-", facet_class=NS_UCO_OBSERVABLE.RasterPictureFacet, uco_object_node=cyber_object, use_deterministic_uuids=use_deterministic_uuids))
n_file_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="filefacet-", facet_class=NS_UCO_OBSERVABLE.FileFacet, uco_object_node=cyber_object, use_deterministic_uuids=use_deterministic_uuids))
n_content_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="contentfacet-", facet_class=NS_UCO_OBSERVABLE.ContentDataFacet, uco_object_node=cyber_object, use_deterministic_uuids=use_deterministic_uuids))
n_exif_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="exiffacet-", facet_class=NS_UCO_OBSERVABLE.EXIFFacet, uco_object_node=cyber_object, use_deterministic_uuids=use_deterministic_uuids))
graph.add((
cyber_object,
NS_RDF.type,
Expand Down Expand Up @@ -111,15 +139,14 @@ def n_cyber_object_to_node(graph):
return n_exif_facet, n_raster_facet, n_file_facet, n_content_facet


def filecontent_object_to_node(graph, n_content_facet, file_information):
def filecontent_object_to_node(graph, n_content_facet, file_information, *args: typing.Any, use_deterministic_uuids: bool = False, **kwargs: typing.Any):
"""
Unused: Create a node that will add the file content facet node to the graph
:param graph: rdflib graph object for adding nodes to
:param n_content_facet: Blank node to contain all content facet information
:param n_content_facet: Node to contain all content facet information
:param file_information: Dictionary containing information about file being analysed
:return: None
"""
file_hash_facet = rdflib.URIRef(get_node_iri(ns=ns_kb, prefix="hash-"))
graph.add((
n_content_facet,
NS_RDF.type,
Expand All @@ -143,16 +170,38 @@ def filecontent_object_to_node(graph, n_content_facet, file_information):
rdflib.term.Literal(file_information["size"],
datatype=NS_XSD.integer)
))
graph.add((
n_content_facet,
NS_UCO_OBSERVABLE.hash,
file_hash_facet
))
graph.add((
file_hash_facet,
NS_RDF.type,
NS_UCO_TYPES.Hash
))

if "SHA256" in file_information:
hash_method = rdflib.Literal("SHA256", datatype=NS_UCO_VOCABULARY.HashNameVocab)
hash_value = rdflib.Literal(file_information["SHA256"], datatype=NS_XSD.hexBinary)

file_hash: rdflib.URIRef
if use_deterministic_uuids:
file_hash_uuid: uuid.UUID = case_utils.inherent_uuid.hash_method_value_uuid(hash_method, hash_value, namespace=ns_kb)
file_hash = ns_kb["hash-" + str(file_hash_uuid)]
else:
file_hash = get_node_iri(ns=ns_kb, prefix="hash-")

graph.add((
n_content_facet,
NS_UCO_OBSERVABLE.hash,
file_hash
))
graph.add((
file_hash,
NS_RDF.type,
NS_UCO_TYPES.Hash
))
graph.add((
file_hash,
NS_UCO_TYPES.hashMethod,
hash_method
))
graph.add((
file_hash,
NS_UCO_TYPES.hashValue,
hash_value
))


def filefacets_object_to_node(graph, n_file_facet, file_information):
Expand Down Expand Up @@ -309,6 +358,11 @@ def main():
"""
parser = argparse.ArgumentParser()
parser.add_argument("file", help="file to extract exif data from")
parser.add_argument(
"--use-deterministic-uuids",
action="store_true",
help="Use UUIDs computed using the case_utils.inherent_uuid module.",
)
args = parser.parse_args()
local_file = args.file
file_info = get_file_info(local_file)
Expand Down