From e506d8390d8789331a6a5c57112072121039af59 Mon Sep 17 00:00:00 2001 From: Nick Le Mouton Date: Sun, 27 Apr 2025 13:03:55 +1200 Subject: [PATCH 1/7] Convert unit tests to use pytest --- poetry.lock | 155 +++++++++- pyproject.toml | 7 +- tests/test_private_func.py | 227 ++++++-------- tests/test_pytmfunc.py | 612 ++++++++++++++++++------------------- 4 files changed, 531 insertions(+), 470 deletions(-) diff --git a/poetry.lock b/poetry.lock index a299b41c..ec18d6fb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,17 +1,152 @@ +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +groups = ["main"] +markers = "sys_platform == \"win32\"" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[[package]] +name = "exceptiongroup" +version = "1.2.2" +description = "Backport of PEP 654 (exception groups)" +optional = false +python-versions = ">=3.7" +groups = ["main"] +markers = "python_version < \"3.11\"" +files = [ + {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, + {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, +] + +[package.extras] +test = ["pytest (>=6)"] + +[[package]] +name = "iniconfig" +version = "2.1.0" +description = "brain-dead simple config-ini parsing" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760"}, + {file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"}, +] + +[[package]] +name = "packaging" +version = "25.0" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484"}, + {file = "packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f"}, +] + +[[package]] +name = "pluggy" +version = "1.5.0" +description = "plugin and hook calling mechanisms for python" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + [[package]] -category = "main" -description = "a pure Python Database Abstraction Layer (for python version 2.7 and 3.x)" name = "pydal" +version = "20200714.1" +description = "a pure Python Database Abstraction Layer (for python version 2.7 and 3.x)" optional = false python-versions = "*" -version = "20200714.1" +groups = ["main"] +files = [ + {file = "pydal-20200714.1.tar.gz", hash = "sha256:dd35b8ecb009099cce7efa72a40707d2e9bdcdf85924f30683a52d5172d1242f"}, +] -[metadata] -content-hash = "74d2d05a266a2c8725df1f70157ccff43acaed599da9ca304291ec35f8a993cd" -lock-version = "1.0" -python-versions = "^3.8" +[[package]] +name = "pytest" +version = "8.3.5" +description = "pytest: simple powerful testing with Python" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820"}, + {file = "pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845"}, +] -[metadata.files] -pydal = [ - {file = "pydal-20200714.1.tar.gz", hash = "sha256:dd35b8ecb009099cce7efa72a40707d2e9bdcdf85924f30683a52d5172d1242f"}, +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=1.5,<2" +tomli = {version = ">=1", markers = "python_version < \"3.11\""} + +[package.extras] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] + +[[package]] +name = "tomli" +version = "2.2.1" +description = "A lil' TOML parser" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version < \"3.11\"" +files = [ + {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, + {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, + {file = "tomli-2.2.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ece47d672db52ac607a3d9599a9d48dcb2f2f735c6c2d1f34130085bb12b112a"}, + {file = "tomli-2.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6972ca9c9cc9f0acaa56a8ca1ff51e7af152a9f87fb64623e31d5c83700080ee"}, + {file = "tomli-2.2.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c954d2250168d28797dd4e3ac5cf812a406cd5a92674ee4c8f123c889786aa8e"}, + {file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8dd28b3e155b80f4d54beb40a441d366adcfe740969820caf156c019fb5c7ec4"}, + {file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:e59e304978767a54663af13c07b3d1af22ddee3bb2fb0618ca1593e4f593a106"}, + {file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:33580bccab0338d00994d7f16f4c4ec25b776af3ffaac1ed74e0b3fc95e885a8"}, + {file = "tomli-2.2.1-cp311-cp311-win32.whl", hash = "sha256:465af0e0875402f1d226519c9904f37254b3045fc5084697cefb9bdde1ff99ff"}, + {file = "tomli-2.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:2d0f2fdd22b02c6d81637a3c95f8cd77f995846af7414c5c4b8d0545afa1bc4b"}, + {file = "tomli-2.2.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:4a8f6e44de52d5e6c657c9fe83b562f5f4256d8ebbfe4ff922c495620a7f6cea"}, + {file = "tomli-2.2.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8d57ca8095a641b8237d5b079147646153d22552f1c637fd3ba7f4b0b29167a8"}, + {file = "tomli-2.2.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4e340144ad7ae1533cb897d406382b4b6fede8890a03738ff1683af800d54192"}, + {file = "tomli-2.2.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:db2b95f9de79181805df90bedc5a5ab4c165e6ec3fe99f970d0e302f384ad222"}, + {file = "tomli-2.2.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:40741994320b232529c802f8bc86da4e1aa9f413db394617b9a256ae0f9a7f77"}, + {file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:400e720fe168c0f8521520190686ef8ef033fb19fc493da09779e592861b78c6"}, + {file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:02abe224de6ae62c19f090f68da4e27b10af2b93213d36cf44e6e1c5abd19fdd"}, + {file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:b82ebccc8c8a36f2094e969560a1b836758481f3dc360ce9a3277c65f374285e"}, + {file = "tomli-2.2.1-cp312-cp312-win32.whl", hash = "sha256:889f80ef92701b9dbb224e49ec87c645ce5df3fa2cc548664eb8a25e03127a98"}, + {file = "tomli-2.2.1-cp312-cp312-win_amd64.whl", hash = "sha256:7fc04e92e1d624a4a63c76474610238576942d6b8950a2d7f908a340494e67e4"}, + {file = "tomli-2.2.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f4039b9cbc3048b2416cc57ab3bda989a6fcf9b36cf8937f01a6e731b64f80d7"}, + {file = "tomli-2.2.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:286f0ca2ffeeb5b9bd4fcc8d6c330534323ec51b2f52da063b11c502da16f30c"}, + {file = "tomli-2.2.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a92ef1a44547e894e2a17d24e7557a5e85a9e1d0048b0b5e7541f76c5032cb13"}, + {file = "tomli-2.2.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9316dc65bed1684c9a98ee68759ceaed29d229e985297003e494aa825ebb0281"}, + {file = "tomli-2.2.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e85e99945e688e32d5a35c1ff38ed0b3f41f43fad8df0bdf79f72b2ba7bc5272"}, + {file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ac065718db92ca818f8d6141b5f66369833d4a80a9d74435a268c52bdfa73140"}, + {file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:d920f33822747519673ee656a4b6ac33e382eca9d331c87770faa3eef562aeb2"}, + {file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a198f10c4d1b1375d7687bc25294306e551bf1abfa4eace6650070a5c1ae2744"}, + {file = "tomli-2.2.1-cp313-cp313-win32.whl", hash = "sha256:d3f5614314d758649ab2ab3a62d4f2004c825922f9e370b29416484086b264ec"}, + {file = "tomli-2.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:a38aa0308e754b0e3c67e344754dff64999ff9b513e691d0e786265c93583c69"}, + {file = "tomli-2.2.1-py3-none-any.whl", hash = "sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc"}, + {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, ] + +[metadata] +lock-version = "2.1" +python-versions = "^3.8 || ^3.9 || ^3.10 || ^3.11" +content-hash = "bca85be723f8e0ddc479a7f7dbf75a0322a1ac05ed3b3db56789ec7a1e139eda" diff --git a/pyproject.toml b/pyproject.toml index eb209f8e..daf6bc22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,11 +6,12 @@ authors = ["pytm Team"] license = "MIT License" [tool.poetry.dependencies] -python = "^3.11 || ^3.10 || ^3.9 || ^3.8" +python = "^3.8 || ^3.9 || ^3.10 || ^3.11" pydal = "~20200714.1" +pytest = "^8.3.5" [tool.poetry.dev-dependencies] [build-system] -requires = ["poetry>=0.12"] -build-backend = "poetry.masonry.api" +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/tests/test_private_func.py b/tests/test_private_func.py index 4ce0816e..7f7b06e0 100644 --- a/tests/test_private_func.py +++ b/tests/test_private_func.py @@ -1,5 +1,5 @@ import random -import unittest +import pytest from pytm.pytm import ( TM, @@ -18,8 +18,7 @@ encode_threat_data, ) - -class TestUniqueNames(unittest.TestCase): +class TestUniqueNames: def test_duplicate_boundary_names_have_different_unique_names(self): random.seed(0) object_1 = Boundary("foo") @@ -28,66 +27,57 @@ def test_duplicate_boundary_names_have_different_unique_names(self): object_1_uniq_name = object_1._uniq_name() object_2_uniq_name = object_2._uniq_name() - self.assertNotEqual(object_1_uniq_name, object_2_uniq_name) - self.assertEqual(object_1_uniq_name, "boundary_foo_acf3059e70") - self.assertEqual(object_2_uniq_name, "boundary_foo_88f2d9c06f") - + assert object_1_uniq_name != object_2_uniq_name + assert object_1_uniq_name == "boundary_foo_acf3059e70" + assert object_2_uniq_name == "boundary_foo_88f2d9c06f" -class TestAttributes(unittest.TestCase): +class TestAttributes: def test_write_once(self): user = Actor("User") - with self.assertRaises(ValueError): + with pytest.raises(ValueError): user.name = "Computer" def test_kwargs(self): user = Actor("User", isAdmin=True) - self.assertEqual(user.isAdmin, True) + assert user.isAdmin is True user = Actor("User") - self.assertEqual(user.isAdmin, False) + assert user.isAdmin is False user.isAdmin = True - self.assertEqual(user.isAdmin, True) + assert user.isAdmin is True def test_load_threats(self): tm = TM("TM") - self.assertNotEqual(len(TM._threats), 0) - with self.assertRaises(UIError): + assert len(TM._threats) != 0 + with pytest.raises(UIError): tm.threatsFile = "threats.json" - - with self.assertRaises(UIError): + with pytest.raises(UIError): TM("TM", threatsFile="threats.json") def test_responses(self): tm = TM("my test tm", description="aa", isOrdered=True) - user = Actor("User") web = Server("Web Server") db = Datastore("SQL Database") - http_req = Dataflow(user, web, "http req") insert = Dataflow(web, db, "insert data") query = Dataflow(web, db, "query") query_resp = Dataflow(db, web, "query results", responseTo=query) http_resp = Dataflow(web, user, "http resp") http_resp.responseTo = http_req - - self.assertTrue(tm.check()) - - self.assertEqual(http_req.response, http_resp) - self.assertIs(http_resp.isResponse, True) - - self.assertIs(query_resp.isResponse, True) - self.assertEqual(query_resp.responseTo, query) - self.assertEqual(query.response, query_resp) - - self.assertIsNone(insert.response) - self.assertIs(insert.isResponse, False) + assert tm.check() + assert http_req.response == http_resp + assert http_resp.isResponse is True + assert query_resp.isResponse is True + assert query_resp.responseTo == query + assert query.response == query_resp + assert insert.response is None + assert insert.isResponse is False def test_defaults(self): tm = TM("TM") user_data = Data("HTTP") user = Actor("User", data=user_data) - user.controls.authenticatesDestination=True - + user.controls.authenticatesDestination = True json_data = Data("JSON") server = Server( "Server", port=443, protocol="HTTPS", isEncrypted=True, data=json_data @@ -99,10 +89,9 @@ def test_defaults(self): protocol="PostgreSQL", data=sql_resp, ) - db.controls.isEncrypted=False + db.controls.isEncrypted = False db.type = DatastoreType.SQL worker = Process("Task queue worker") - req_get_data = Data("HTTP GET") req_get = Dataflow(user, server, "HTTP GET", data=req_get_data) server_query_data = Data("SQL") @@ -111,108 +100,82 @@ def test_defaults(self): result = Dataflow(db, server, "Results", data=result_data, isResponse=True) resp_get_data = Data("HTTP Response") resp_get = Dataflow(server, user, "HTTP Response", data=resp_get_data, isResponse=True) - test_assumption = Assumption("test assumption") resp_get.assumptions = [test_assumption] - req_post_data = Data("JSON") req_post = Dataflow(user, server, "HTTP POST", data=req_post_data) resp_post = Dataflow(server, user, "HTTP Response", isResponse=True) test_assumption_exclude = Assumption("test assumption", exclude=["ABCD", "BCDE"]) resp_post.assumptions = [test_assumption_exclude] - sql_data = Data("SQL") worker_query = Dataflow(worker, db, "Query", data=sql_data) Dataflow(db, worker, "Results", isResponse=True) - cookie = Data("Auth Cookie", carriedBy=[req_get, req_post]) - - self.assertTrue(tm.check()) - - self.assertEqual(req_get.srcPort, -1) - self.assertEqual(req_get.dstPort, server.port) - self.assertEqual(req_get.controls.isEncrypted, server.controls.isEncrypted) - self.assertEqual( - req_get.controls.authenticatesDestination, user.controls.authenticatesDestination - ) - self.assertEqual(req_get.protocol, server.protocol) - self.assertTrue(user.data.issubset(req_get.data)) - - self.assertEqual(server_query.srcPort, -1) - self.assertEqual(server_query.dstPort, db.port) - self.assertEqual(server_query.controls.isEncrypted, db.controls.isEncrypted) - self.assertEqual( - server_query.controls.authenticatesDestination, server.controls.authenticatesDestination - ) - self.assertEqual(server_query.protocol, db.protocol) - self.assertTrue(server.data.issubset(server_query.data)) - - self.assertEqual(result.srcPort, db.port) - self.assertEqual(result.dstPort, -1) - self.assertEqual(result.controls.isEncrypted, db.controls.isEncrypted) - self.assertEqual(result.controls.authenticatesDestination, False) - self.assertEqual(result.protocol, db.protocol) - self.assertTrue(db.data.issubset(result.data)) - self.assertListEqual(db.assumptions, []) - - self.assertEqual(resp_get.srcPort, server.port) - self.assertEqual(resp_get.dstPort, -1) - self.assertEqual(resp_get.controls.isEncrypted, server.controls.isEncrypted) - self.assertEqual(resp_get.controls.authenticatesDestination, False) - self.assertEqual(resp_get.protocol, server.protocol) - self.assertTrue(server.data.issubset(resp_get.data)) - self.assertListEqual(resp_get.assumptions, [test_assumption]) - - self.assertEqual(req_post.srcPort, -1) - self.assertEqual(req_post.dstPort, server.port) - self.assertEqual(req_post.controls.isEncrypted, server.controls.isEncrypted) - self.assertEqual( - req_post.controls.authenticatesDestination, user.controls.authenticatesDestination - ) - self.assertEqual(req_post.protocol, server.protocol) - self.assertTrue(user.data.issubset(req_post.data)) - - self.assertEqual(resp_post.srcPort, server.port) - self.assertEqual(resp_post.dstPort, -1) - self.assertEqual(resp_post.controls.isEncrypted, server.controls.isEncrypted) - self.assertEqual(resp_post.controls.authenticatesDestination, False) - self.assertEqual(resp_post.protocol, server.protocol) - self.assertTrue(server.data.issubset(resp_post.data)) - self.assertListEqual(resp_post.assumptions, [test_assumption_exclude]) - self.assertSetEqual(resp_post.assumptions[0].exclude, test_assumption_exclude.exclude) - - self.assertListEqual(server.inputs, [req_get, req_post]) - self.assertListEqual(server.outputs, [server_query]) - self.assertListEqual(worker.inputs, []) - self.assertListEqual(worker.outputs, [worker_query]) - - self.assertListEqual(cookie.carriedBy, [req_get, req_post]) - self.assertSetEqual(set(cookie.processedBy), set([user, server])) - self.assertIn(cookie, req_get.data) - self.assertSetEqual( - set([d.name for d in req_post.data]), set([cookie.name, "HTTP", "JSON"]) - ) - - -class TestMethod(unittest.TestCase): + assert tm.check() + assert req_get.srcPort == -1 + assert req_get.dstPort == server.port + assert req_get.controls.isEncrypted == server.controls.isEncrypted + assert req_get.controls.authenticatesDestination == user.controls.authenticatesDestination + assert req_get.protocol == server.protocol + assert user.data.issubset(req_get.data) + assert server_query.srcPort == -1 + assert server_query.dstPort == db.port + assert server_query.controls.isEncrypted == db.controls.isEncrypted + assert server_query.controls.authenticatesDestination == server.controls.authenticatesDestination + assert server_query.protocol == db.protocol + assert server.data.issubset(server_query.data) + assert result.srcPort == db.port + assert result.dstPort == -1 + assert result.controls.isEncrypted == db.controls.isEncrypted + assert result.controls.authenticatesDestination is False + assert result.protocol == db.protocol + assert db.data.issubset(result.data) + assert db.assumptions == [] + assert resp_get.srcPort == server.port + assert resp_get.dstPort == -1 + assert resp_get.controls.isEncrypted == server.controls.isEncrypted + assert resp_get.controls.authenticatesDestination is False + assert resp_get.protocol == server.protocol + assert server.data.issubset(resp_get.data) + assert resp_get.assumptions == [test_assumption] + assert req_post.srcPort == -1 + assert req_post.dstPort == server.port + assert req_post.controls.isEncrypted == server.controls.isEncrypted + assert req_post.controls.authenticatesDestination == user.controls.authenticatesDestination + assert req_post.protocol == server.protocol + assert user.data.issubset(req_post.data) + assert resp_post.srcPort == server.port + assert resp_post.dstPort == -1 + assert resp_post.controls.isEncrypted == server.controls.isEncrypted + assert resp_post.controls.authenticatesDestination is False + assert resp_post.protocol == server.protocol + assert server.data.issubset(resp_post.data) + assert resp_post.assumptions == [test_assumption_exclude] + assert resp_post.assumptions[0].exclude == set(test_assumption_exclude.exclude) + assert server.inputs == [req_get, req_post] + assert server.outputs == [server_query] + assert worker.inputs == [] + assert worker.outputs == [worker_query] + assert cookie.carriedBy == [req_get, req_post] + assert set(cookie.processedBy) == set([user, server]) + assert cookie in req_get.data + assert set([d.name for d in req_post.data]) == set([cookie.name, "HTTP", "JSON"]) + +class TestMethod: def test_defaults(self): tm = TM("my test tm", description="aa", isOrdered=True) - internet = Boundary("Internet") cloud = Boundary("Cloud") - user = Actor("User", inBoundary=internet) server = Server("Server") db = Datastore("DB", inBoundary=cloud) db.type = DatastoreType.SQL func = Datastore("Lambda function", inBoundary=cloud) - request = Dataflow(user, server, "request") response = Dataflow(server, user, "response", isResponse=True) user_query = Dataflow(user, db, "user query") server_query = Dataflow(server, db, "server query") func_query = Dataflow(func, db, "func query") - default_target = ["Actor", "Boundary", "Dataflow", "Datastore", "Server"] testCases = [ {"target": server, "condition": "target.oneOf(Server, Datastore)"}, @@ -235,21 +198,12 @@ def test_defaults(self): "for f in target.outputs)", }, ] - - self.assertTrue(tm.check()) - + assert tm.check() for case in testCases: t = Threat(SID="", target=default_target, condition=case["condition"]) - self.assertTrue( - t.apply(case["target"]), - "Failed to match {} against {}".format( - case["target"], - case["condition"], - ), - ) + assert t.apply(case["target"]), f"Failed to match {case['target']} against {case['condition']}" - -class TestFunction(unittest.TestCase): +class TestFunction: def test_encode_threat_data(self): findings = [ Finding( @@ -271,17 +225,16 @@ def test_encode_threat_data(self): ) ] encoded_findings = encode_threat_data(findings) - - self.assertEqual(len(encoded_findings), 2) - self.assertEqual(encoded_findings[0].description, "A test description") - self.assertEqual(encoded_findings[0].severity, "High") - self.assertEqual(encoded_findings[0].id, "1") - self.assertEqual(encoded_findings[0].threat_id, "INP01") - self.assertEqual(encoded_findings[0].cvss, "9.876") - self.assertEqual(encoded_findings[0].response, "A test response") - self.assertEqual(encoded_findings[1].description, "An escape test <script>") - self.assertEqual(encoded_findings[1].severity, "Medium") - self.assertEqual(encoded_findings[1].id, "2") - self.assertEqual(encoded_findings[1].threat_id, "INP02") - self.assertEqual(encoded_findings[1].cvss, "1.234") - self.assertEqual(encoded_findings[1].response, "A test response") + assert len(encoded_findings) == 2 + assert encoded_findings[0].description == "A test description" + assert encoded_findings[0].severity == "High" + assert encoded_findings[0].id == "1" + assert encoded_findings[0].threat_id == "INP01" + assert encoded_findings[0].cvss == "9.876" + assert encoded_findings[0].response == "A test response" + assert encoded_findings[1].description == "An escape test <script>" + assert encoded_findings[1].severity == "Medium" + assert encoded_findings[1].id == "2" + assert encoded_findings[1].threat_id == "INP02" + assert encoded_findings[1].cvss == "1.234" + assert encoded_findings[1].response == "A test response" diff --git a/tests/test_pytmfunc.py b/tests/test_pytmfunc.py index f94d042c..e052d97b 100644 --- a/tests/test_pytmfunc.py +++ b/tests/test_pytmfunc.py @@ -2,8 +2,8 @@ import os import random import re -import unittest import tempfile +import pytest from contextlib import redirect_stdout from pytm import ( @@ -36,9 +36,9 @@ ) as threat_file: threats = {t["SID"]: Threat(**t) for t in json.load(threat_file)} -output_path=tempfile.gettempdir() +output_path = tempfile.gettempdir() -class TestTM(unittest.TestCase): +class TestTM: def test_seq(self): random.seed(0) dir_path = os.path.dirname(os.path.realpath(__file__)) @@ -59,11 +59,9 @@ def test_seq(self): Dataflow(db, web, "Retrieve comments") Dataflow(web, user, "Show comments (*)") - self.assertTrue(tm.check()) + assert tm.check() output = tm.seq() - - self.maxDiff = None - self.assertEqual(output, expected) + assert output == expected def test_seq_unused(self): random.seed(0) @@ -85,11 +83,9 @@ def test_seq_unused(self): Dataflow(db, web, "Retrieve comments") Dataflow(web, user, "Show comments (*)") - self.assertTrue(tm.check()) + assert tm.check() output = tm.seq() - - self.maxDiff = None - self.assertEqual(output, expected) + assert output == expected def test_dfd(self): dir_path = os.path.dirname(os.path.realpath(__file__)) @@ -121,11 +117,10 @@ def test_dfd(self): Dataflow(web, gw, "Response") Dataflow(gw, user, "Show comments (*)") - self.assertTrue(tm.check()) + assert tm.check() output = tm.dfd() - self.maxDiff = None - self.assertEqual(output, expected) + assert output == expected def test_dfd_colormap(self): dir_path = os.path.dirname(os.path.realpath(__file__)) @@ -157,12 +152,11 @@ def test_dfd_colormap(self): Dataflow(web, gw, "Response") Dataflow(gw, user, "Show comments (*)") - self.assertTrue(tm.check()) + assert tm.check() tm.resolve() output = tm.dfd(colormap=True) - self.maxDiff = None - self.assertEqual(output, expected) + assert output == expected def test_dfd_duplicates_ignore(self): dir_path = os.path.dirname(os.path.realpath(__file__)) @@ -194,11 +188,10 @@ def test_dfd_duplicates_ignore(self): Dataflow(web, gw, "Response") Dataflow(gw, user, "Show comments (*)") - self.assertTrue(tm.check()) + assert tm.check() output = tm.dfd() - self.maxDiff = None - self.assertEqual(output, expected) + assert output == expected def test_dfd_duplicates_raise(self): random.seed(0) @@ -223,7 +216,7 @@ def test_dfd_duplicates_raise(self): "and Server(Web Server): Dataflow(User enters comments (*)) " "is same as Dataflow(User views comments)" ) - with self.assertRaisesRegex(ValueError, e): + with pytest.raises(ValueError, match=e): tm.check() def test_exclude_threats_ignore(self): @@ -240,13 +233,13 @@ def test_exclude_threats_ignore(self): web = Server("Web") web.sanitizesInput = False web.encodesOutput = False - self.assertTrue(threats[excluded_threat].apply(web)) - self.assertTrue(threats[remaining_threat].apply(web)) + assert threats[excluded_threat].apply(web) + assert threats[remaining_threat].apply(web) tm.resolve() - self.assertNotIn(excluded_threat, [t.threat_id for t in tm.findings]) - self.assertIn(remaining_threat, [t.threat_id for t in tm.findings]) + assert excluded_threat not in [t.threat_id for t in tm.findings] + assert remaining_threat in [t.threat_id for t in tm.findings] def test_resolve(self): random.seed(0) @@ -270,18 +263,21 @@ def test_resolve(self): ] tm.resolve() - self.maxDiff = None - self.assertEqual( - [f.threat_id for f in tm.findings], - ["Server", "Datastore", "Dataflow", "Dataflow", "Dataflow", "Dataflow"], - ) - self.assertEqual([f.threat_id for f in user.findings], []) - self.assertEqual([f.threat_id for f in web.findings], ["Server"]) - self.assertEqual([f.threat_id for f in db.findings], ["Datastore"]) - self.assertEqual([f.threat_id for f in req.findings], ["Dataflow"]) - self.assertEqual([f.threat_id for f in query.findings], ["Dataflow"]) - self.assertEqual([f.threat_id for f in results.findings], ["Dataflow"]) - self.assertEqual([f.threat_id for f in resp.findings], ["Dataflow"]) + assert [f.threat_id for f in tm.findings] == [ + "Server", + "Datastore", + "Dataflow", + "Dataflow", + "Dataflow", + "Dataflow", + ] + assert [f.threat_id for f in user.findings] == [] + assert [f.threat_id for f in web.findings] == ["Server"] + assert [f.threat_id for f in db.findings] == ["Datastore"] + assert [f.threat_id for f in req.findings] == ["Dataflow"] + assert [f.threat_id for f in query.findings] == ["Dataflow"] + assert [f.threat_id for f in results.findings] == ["Dataflow"] + assert [f.threat_id for f in resp.findings] == ["Dataflow"] def test_overrides(self): random.seed(0) @@ -324,26 +320,13 @@ def test_overrides(self): ] tm.resolve() - self.maxDiff = None - self.assertEqual( - [f.threat_id for f in tm.findings], - ["Server", "Datastore"], - ) - self.assertEqual( - [f.response for f in web.findings], ["mitigated by adding TLS"] - ) - self.assertEqual( - [f.cvss for f in web.findings], - ["1.234"], - ) - self.assertEqual( - [f.response for f in db.findings], - ["accepted since inside the trust boundary"], - ) - self.assertEqual( - [f.cvss for f in db.findings], - ["9.876"], - ) + assert [f.threat_id for f in tm.findings] == ["Server", "Datastore"] + assert [f.response for f in web.findings] == ["mitigated by adding TLS"] + assert [f.cvss for f in web.findings] == ["1.234"] + assert [f.response for f in db.findings] == [ + "accepted since inside the trust boundary" + ] + assert [f.cvss for f in db.findings] == ["9.876"] def test_json_dumps(self): random.seed(0) @@ -375,14 +358,13 @@ def test_json_dumps(self): Dataflow(web, user, "Show comments (*)") Dataflow(worker, db, "Query for tasks") - self.assertTrue(tm.check()) + assert tm.check() output = json.dumps(tm, default=to_serializable, sort_keys=True, indent=4) with open(os.path.join(output_path, "output_current.json"), "w") as x: x.write(output) - self.maxDiff = None - self.assertEqual(output, expected) + assert output == expected def test_json_loads(self): random.seed(0) @@ -392,27 +374,21 @@ def test_json_loads(self): TM.reset() tm = loads(contents) - self.assertTrue(tm.check()) - - self.maxDiff = None - self.assertEqual([b.name for b in tm._boundaries], ["Internet", "Server/DB"]) - self.assertEqual( - [e.name for e in tm._elements], - [ - "Internet", - "Server/DB", - "User", - "Web Server", - "SQL Database", - "Request", - "Insert", - "Select", - "Response", - ], - ) - self.assertEqual( - [f.name for f in tm._flows], ["Request", "Insert", "Select", "Response"] - ) + assert tm.check() + + assert [b.name for b in tm._boundaries] == ["Internet", "Server/DB"] + assert [e.name for e in tm._elements] == [ + "Internet", + "Server/DB", + "User", + "Web Server", + "SQL Database", + "Request", + "Insert", + "Select", + "Response", + ] + assert [f.name for f in tm._flows] == ["Request", "Insert", "Select", "Response"] def test_report(self): random.seed(0) @@ -445,14 +421,13 @@ def test_report(self): Dataflow(web, user, "Show comments (*)") Dataflow(worker, db, "Query for tasks") - self.assertTrue(tm.check()) + assert tm.check() output = tm.report("docs/basic_template.md") with open(os.path.join(output_path, "output_current.md"), "w") as x: x.write(output) - self.maxDiff = None - self.assertEqual(output.strip(), expected.strip()) + assert output.strip() == expected.strip() def test_multilevel_dfd(self): random.seed(0) @@ -481,12 +456,11 @@ def test_multilevel_dfd(self): Dataflow(db, web, "Retrieve comments") Dataflow(web, user, "Show comments (*)") - self.assertTrue(tm.check()) + assert tm.check() output = tm.dfd(levels={0}) with open(os.path.join(output_path, "0.txt"), "w") as x: x.write(output) - self.maxDiff = None - self.assertEqual(output, level_0) + assert output == level_0 TM.reset() tm = TM("my test tm", description="aaa") @@ -501,12 +475,11 @@ def test_multilevel_dfd(self): Dataflow(db, web, "Retrieve comments") Dataflow(web, user, "Show comments (*)") - self.assertTrue(tm.check()) + assert tm.check() output = tm.dfd(levels={1}) with open(os.path.join(output_path, "1.txt"), "w") as x: x.write(output) - self.maxDiff = None - self.assertEqual(output, level_1) + assert output == level_1 def test_element_assumptions(self): web = Server("Web Server") @@ -514,14 +487,14 @@ def test_element_assumptions(self): assumption2 = Assumption("Assumption 2", exclude=["INP03"]) web.assumptions = [assumption1, assumption2] - self.assertEqual(len(web.assumptions), 2) - self.assertEqual(web.assumptions[0].name, "Assumption 1") - self.assertSetEqual(web.assumptions[0].exclude, {"INP01", "INP02"}) - self.assertEqual(web.assumptions[1].name, "Assumption 2") - self.assertSetEqual(web.assumptions[1].exclude, {"INP03"}) + assert len(web.assumptions) == 2 + assert web.assumptions[0].name == "Assumption 1" + assert web.assumptions[0].exclude == {"INP01", "INP02"} + assert web.assumptions[1].name == "Assumption 2" + assert web.assumptions[1].exclude == {"INP03"} # Test adding an invalid assumption - with self.assertRaises(ValueError): + with pytest.raises(ValueError): web.assumptions = [assumption1, "Invalid Assumption"] def test_exclude_threats_by_assumptions(self): @@ -535,11 +508,11 @@ def test_exclude_threats_by_assumptions(self): tm = TM("Test TM") tm.resolve() - self.assertNotIn("INP03", [f.threat_id for f in web.findings]) - self.assertIn("INP03", [f.threat_id for f in tm.excluded_findings]) + assert "INP03" not in [f.threat_id for f in web.findings] + assert "INP03" in [f.threat_id for f in tm.excluded_findings] -class Testpytm(unittest.TestCase): +class Testpytm: # Test for all the threats in threats.py - test Threat.apply() function def test_INP01(self): @@ -552,21 +525,21 @@ def test_INP01(self): process1.controls.sanitizesInput = False process1.controls.checksInputBounds = False threat = threats["INP01"] - self.assertTrue(threat.apply(lambda1)) - self.assertTrue(threat.apply(process1)) + assert threat.apply(lambda1) + assert threat.apply(process1) def test_INP02(self): process1 = Process("myprocess") process1.controls.checksInputBounds = False threat = threats["INP02"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP03(self): web = Server("Web") web.controls.sanitizesInput = False web.controls.encodesOutput = False threat = threats["INP03"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_CR01(self): user = Actor("User") @@ -579,8 +552,8 @@ def test_CR01(self): user_to_web.usesVPN = False user_to_web.usesSessionTokens = True threat = threats["CR01"] - self.assertTrue(threat.apply(web)) - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(web) + assert threat.apply(user_to_web) def test_INP04(self): web = Server("Web Server") @@ -588,7 +561,7 @@ def test_INP04(self): web.controls.validatesHeaders = False web.protocol = "HTTP" threat = threats["INP04"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_CR02(self): user = Actor("User") @@ -603,14 +576,14 @@ def test_CR02(self): user_to_web.controls.validatesInput = False user_to_web.usesSessionTokens = True threat = threats["CR02"] - self.assertTrue(threat.apply(web)) - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(web) + assert threat.apply(user_to_web) def test_INP05(self): web = Server("Web Server") web.controls.validatesInput = False threat = threats["INP05"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP06(self): web = Server("Web Server") @@ -618,7 +591,7 @@ def test_INP06(self): web.controls.sanitizesInput = False web.controls.validatesInput = False threat = threats["INP06"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_SC01(self): process1 = Process("Process1") @@ -626,20 +599,20 @@ def test_SC01(self): json = Data(name="JSON", description="some JSON data", format="JSON") process1.data = json threat = threats["SC01"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_LB01(self): process1 = Process("Process1") + lambda1 = Lambda("Lambda1") process1.implementsAPI = True process1.controls.validatesInput = False process1.controls.sanitizesInput = False - lambda1 = Lambda("Lambda1") lambda1.implementsAPI = True lambda1.controls.validatesInput = False lambda1.controls.sanitizesInput = False threat = threats["LB01"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_AA01(self): process1 = Process("Process1") @@ -647,8 +620,8 @@ def test_AA01(self): process1.authenticatesSource = False web.authenticatesSource = False threat = threats["AA01"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(web)) + assert threat.apply(process1) + assert threat.apply(web) def test_DS01(self): web = Server("Web Server") @@ -656,78 +629,77 @@ def test_DS01(self): web.controls.validatesInput = False web.controls.encodesOutput = False threat = threats["DS01"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_DE01(self): + # Default case + user = Actor("User") + web = Server("Web Server") + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.protocol = "HTTP" + threat = threats["DE01"] + assert threat.apply(user_to_web) - with self.subTest("Default case"): - user = Actor("User") - web = Server("Web Server") - user_to_web = Dataflow(user, web, "User enters comments (*)") - user_to_web.protocol = "HTTP" - threat = threats["DE01"] - self.assertTrue(threat.apply(user_to_web)) - - with self.subTest("Success case"): - user = Actor("User") - web = Server("Web Server") - web.minTLSVersion = TLSVersion.TLSv12 - user_to_web = Dataflow(user, web, "User enters comments (*)") - user_to_web.tlsVersion = TLSVersion.TLSv13 - user_to_web.controls.isEncrypted = True - user_to_web.controls.authenticatesDestination = True - user_to_web.controls.checksDestinationRevocation = True - threat = threats["DE01"] - self.assertFalse(threat.apply(user_to_web)) - - with self.subTest("Dataflow TLS below minimum version"): - user = Actor("User") - web = Server("Web Server") - web.minTLSVersion = TLSVersion.TLSv12 - user_to_web = Dataflow(user, web, "User enters comments (*)") - user_to_web.tlsVersion = TLSVersion.TLSv11 - user_to_web.controls.isEncrypted = True - user_to_web.controls.authenticatesDestination = True - user_to_web.controls.checksDestinationRevocation = True - threat = threats["DE01"] - self.assertTrue(threat.apply(user_to_web)) - - with self.subTest("Dataflow doesn't authenticate destination"): - user = Actor("User") - web = Server("Web Server") - web.minTLSVersion = TLSVersion.TLSv12 - user_to_web = Dataflow(user, web, "User enters comments (*)") - user_to_web.tlsVersion = TLSVersion.TLSv13 - user_to_web.controls.isEncrypted = True - user_to_web.controls.authenticatesDestination = False - user_to_web.controls.checksDestinationRevocation = True - threat = threats["DE01"] - self.assertTrue(threat.apply(user_to_web)) - - with self.subTest("Dataflow doesn't check destination revocation"): - user = Actor("User") - web = Server("Web Server") - web.minTLSVersion = TLSVersion.TLSv12 - user_to_web = Dataflow(user, web, "User enters comments (*)") - user_to_web.tlsVersion = TLSVersion.TLSv13 - user_to_web.controls.isEncrypted = True - user_to_web.controls.authenticatesDestination = True - user_to_web.controls.checksDestinationRevocation = False - threat = threats["DE01"] - self.assertTrue(threat.apply(user_to_web)) - - with self.subTest("Dataflow is response"): - user = Actor("User") - web = Server("Web Server") - web.minTLSVersion = TLSVersion.TLSv12 - user_to_web = Dataflow(user, web, "User enters comments (*)") - user_to_web.isResponse = True - user_to_web.tlsVersion = TLSVersion.TLSv13 - user_to_web.controls.isEncrypted = True - user_to_web.controls.authenticatesDestination = False - user_to_web.controls.checksDestinationRevocation = False - threat = threats["DE01"] - self.assertFalse(threat.apply(user_to_web)) + # Success case + user = Actor("User") + web = Server("Web Server") + web.minTLSVersion = TLSVersion.TLSv12 + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.tlsVersion = TLSVersion.TLSv13 + user_to_web.controls.isEncrypted = True + user_to_web.controls.authenticatesDestination = True + user_to_web.controls.checksDestinationRevocation = True + threat = threats["DE01"] + assert not threat.apply(user_to_web) + + # Dataflow TLS below minimum version + user = Actor("User") + web = Server("Web Server") + web.minTLSVersion = TLSVersion.TLSv12 + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.tlsVersion = TLSVersion.TLSv11 + user_to_web.controls.isEncrypted = True + user_to_web.controls.authenticatesDestination = True + user_to_web.controls.checksDestinationRevocation = True + threat = threats["DE01"] + assert threat.apply(user_to_web) + + # Dataflow doesn't authenticate destination + user = Actor("User") + web = Server("Web Server") + web.minTLSVersion = TLSVersion.TLSv12 + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.tlsVersion = TLSVersion.TLSv13 + user_to_web.controls.isEncrypted = True + user_to_web.controls.authenticatesDestination = False + user_to_web.controls.checksDestinationRevocation = True + threat = threats["DE01"] + assert threat.apply(user_to_web) + + # Dataflow doesn't check destination revocation + user = Actor("User") + web = Server("Web Server") + web.minTLSVersion = TLSVersion.TLSv12 + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.tlsVersion = TLSVersion.TLSv13 + user_to_web.controls.isEncrypted = True + user_to_web.controls.authenticatesDestination = True + user_to_web.controls.checksDestinationRevocation = False + threat = threats["DE01"] + assert threat.apply(user_to_web) + + # Dataflow is response + user = Actor("User") + web = Server("Web Server") + web.minTLSVersion = TLSVersion.TLSv12 + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.isResponse = True + user_to_web.tlsVersion = TLSVersion.TLSv13 + user_to_web.controls.isEncrypted = True + user_to_web.controls.authenticatesDestination = False + user_to_web.controls.checksDestinationRevocation = False + threat = threats["DE01"] + assert not threat.apply(user_to_web) def test_DE02(self): web = Server("Web Server") @@ -737,8 +709,8 @@ def test_DE02(self): process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["DE02"] - self.assertTrue(threat.apply(web)) - self.assertTrue(threat.apply(process1)) + assert threat.apply(web) + assert threat.apply(process1) def test_API01(self): process1 = Process("Process1") @@ -746,8 +718,8 @@ def test_API01(self): process1.implementsAPI = True lambda1.implementsAPI = True threat = threats["API01"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_AC01(self): web = Server("Web Server") @@ -760,21 +732,21 @@ def test_AC01(self): db.controls.hasAccessControl = False db.controls.authorizesSource = False threat = threats["AC01"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(web)) - self.assertTrue(threat.apply(db)) + assert threat.apply(process1) + assert threat.apply(web) + assert threat.apply(db) def test_INP07(self): process1 = Process("Process1") process1.controls.usesSecureFunctions = False threat = threats["INP07"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC02(self): db = Datastore("DB") db.isShared = True threat = threats["AC02"] - self.assertTrue(threat.apply(db)) + assert threat.apply(db) def test_DO01(self): process1 = Process("Process1") @@ -783,15 +755,15 @@ def test_DO01(self): process1.controls.isResilient = False web.handlesResourceConsumption = True threat = threats["DO01"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(web)) + assert threat.apply(process1) + assert threat.apply(web) def test_HA01(self): web = Server("Web Server") web.controls.validatesInput = False web.controls.sanitizesInput = False threat = threats["HA01"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AC03(self): process1 = Process("Process1") @@ -805,8 +777,8 @@ def test_AC03(self): lambda1.controls.validatesInput = False lambda1.controls.authorizesSource = False threat = threats["AC03"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_DO02(self): process1 = Process("Process1") @@ -818,10 +790,10 @@ def test_DO02(self): web.controls.handlesResourceConsumption = False db.controls.handlesResourceConsumption = False threat = threats["DO02"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) - self.assertTrue(threat.apply(web)) - self.assertTrue(threat.apply(db)) + assert threat.apply(process1) + assert threat.apply(lambda1) + assert threat.apply(web) + assert threat.apply(db) def test_DS02(self): process1 = Process("Process1") @@ -829,8 +801,8 @@ def test_DS02(self): process1.environment = "Production" lambda1.environment = "Production" threat = threats["DS02"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_INP08(self): process1 = Process("Process1") @@ -843,28 +815,28 @@ def test_INP08(self): web.controls.validatesInput = False web.controls.sanitizesInput = False threat = threats["INP08"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) - self.assertTrue(threat.apply(web)) + assert threat.apply(process1) + assert threat.apply(lambda1) + assert threat.apply(web) def test_INP09(self): web = Server("Web Server") web.controls.validatesInput = False threat = threats["INP09"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP10(self): web = Server("Web Server") web.controls.validatesInput = False threat = threats["INP10"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP11(self): web = Server("Web Server") web.controls.validatesInput = False web.controls.sanitizesInput = False threat = threats["INP11"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP12(self): process1 = Process("Process1") @@ -874,8 +846,8 @@ def test_INP12(self): lambda1.controls.checksInputBounds = False lambda1.controls.validatesInput = False threat = threats["INP12"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_AC04(self): user = Actor("User") @@ -885,7 +857,7 @@ def test_AC04(self): user_to_web.data = xml user_to_web.authorizesSource = False threat = threats["AC04"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_DO03(self): user = Actor("User") @@ -895,7 +867,7 @@ def test_DO03(self): xml = Data(name="user to web data", description="textual", format="XML") user_to_web.data = xml threat = threats["DO03"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_AC05(self): process1 = Process("Process1") @@ -905,7 +877,7 @@ def test_AC05(self): proc_to_web.protocol = "HTTPS" proc_to_web.controls.isEncrypted = True threat = threats["AC05"] - self.assertTrue(threat.apply(proc_to_web)) + assert threat.apply(proc_to_web) def test_INP13(self): process1 = Process("Process1") @@ -913,8 +885,8 @@ def test_INP13(self): process1.controls.validatesInput = False lambda1.controls.validatesInput = False threat = threats["INP13"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_INP14(self): process1 = Process("Process1") @@ -924,9 +896,9 @@ def test_INP14(self): lambda1.controls.validatesInput = False web.controls.validatesInput = False threat = threats["INP14"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) - self.assertTrue(threat.apply(web)) + assert threat.apply(process1) + assert threat.apply(lambda1) + assert threat.apply(web) def test_DE03(self): user = Actor("User") @@ -936,7 +908,7 @@ def test_DE03(self): user_to_web.controls.isEncrypted = False user_to_web.usesVPN = False threat = threats["DE03"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_CR03(self): process1 = Process("Process1") @@ -944,8 +916,8 @@ def test_CR03(self): process1.implementsAuthenticationScheme = False web.implementsAuthenticationScheme = False threat = threats["CR03"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(web)) + assert threat.apply(process1) + assert threat.apply(web) def test_API02(self): process1 = Process("Process1") @@ -955,27 +927,27 @@ def test_API02(self): lambda1.implementsAPI = True lambda1.controls.validatesInput = False threat = threats["API02"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_HA02(self): EE = ExternalEntity("EE") EE.hasPhysicalAccess = True threat = threats["HA02"] - self.assertTrue(threat.apply(EE)) + assert threat.apply(EE) def test_DS03(self): web = Server("Web Server") web.isHardened = False threat = threats["DS03"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AC06(self): web = Server("Web Server") web.isHardened = False web.controls.hasAccessControl = False threat = threats["AC06"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_HA03(self): web = Server("Web Server") @@ -983,47 +955,47 @@ def test_HA03(self): web.controls.encodesOutput = False web.isHardened = False threat = threats["HA03"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_SC02(self): web = Server("Web Server") web.controls.validatesInput = False web.controls.encodesOutput = False threat = threats["SC02"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AC07(self): web = Server("Web Server") web.controls.hasAccessControl = False threat = threats["AC07"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP15(self): web = Server("Web Server") web.protocol = "IMAP" web.controls.sanitizesInput = False threat = threats["INP15"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_HA04(self): EE = ExternalEntity("ee") EE.hasPhysicalAccess = True threat = threats["HA04"] - self.assertTrue(threat.apply(EE)) + assert threat.apply(EE) def test_SC03(self): web = Server("Web Server") - web.controls.validatesInput = False web.controls.sanitizesInput = False - web.controls.hasAccessControl = False + web.controls.validatesInput = False + web.controls.encodesOutput = False threat = threats["SC03"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP16(self): web = Server("Web Server") web.controls.validatesInput = False threat = threats["INP16"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AA02(self): web = Server("Web Server") @@ -1031,15 +1003,15 @@ def test_AA02(self): web.authenticatesSource = False process1.authenticatesSource = False threat = threats["AA02"] - self.assertTrue(threat.apply(web)) - self.assertTrue(threat.apply(process1)) + assert threat.apply(web) + assert threat.apply(process1) def test_CR04(self): web = Server("Web Server") web.usesSessionTokens = True web.implementsNonce = False threat = threats["CR04"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_DO04(self): user = Actor("User") @@ -1050,7 +1022,7 @@ def test_DO04(self): user_to_web.data = xml user_to_web.handlesResources = False threat = threats["DO04"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_DS04(self): web = Server("Web Server") @@ -1058,7 +1030,7 @@ def test_DS04(self): web.controls.validatesInput = False web.controls.sanitizesInput = False threat = threats["DS04"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_SC04(self): web = Server("Web Server") @@ -1066,7 +1038,7 @@ def test_SC04(self): web.controls.validatesInput = False web.controls.encodesOutput = False threat = threats["SC04"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_CR05(self): web = Server("Web Server") @@ -1076,20 +1048,20 @@ def test_CR05(self): db.controls.usesEncryptionAlgorithm != "RSA" db.controls.usesEncryptionAlgorithm != "AES" threat = threats["CR05"] - self.assertTrue(threat.apply(web)) - self.assertTrue(threat.apply(db)) + assert threat.apply(web) + assert threat.apply(db) def test_AC08(self): web = Server("Web Server") web.controls.hasAccessControl = False threat = threats["AC08"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_DS05(self): web = Server("Web Server") web.usesCache = True threat = threats["DS05"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_DS06(self): threat = threats["DS06"] @@ -1108,43 +1080,43 @@ def create_dataflow( flow_.data = Data("Data", classification=data) return flow_ - with self.subTest("Doesn't apply unless dataflow has data defined"): - dataflow = create_dataflow(define_data=False) - self.assertFalse(threat.apply(dataflow)) + # Doesn't apply unless dataflow has data defined + dataflow = create_dataflow(define_data=False) + assert not threat.apply(dataflow) - with self.subTest("Data classification equals sink, source and dataflow"): - dataflow = create_dataflow() - self.assertFalse(threat.apply(dataflow)) + # Data classification equals sink, source and dataflow + dataflow = create_dataflow() + assert not threat.apply(dataflow) - with self.subTest("Data classification is less than sink, source and dataflow"): - dataflow = create_dataflow(data=Classification.PUBLIC) - self.assertFalse(threat.apply(dataflow)) + # Data classification is less than sink, source and dataflow + dataflow = create_dataflow(data=Classification.PUBLIC) + assert not threat.apply(dataflow) - with self.subTest("Data classification exceeds source"): - dataflow = create_dataflow(source=Classification.PUBLIC) - self.assertTrue(threat.apply(dataflow)) + # Data classification exceeds source + dataflow = create_dataflow(source=Classification.PUBLIC) + assert threat.apply(dataflow) - with self.subTest("Data classification exceeds sink"): - dataflow = create_dataflow(sink=Classification.PUBLIC) - self.assertTrue(threat.apply(dataflow)) + # Data classification exceeds sink + dataflow = create_dataflow(sink=Classification.PUBLIC) + assert threat.apply(dataflow) - with self.subTest("Data classification exceeds dataflow"): - dataflow = create_dataflow(dataflow=Classification.PUBLIC) - self.assertTrue(threat.apply(dataflow)) + # Data classification exceeds dataflow + dataflow = create_dataflow(dataflow=Classification.PUBLIC) + assert threat.apply(dataflow) def test_SC05(self): web = Server("Web Server") web.providesIntegrity = False web.controls.usesCodeSigning = False threat = threats["SC05"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP17(self): web = Server("Web Server") web.controls.validatesContentType = False web.invokesScriptFilters = False threat = threats["INP17"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AA03(self): web = Server("Web Server") @@ -1152,21 +1124,21 @@ def test_AA03(self): web.authenticatesSource = False web.controls.usesStrongSessionIdentifiers = False threat = threats["AA03"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AC09(self): web = Server("Web Server") web.controls.hasAccessControl = False web.authorizesSource = False threat = threats["AC09"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP18(self): web = Server("Web Server") web.controls.sanitizesInput = False web.controls.encodesOutput = False threat = threats["INP18"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_CR06(self): user = Actor("User") @@ -1177,7 +1149,7 @@ def test_CR06(self): user_to_web.implementsAuthenticationScheme = False user_to_web.authorizesSource = False threat = threats["CR06"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_AC10(self): user = Actor("User") @@ -1191,7 +1163,7 @@ def test_AC10(self): user_to_web.tlsVersion = TLSVersion.SSLv3 web.inputs = [user_to_web] threat = threats["AC10"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_CR07(self): user = Actor("User") @@ -1201,7 +1173,7 @@ def test_CR07(self): xml = Data(name="user to web data", description="textual", format="XML") user_to_web.data = xml threat = threats["CR07"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_AA04(self): web = Server("Web Server") @@ -1209,7 +1181,7 @@ def test_AA04(self): web.providesIntegrity = False web.authorizesSource = False threat = threats["AA04"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_CR08(self): user = Actor("User") @@ -1220,40 +1192,40 @@ def test_CR08(self): user_to_web.controls.isEncrypted = True user_to_web.tlsVersion = TLSVersion.SSLv3 threat = threats["CR08"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_INP19(self): web = Server("Web Server") web.usesXMLParser = False web.disablesDTD = False threat = threats["INP19"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP20(self): process1 = Process("process") process1.disablesiFrames = False threat = threats["INP20"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC11(self): web = Server("Web Server") web.controls.usesStrongSessionIdentifiers = False threat = threats["AC11"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP21(self): web = Server("Web Server") web.usesXMLParser = False web.disablesDTD = False threat = threats["INP21"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP22(self): web = Server("Web Server") web.usesXMLParser = False web.disablesDTD = False threat = threats["INP22"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP23(self): process1 = Process("Process") @@ -1261,7 +1233,7 @@ def test_INP23(self): process1.controls.sanitizesInput = False process1.controls.validatesInput = False threat = threats["INP23"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_DO05(self): web = Server("Web Server") @@ -1269,21 +1241,21 @@ def test_DO05(self): web.controls.sanitizesInput = False web.usesXMLParser = True threat = threats["DO05"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AC12(self): process1 = Process("Process") process1.hasAccessControl = False process1.controls.implementsPOLP = False threat = threats["AC12"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC13(self): process1 = Process("Process") process1.hasAccessControl = False process1.controls.implementsPOLP = False threat = threats["AC13"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC14(self): process1 = Process("Process") @@ -1291,7 +1263,7 @@ def test_AC14(self): process1.usesEnvironmentVariables = False process1.controls.validatesInput = False threat = threats["AC14"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP24(self): process1 = Process("Process") @@ -1301,8 +1273,8 @@ def test_INP24(self): lambda1.controls.checksInputBounds = False lambda1.controls.validatesInput = False threat = threats["INP24"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_INP25(self): process1 = Process("Process") @@ -1312,8 +1284,8 @@ def test_INP25(self): lambda1.controls.validatesInput = False lambda1.controls.sanitizesInput = False threat = threats["INP25"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_INP26(self): process1 = Process("Process") @@ -1323,15 +1295,15 @@ def test_INP26(self): lambda1.controls.validatesInput = False lambda1.controls.sanitizesInput = False threat = threats["INP26"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(lambda1)) + assert threat.apply(process1) + assert threat.apply(lambda1) def test_INP27(self): process1 = Process("Process") process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["INP27"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP28(self): web = Server("Web Server") @@ -1343,8 +1315,8 @@ def test_INP28(self): process1.controls.sanitizesInput = False process1.controls.encodesOutput = False threat = threats["INP28"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(web)) + assert threat.apply(process1) + assert threat.apply(web) def test_INP29(self): web = Server("Web Server") @@ -1356,15 +1328,15 @@ def test_INP29(self): process1.controls.sanitizesInput = False process1.controls.encodesOutput = False threat = threats["INP29"] - self.assertTrue(threat.apply(process1)) - self.assertTrue(threat.apply(web)) + assert threat.apply(process1) + assert threat.apply(web) def test_INP30(self): process1 = Process("Process") process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["INP30"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP31(self): process1 = Process("Process") @@ -1372,7 +1344,7 @@ def test_INP31(self): process1.controls.sanitizesInput = False process1.controls.usesParameterizedInput = False threat = threats["INP31"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP32(self): process1 = Process("Process") @@ -1380,54 +1352,54 @@ def test_INP32(self): process1.controls.sanitizesInput = False process1.controls.encodesOutput = False threat = threats["INP32"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP33(self): process1 = Process("Process") process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["INP33"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP34(self): web = Server("web") web.controls.checksInputBounds = False threat = threats["INP34"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP35(self): process1 = Process("Process") process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["INP35"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_DE04(self): data = Datastore("DB") data.controls.validatesInput = False data.controls.implementsPOLP = False threat = threats["DE04"] - self.assertTrue(threat.apply(data)) + assert threat.apply(data) def test_AC15(self): process1 = Process("Process") process1.controls.implementsPOLP = False threat = threats["AC15"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP36(self): web = Server("web") web.implementsStrictHTTPValidation = False web.controls.encodesHeaders = False threat = threats["INP36"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP37(self): web = Server("web") web.implementsStrictHTTPValidation = False web.controls.encodesHeaders = False threat = threats["INP37"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP38(self): process1 = Process("Process") @@ -1435,14 +1407,14 @@ def test_INP38(self): process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["INP38"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC16(self): web = Server("web") web.controls.usesStrongSessionIdentifiers = False web.controls.encryptsCookies = False threat = threats["AC16"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_INP39(self): process1 = Process("Process") @@ -1450,7 +1422,7 @@ def test_INP39(self): process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["INP39"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP40(self): process1 = Process("Process") @@ -1458,13 +1430,13 @@ def test_INP40(self): process1.controls.sanitizesInput = False process1.controls.validatesInput = False threat = threats["INP40"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC17(self): web = Server("web") web.controls.usesStrongSessionIdentifiers = False threat = threats["AC17"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AC18(self): process1 = Process("Process") @@ -1472,21 +1444,21 @@ def test_AC18(self): process1.controls.encryptsCookies = False process1.controls.definesConnectionTimeout = False threat = threats["AC18"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_INP41(self): process1 = Process("Process") process1.controls.validatesInput = False process1.controls.sanitizesInput = False threat = threats["INP41"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC19(self): web = Server("web") web.usesSessionTokens = True web.implementsNonce = False threat = threats["AC19"] - self.assertTrue(threat.apply(web)) + assert threat.apply(web) def test_AC20(self): process1 = Process("Process") @@ -1494,14 +1466,14 @@ def test_AC20(self): process1.controls.usesMFA = False process1.controls.encryptsSessionData = False threat = threats["AC20"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC21(self): process1 = Process("Process") process1.implementsCSRFToken = False process1.verifySessionIdentifiers = False threat = threats["AC21"] - self.assertTrue(threat.apply(process1)) + assert threat.apply(process1) def test_AC23(self): user = Actor("User") @@ -1513,7 +1485,7 @@ def test_AC23(self): user_to_web.protocol = "HTTPS" user_to_web.controls.isEncrypted = True threat = threats["AC23"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_AC24(self): user = Actor("User") @@ -1525,7 +1497,7 @@ def test_AC24(self): user_to_web.protocol = "HTTPS" user_to_web.controls.isEncrypted = True threat = threats["AC24"] - self.assertTrue(threat.apply(user_to_web)) + assert threat.apply(user_to_web) def test_DR01(self): web = Server("Web Server") @@ -1534,4 +1506,4 @@ def test_DR01(self): insert.data = Data("ssn", isPII=True, isStored=True) insert.controls.isEncrypted = False threat = threats["DR01"] - self.assertTrue(threat.apply(insert)) + assert threat.apply(insert) From 7fd364c2b7b1c6f2b17b950b7c9b83ec048b0fe2 Mon Sep 17 00:00:00 2001 From: Nick Le Mouton Date: Sat, 4 Oct 2025 16:01:41 +1300 Subject: [PATCH 2/7] Update poetry.lock --- poetry.lock | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index ec18d6fb..82b55fb6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. [[package]] name = "colorama" @@ -15,17 +15,20 @@ files = [ [[package]] name = "exceptiongroup" -version = "1.2.2" +version = "1.3.0" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" groups = ["main"] markers = "python_version < \"3.11\"" files = [ - {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, - {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, + {file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"}, + {file = "exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88"}, ] +[package.dependencies] +typing-extensions = {version = ">=4.6.0", markers = "python_version < \"3.13\""} + [package.extras] test = ["pytest (>=6)"] @@ -146,6 +149,19 @@ files = [ {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, ] +[[package]] +name = "typing-extensions" +version = "4.13.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version < \"3.11\"" +files = [ + {file = "typing_extensions-4.13.2-py3-none-any.whl", hash = "sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c"}, + {file = "typing_extensions-4.13.2.tar.gz", hash = "sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef"}, +] + [metadata] lock-version = "2.1" python-versions = "^3.8 || ^3.9 || ^3.10 || ^3.11" From ae96863ec60291b484774ec8ed79f33f3518d588 Mon Sep 17 00:00:00 2001 From: Nick Le Mouton Date: Sun, 5 Oct 2025 13:14:43 +1300 Subject: [PATCH 3/7] Update tests/test_pytmfunc.py Not a part of the proposed changes, just a typo in the original tests Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_pytmfunc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_pytmfunc.py b/tests/test_pytmfunc.py index e052d97b..c6ece694 100644 --- a/tests/test_pytmfunc.py +++ b/tests/test_pytmfunc.py @@ -1462,7 +1462,7 @@ def test_AC19(self): def test_AC20(self): process1 = Process("Process") - process1.controlsdefinesConnectionTimeout = False + process1.controls.definesConnectionTimeout = False process1.controls.usesMFA = False process1.controls.encryptsSessionData = False threat = threats["AC20"] From 9a9aa5866c1546f15e4eb50cd29fdd5acc4f4b15 Mon Sep 17 00:00:00 2001 From: Nick Le Mouton Date: Sun, 5 Oct 2025 13:18:43 +1300 Subject: [PATCH 4/7] Move pytest to dev group --- poetry.lock | 18 +++++++++--------- pyproject.toml | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/poetry.lock b/poetry.lock index 82b55fb6..34acb681 100644 --- a/poetry.lock +++ b/poetry.lock @@ -6,7 +6,7 @@ version = "0.4.6" description = "Cross-platform colored terminal text." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" -groups = ["main"] +groups = ["dev"] markers = "sys_platform == \"win32\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, @@ -19,7 +19,7 @@ version = "1.3.0" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" -groups = ["main"] +groups = ["dev"] markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"}, @@ -38,7 +38,7 @@ version = "2.1.0" description = "brain-dead simple config-ini parsing" optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["dev"] files = [ {file = "iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760"}, {file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"}, @@ -50,7 +50,7 @@ version = "25.0" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["dev"] files = [ {file = "packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484"}, {file = "packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f"}, @@ -62,7 +62,7 @@ version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["dev"] files = [ {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, @@ -89,7 +89,7 @@ version = "8.3.5" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["dev"] files = [ {file = "pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820"}, {file = "pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845"}, @@ -112,7 +112,7 @@ version = "2.2.1" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["dev"] markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, @@ -155,7 +155,7 @@ version = "4.13.2" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["dev"] markers = "python_version < \"3.11\"" files = [ {file = "typing_extensions-4.13.2-py3-none-any.whl", hash = "sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c"}, @@ -165,4 +165,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = "^3.8 || ^3.9 || ^3.10 || ^3.11" -content-hash = "bca85be723f8e0ddc479a7f7dbf75a0322a1ac05ed3b3db56789ec7a1e139eda" +content-hash = "0752d968df96a2ff8d2e69a190d2e019fcf7d765b6f5102314dee00ff1ec7510" diff --git a/pyproject.toml b/pyproject.toml index daf6bc22..eccbc249 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,9 +8,9 @@ license = "MIT License" [tool.poetry.dependencies] python = "^3.8 || ^3.9 || ^3.10 || ^3.11" pydal = "~20200714.1" -pytest = "^8.3.5" -[tool.poetry.dev-dependencies] +[tool.poetry.group.dev.dependencies] +pytest = "^8.3.5" [build-system] requires = ["poetry-core>=1.0.0"] From 04793f84f52816ec65ffc26de44bb537838ffd8b Mon Sep 17 00:00:00 2001 From: Nick Le Mouton Date: Sun, 5 Oct 2025 13:30:39 +1300 Subject: [PATCH 5/7] Bump github actions and update to use poetry instead of pip and pytest for tests --- .github/workflows/main.yml | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 7e648c52..760cb3c8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -26,20 +26,15 @@ jobs: # Steps represent a sequence of tasks that will be executed as part of the job steps: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - - uses: actions/checkout@v3 - # install newest pip + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - - name: Install pip - run: python -m pip install --upgrade pip setuptools wheel - # install requirements - - name: Install requirements - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - # Runs a set of commands using the runners shell - - name: Run pytm tests - run: | - python3 -m unittest + cache: poetry + - name: Install Poetry + run: python -m pip install --upgrade pip poetry + - name: Install dependencies + run: poetry install --with dev + - name: Run tests + run: poetry run pytest From b182cedc7a00f20a76c5f93996340a961e56d177 Mon Sep 17 00:00:00 2001 From: Nick Le Mouton Date: Sun, 5 Oct 2025 14:30:26 +1300 Subject: [PATCH 6/7] Fix poetry cache/install --- .github/workflows/main.yml | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 760cb3c8..be34ab4e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -25,16 +25,16 @@ jobs: # Steps represent a sequence of tasks that will be executed as part of the job steps: - # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v6 - with: - python-version: ${{ matrix.python-version }} - cache: poetry - - name: Install Poetry - run: python -m pip install --upgrade pip poetry - - name: Install dependencies - run: poetry install --with dev - - name: Run tests - run: poetry run pytest + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + cache: 'poetry' + - name: Install Poetry + run: pip install poetry + - name: Install dependencies + run: poetry install --with dev + - name: Run tests + run: poetry run pytest From e93f815efe934e471da3e536b5bc633af7822c2c Mon Sep 17 00:00:00 2001 From: Nick Le Mouton Date: Sun, 5 Oct 2025 14:51:21 +1300 Subject: [PATCH 7/7] Add legacy-cgi for pydal on later versions of python Add unit tests for sqldump --- poetry.lock | 15 ++++++++- pyproject.toml | 1 + tests/test_sql_dump.py | 72 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 tests/test_sql_dump.py diff --git a/poetry.lock b/poetry.lock index 34acb681..e2cdd814 100644 --- a/poetry.lock +++ b/poetry.lock @@ -44,6 +44,19 @@ files = [ {file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"}, ] +[[package]] +name = "legacy-cgi" +version = "2.6.3" +description = "Fork of the standard library cgi and cgitb modules removed in Python 3.13" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version >= \"3.13\"" +files = [ + {file = "legacy_cgi-2.6.3-py3-none-any.whl", hash = "sha256:6df2ea5ae14c71ef6f097f8b6372b44f6685283dc018535a75c924564183cdab"}, + {file = "legacy_cgi-2.6.3.tar.gz", hash = "sha256:4c119d6cb8e9d8b6ad7cc0ddad880552c62df4029622835d06dfd18f438a8154"}, +] + [[package]] name = "packaging" version = "25.0" @@ -165,4 +178,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = "^3.8 || ^3.9 || ^3.10 || ^3.11" -content-hash = "0752d968df96a2ff8d2e69a190d2e019fcf7d765b6f5102314dee00ff1ec7510" +content-hash = "32e73813213e418f75026fe31b272057de5677df56b7196fe23058d57671d53a" diff --git a/pyproject.toml b/pyproject.toml index eccbc249..39ab73b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ license = "MIT License" [tool.poetry.dependencies] python = "^3.8 || ^3.9 || ^3.10 || ^3.11" pydal = "~20200714.1" +legacy-cgi = { version = "^2.0", markers = "python_version >= '3.13'" } [tool.poetry.group.dev.dependencies] pytest = "^8.3.5" diff --git a/tests/test_sql_dump.py b/tests/test_sql_dump.py new file mode 100644 index 00000000..7056e2d5 --- /dev/null +++ b/tests/test_sql_dump.py @@ -0,0 +1,72 @@ +import random +import sqlite3 +from pathlib import Path + +import pytest + +from pytm import Boundary, Server, Threat, TM + + +@pytest.fixture +def sample_tm(): + TM.reset() + random.seed(0) + tm = TM("sql dump tm", description="desc") + + internet = Boundary("Internet") + server_db = Boundary("Server/DB", inBoundary=internet) + Server("Web Server", inBoundary=server_db) + + TM._threats = [ + Threat( + SID="SRV001", + description="Server threat", + severity="High", + target="Server", + ) + ] + + tm.resolve() + assert tm.findings, "Expected at least one finding for sqlDump tests" + return tm + + +def _open_connection(tmp_path: Path) -> sqlite3.Connection: + db_path = tmp_path / "sqldump" / "test.db" + return sqlite3.connect(db_path) + + +def test_sql_dump_creates_serialized_columns(sample_tm, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + sample_tm.sqlDump("test.db") + + with _open_connection(tmp_path) as conn: + column_names = { + column_info[1].lower() + for column_info in conn.execute("PRAGMA table_info(Boundary)") + } + + assert {"name", "inscope", "inboundary"}.issubset(column_names) + + +def test_sql_dump_persists_element_and_finding_data(sample_tm, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + sample_tm.sqlDump("test.db") + + with _open_connection(tmp_path) as conn: + boundary_rows = conn.execute( + "SELECT name, inBoundary FROM Boundary ORDER BY id" + ).fetchall() + server_rows = conn.execute( + "SELECT name, inBoundary FROM Server ORDER BY id" + ).fetchall() + finding_rows = conn.execute( + "SELECT threat_id FROM Finding ORDER BY id" + ).fetchall() + + assert ("Internet", None) in boundary_rows + assert ("Server/DB", "Internet") in boundary_rows + assert ("Web Server", "Server/DB") in server_rows + assert [row[0] for row in finding_rows] == ["SRV001"] \ No newline at end of file