diff --git a/framework/python/src/core/testrun.py b/framework/python/src/core/testrun.py index a172f3511..d11527316 100644 --- a/framework/python/src/core/testrun.py +++ b/framework/python/src/core/testrun.py @@ -506,7 +506,7 @@ def _stop_ui(self): if container is not None: container.kill() except docker.errors.NotFound: - return + pass def start_ws(self): @@ -542,4 +542,4 @@ def _stop_ws(self): if container is not None: container.kill() except docker.errors.NotFound: - return + pass diff --git a/framework/python/src/net_orc/ip_control.py b/framework/python/src/net_orc/ip_control.py index 06e0f4de4..890277963 100644 --- a/framework/python/src/net_orc/ip_control.py +++ b/framework/python/src/net_orc/ip_control.py @@ -258,4 +258,4 @@ def get_sys_interfaces() -> t.Dict[str, t.Dict[str, str]]: ifaces[key] = nic[0].address - return ifaces \ No newline at end of file + return ifaces diff --git a/modules/test/base/python/src/test_module.py b/modules/test/base/python/src/test_module.py index a59a05239..dcd9be905 100644 --- a/modules/test/base/python/src/test_module.py +++ b/modules/test/base/python/src/test_module.py @@ -48,9 +48,9 @@ def __init__(self, def _add_logger(self, log_name, module_name, log_dir=None): global LOGGER - LOGGER = logger.get_logger(name=log_name, + LOGGER = logger.get_logger(name=log_name, # pylint: disable=E1123 log_file=module_name, - log_dir=log_dir) # pylint: disable=E1123 + log_dir=log_dir) def generate_module_report(self): pass @@ -187,7 +187,7 @@ def _write_results(self, results): def _get_device_ipv4(self): command = f"""/testrun/bin/get_ipv4_addr {self._ipv4_subnet} {self._device_mac.upper()}""" - text = util.run_command(command)[0] + text = util.run_command(command)[0] # pylint: disable=E1120 if text: return text.split('\n')[0] return None diff --git a/modules/test/conn/python/src/connection_module.py b/modules/test/conn/python/src/connection_module.py index 5e8b78ec3..aaaf51638 100644 --- a/modules/test/conn/python/src/connection_module.py +++ b/modules/test/conn/python/src/connection_module.py @@ -432,7 +432,7 @@ def _ping(self, host, ipv6=False): cmd += ' -6 ' if ipv6 else '' cmd += str(host) #cmd = 'ping -c 1 ' + str(host) - success = util.run_command(cmd, output=False) + success = util.run_command(cmd, output=False) # pylint: disable=E1120 return success def restore_failover_dhcp_server(self, subnet): diff --git a/modules/test/conn/python/src/dhcp_util.py b/modules/test/conn/python/src/dhcp_util.py index be5f0cac2..3654d0401 100644 --- a/modules/test/conn/python/src/dhcp_util.py +++ b/modules/test/conn/python/src/dhcp_util.py @@ -207,7 +207,7 @@ def is_lease_active(self, lease): def ping(self, host): cmd = 'ping -c 1 ' + str(host) - success = util.run_command(cmd, output=False) + success = util.run_command(cmd, output=False) # pylint: disable=E1120 return success def add_reserved_lease(self, diff --git a/modules/test/ntp/python/src/ntp_module.py b/modules/test/ntp/python/src/ntp_module.py index d9d277534..033e98974 100644 --- a/modules/test/ntp/python/src/ntp_module.py +++ b/modules/test/ntp/python/src/ntp_module.py @@ -171,6 +171,10 @@ def extract_ntp_data(self): # Local NTP server syncs to external servers so we need to filter only # for traffic to/from the device if self._device_mac in (source_mac, destination_mac): + + source_ip = None + dest_ip = None + if IP in packet: source_ip = packet[IP].src dest_ip = packet[IP].dst @@ -218,6 +222,9 @@ def _ntp_network_ntp_support(self): for packet in packet_capture: if NTP in packet and packet.src == self._device_mac: + + dest_ip = None + if IP in packet: dest_ip = packet[IP].dst elif IPv6 in packet: @@ -229,9 +236,9 @@ def _ntp_network_ntp_support(self): device_sends_ntp3 = True LOGGER.info(f'Device sent NTPv3 request to {dest_ip}') - if not (device_sends_ntp3 or device_sends_ntp4): - result = False, 'Device has not sent any NTP requests' - elif device_sends_ntp3 and device_sends_ntp4: + result = False, 'Device has not sent any NTP requests' + + if device_sends_ntp3 and device_sends_ntp4: result = False, ('Device sent NTPv3 and NTPv4 packets. ' + 'NTPv3 is not allowed') elif device_sends_ntp3: @@ -239,6 +246,7 @@ def _ntp_network_ntp_support(self): 'NTPv3 is not allowed') elif device_sends_ntp4: result = True, 'Device sent NTPv4 packets' + LOGGER.info(result[1]) return result @@ -255,6 +263,7 @@ def _ntp_network_ntp_dhcp(self): for packet in packet_capture: if NTP in packet and packet.src == self._device_mac: device_sends_ntp = True + dest_ip = None if IP in packet: dest_ip = packet[IP].dst elif IPv6 in packet: @@ -266,6 +275,8 @@ def _ntp_network_ntp_dhcp(self): LOGGER.info('Device sent NTP request to non-DHCP provided NTP server') ntp_to_remote = True + result = 'Feature Not Detected', 'Device has not sent any NTP requests' + if device_sends_ntp: if ntp_to_local and ntp_to_remote: result = False, ('Device sent NTP request to DHCP provided ' + @@ -275,8 +286,6 @@ def _ntp_network_ntp_dhcp(self): 'Device sent NTP request to non-DHCP provided server') elif ntp_to_local: result = True, 'Device sent NTP request to DHCP provided server' - else: - result = 'Feature Not Detected', 'Device has not sent any NTP requests' LOGGER.info(result[1]) return result diff --git a/modules/test/services/python/src/services_module.py b/modules/test/services/python/src/services_module.py index bfa232c87..b14c74234 100644 --- a/modules/test/services/python/src/services_module.py +++ b/modules/test/services/python/src/services_module.py @@ -200,7 +200,7 @@ def _process_port_results(self): def _scan_tcp_ports(self): max_port = 1000 LOGGER.info('Running nmap TCP port scan') - nmap_results = util.run_command( + nmap_results = util.run_command( # pylint: disable=E1120 f'''nmap --open -sT -sV -Pn -v -p 1-{max_port} --version-intensity 7 -T4 -oX - {self._ipv4_addr}''')[0] @@ -228,7 +228,7 @@ def _scan_udp_ports(self): port_list = ','.join(ports) LOGGER.info('Running nmap UDP port scan') LOGGER.debug('UDP ports: ' + str(port_list)) - nmap_results = util.run_command( + nmap_results = util.run_command( # pylint: disable=E1120 f'nmap -sU -sV -p {port_list} -oX - {self._ipv4_addr}')[0] LOGGER.info('UDP port scan complete') nmap_results_json = self._nmap_results_to_json(nmap_results) diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index d8c1d7a16..bee4015af 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -372,6 +372,8 @@ def validate_tls_server(self, host, tls_version): public_key = self.get_public_key(public_cert) if public_key: key_valid = self.verify_public_key(public_key) + else: + key_valid = [0] sig_valid = self.validate_signature(host) diff --git a/testing/api/profiles/new_profile.json b/testing/api/profiles/new_profile.json new file mode 100644 index 000000000..7043f6cfa --- /dev/null +++ b/testing/api/profiles/new_profile.json @@ -0,0 +1,145 @@ +{ + "name": "New Profile", + "status": "Draft", + "created": "2024-05-23 12:38:26", + "version": "v1.3", + "questions": [ + { + "question": "What type of device is this?", + "type": "select", + "options": [ + "IoT Sensor", + "IoT Controller", + "Smart Device", + "Something else" + ], + "answer": "IoT Sensor", + "validation": { + "required": true + } + }, + { + "question": "How will this device be used at Google?", + "type": "text-long", + "answer": "Installed in a building", + "validation": { + "max": "128", + "required": true + } + }, + { + "question": "What is the email of the device owner(s)?", + "type": "email-multiple", + "answer": "boddey@google.com, cmeredith@google.com", + "validation": { + "required": true, + "max": "128" + } + }, + { + "question": "Is this device going to be managed by Google or a third party?", + "type": "select", + "options": [ + "Google", + "Third Party" + ], + "answer": "Google", + "validation": { + "required": true + } + }, + { + "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", + "type": "select", + "options": [ + "Yes", + "No", + "N/A" + ], + "default": "N/A", + "answer": "Yes", + "validation": { + "required": true + } + }, + { + "question": "Are any of the following statements true about your device?", + "description": "This tells us about the data your device will collect", + "type": "select-multiple", + "answer": [ + 0, + 2 + ], + "options": [ + "The device collects any Personal Identifiable Information (PII) or Personal Health Information (PHI)", + "The device collects intellectual property and trade secrets, sensitive business data, critical infrastructure data, identity assets", + "The device stream confidential business data in real-time (seconds)?" + ] + }, + { + "question": "Which of the following statements are true about this device?", + "description": "This tells us about the types of data that are transmitted from this device and how the transmission is performed from a technical standpoint.", + "type": "select-multiple", + "answer": [ + 0, + 1, + 5 + ], + "options": [ + "PII/PHI, confidential business data, or crown jewel data is transmitted to a destination outside Alphabet's ownership", + "Data transmission occurs across less-trusted networks (e.g. the internet).", + "A failure in data transmission would likely have a substantial negative impact (https://www.rra.rocks/docs/standard_levels#levels-definitions)", + "A confidentiality breach during transmission would have a substantial negative impact", + "The device encrypts data during transmission", + "The device network protocol is well-established and currently used by Google" + ] + }, + { + "question": "Does the network protocol assure server-to-client identity verification?", + "type": "select", + "answer": "Yes", + "options": [ + "Yes", + "No", + "I don't know" + ], + "validation": { + "required": true + } + }, + { + "question": "Click the statements that best describe the characteristics of this device.", + "description": "This tells us about how this device is managed remotely.", + "type": "select-multiple", + "answer": [ + 0, + 1, + 2 + ], + "options": [ + "PII/PHI, or confidential business data is accessible from the device without authentication", + "Unrecoverable actions (e.g. disk wipe) can be performed remotely", + "Authentication is required for remote access", + "The management interface is accessible from the public internet", + "Static credentials are used for administration" + ] + }, + { + "question": "Are any of the following statements true about this device?", + "description": "This informs us about what other systems and processes this device is a part of.", + "type": "select-multiple", + "answer": [ + 2, + 3 + ], + "options": [ + "The device monitors an environment for active risks to human life.", + "The device is used to convey people, or critical property.", + "The device controls robotics in human-accessible spaces.", + "The device controls physical access systems.", + "The device is involved in processes required by regulations, or compliance. (ex. privacy, security, safety regulations)", + "The device's failure would cause faults in other high-criticality processes." + ] + } + ] +} \ No newline at end of file diff --git a/testing/api/profiles/new_profile_2.json b/testing/api/profiles/new_profile_2.json new file mode 100644 index 000000000..51782e187 --- /dev/null +++ b/testing/api/profiles/new_profile_2.json @@ -0,0 +1,145 @@ +{ + "name": "New Profile 2", + "status": "Draft", + "created": "2024-05-23 12:38:26", + "version": "v1.3", + "questions": [ + { + "question": "What type of device is this?", + "type": "select", + "options": [ + "IoT Sensor", + "IoT Controller", + "Smart Device", + "Something else" + ], + "answer": "IoT Sensor", + "validation": { + "required": true + } + }, + { + "question": "How will this device be used at Google?", + "type": "text-long", + "answer": "Installed in a building", + "validation": { + "max": "128", + "required": true + } + }, + { + "question": "What is the email of the device owner(s)?", + "type": "email-multiple", + "answer": "boddey@google.com, cmeredith@google.com", + "validation": { + "required": true, + "max": "128" + } + }, + { + "question": "Is this device going to be managed by Google or a third party?", + "type": "select", + "options": [ + "Google", + "Third Party" + ], + "answer": "Google", + "validation": { + "required": true + } + }, + { + "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", + "type": "select", + "options": [ + "Yes", + "No", + "N/A" + ], + "default": "N/A", + "answer": "Yes", + "validation": { + "required": true + } + }, + { + "question": "Are any of the following statements true about your device?", + "description": "This tells us about the data your device will collect", + "type": "select-multiple", + "answer": [ + 0, + 2 + ], + "options": [ + "The device collects any Personal Identifiable Information (PII) or Personal Health Information (PHI)", + "The device collects intellectual property and trade secrets, sensitive business data, critical infrastructure data, identity assets", + "The device stream confidential business data in real-time (seconds)?" + ] + }, + { + "question": "Which of the following statements are true about this device?", + "description": "This tells us about the types of data that are transmitted from this device and how the transmission is performed from a technical standpoint.", + "type": "select-multiple", + "answer": [ + 0, + 1, + 5 + ], + "options": [ + "PII/PHI, confidential business data, or crown jewel data is transmitted to a destination outside Alphabet's ownership", + "Data transmission occurs across less-trusted networks (e.g. the internet).", + "A failure in data transmission would likely have a substantial negative impact (https://www.rra.rocks/docs/standard_levels#levels-definitions)", + "A confidentiality breach during transmission would have a substantial negative impact", + "The device encrypts data during transmission", + "The device network protocol is well-established and currently used by Google" + ] + }, + { + "question": "Does the network protocol assure server-to-client identity verification?", + "type": "select", + "answer": "Yes", + "options": [ + "Yes", + "No", + "I don't know" + ], + "validation": { + "required": true + } + }, + { + "question": "Click the statements that best describe the characteristics of this device.", + "description": "This tells us about how this device is managed remotely.", + "type": "select-multiple", + "answer": [ + 0, + 1, + 2 + ], + "options": [ + "PII/PHI, or confidential business data is accessible from the device without authentication", + "Unrecoverable actions (e.g. disk wipe) can be performed remotely", + "Authentication is required for remote access", + "The management interface is accessible from the public internet", + "Static credentials are used for administration" + ] + }, + { + "question": "Are any of the following statements true about this device?", + "description": "This informs us about what other systems and processes this device is a part of.", + "type": "select-multiple", + "answer": [ + 2, + 3 + ], + "options": [ + "The device monitors an environment for active risks to human life.", + "The device is used to convey people, or critical property.", + "The device controls robotics in human-accessible spaces.", + "The device controls physical access systems.", + "The device is involved in processes required by regulations, or compliance. (ex. privacy, security, safety regulations)", + "The device's failure would cause faults in other high-criticality processes." + ] + } + ] +} \ No newline at end of file diff --git a/testing/api/profiles/updated_profile.json b/testing/api/profiles/updated_profile.json new file mode 100644 index 000000000..a659cd937 --- /dev/null +++ b/testing/api/profiles/updated_profile.json @@ -0,0 +1,146 @@ +{ + "name": "New Profile", + "rename": "Updated Profile", + "status": "Draft", + "created": "2024-05-23 12:38:26", + "version": "v1.3", + "questions": [ + { + "question": "What type of device is this?", + "type": "select", + "options": [ + "IoT Sensor", + "IoT Controller", + "Smart Device", + "Something else" + ], + "answer": "IoT Sensor", + "validation": { + "required": true + } + }, + { + "question": "How will this device be used at Google?", + "type": "text-long", + "answer": "Installed in a building", + "validation": { + "max": "128", + "required": true + } + }, + { + "question": "What is the email of the device owner(s)?", + "type": "email-multiple", + "answer": "boddey@google.com, cmeredith@google.com", + "validation": { + "required": true, + "max": "128" + } + }, + { + "question": "Is this device going to be managed by Google or a third party?", + "type": "select", + "options": [ + "Google", + "Third Party" + ], + "answer": "Google", + "validation": { + "required": true + } + }, + { + "question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", + "type": "select", + "options": [ + "Yes", + "No", + "N/A" + ], + "default": "N/A", + "answer": "Yes", + "validation": { + "required": true + } + }, + { + "question": "Are any of the following statements true about your device?", + "description": "This tells us about the data your device will collect", + "type": "select-multiple", + "answer": [ + 0, + 2 + ], + "options": [ + "The device collects any Personal Identifiable Information (PII) or Personal Health Information (PHI)", + "The device collects intellectual property and trade secrets, sensitive business data, critical infrastructure data, identity assets", + "The device stream confidential business data in real-time (seconds)?" + ] + }, + { + "question": "Which of the following statements are true about this device?", + "description": "This tells us about the types of data that are transmitted from this device and how the transmission is performed from a technical standpoint.", + "type": "select-multiple", + "answer": [ + 0, + 1, + 5 + ], + "options": [ + "PII/PHI, confidential business data, or crown jewel data is transmitted to a destination outside Alphabet's ownership", + "Data transmission occurs across less-trusted networks (e.g. the internet).", + "A failure in data transmission would likely have a substantial negative impact (https://www.rra.rocks/docs/standard_levels#levels-definitions)", + "A confidentiality breach during transmission would have a substantial negative impact", + "The device encrypts data during transmission", + "The device network protocol is well-established and currently used by Google" + ] + }, + { + "question": "Does the network protocol assure server-to-client identity verification?", + "type": "select", + "answer": "Yes", + "options": [ + "Yes", + "No", + "I don't know" + ], + "validation": { + "required": true + } + }, + { + "question": "Click the statements that best describe the characteristics of this device.", + "description": "This tells us about how this device is managed remotely.", + "type": "select-multiple", + "answer": [ + 0, + 1, + 2 + ], + "options": [ + "PII/PHI, or confidential business data is accessible from the device without authentication", + "Unrecoverable actions (e.g. disk wipe) can be performed remotely", + "Authentication is required for remote access", + "The management interface is accessible from the public internet", + "Static credentials are used for administration" + ] + }, + { + "question": "Are any of the following statements true about this device?", + "description": "This informs us about what other systems and processes this device is a part of.", + "type": "select-multiple", + "answer": [ + 2, + 3 + ], + "options": [ + "The device monitors an environment for active risks to human life.", + "The device is used to convey people, or critical property.", + "The device controls robotics in human-accessible spaces.", + "The device controls physical access systems.", + "The device is involved in processes required by regulations, or compliance. (ex. privacy, security, safety regulations)", + "The device's failure would cause faults in other high-criticality processes." + ] + } + ] +} \ No newline at end of file diff --git a/testing/api/test_api.py b/testing/api/test_api.py index 0ad0185aa..33163cecc 100644 --- a/testing/api/test_api.py +++ b/testing/api/test_api.py @@ -36,6 +36,7 @@ DEVICES_DIRECTORY = "local/devices" TESTING_DEVICES = "../device_configs" +PROFILES_DIRECTORY = "local/risk_profiles" SYSTEM_CONFIG_PATH = "local/system.json" BASELINE_MAC_ADDR = "02:42:aa:00:01:01" @@ -49,14 +50,14 @@ def pretty_print(dictionary: dict): def query_system_status() -> str: """Query system status from API and returns this""" r = requests.get(f"{API}/system/status", timeout=5) - response = json.loads(r.text) + response = r.json() return response["status"] def query_test_count() -> int: """Queries status and returns number of test results""" r = requests.get(f"{API}/system/status", timeout=5) - response = json.loads(r.text) + response = r.json() return len(response["tests"]["results"]) @@ -88,7 +89,6 @@ def stop_test_device(device_name): ) print(cmd.stdout) - def docker_logs(device_name): """ Print docker logs from given docker container name """ cmd = subprocess.run( @@ -228,7 +228,7 @@ def local_get_devices(): def test_get_system_interfaces(testrun): # pylint: disable=W0613 """Tests API system interfaces against actual local interfaces""" r = requests.get(f"{API}/system/interfaces", timeout=5) - response = json.loads(r.text) + response = r.json() local_interfaces = get_network_interfaces() assert set(response.keys()) == set(local_interfaces) @@ -270,7 +270,7 @@ def test_status_in_progress(testing_devices, testrun): # pylint: disable=W0613 def test_status_non_compliant(testing_devices, testrun): # pylint: disable=W0613 r = requests.get(f"{API}/devices", timeout=5) - all_devices = json.loads(r.text) + all_devices = r.json() payload = { "device": { "mac_addr": all_devices[0]["mac_addr"], @@ -298,6 +298,7 @@ def test_status_non_compliant(testing_devices, testrun): # pylint: disable=W0613 stop_test_device("x123") + def test_create_get_devices(empty_devices_dir, testrun): # pylint: disable=W0613 device_1 = { "manufacturer": "Google", @@ -337,7 +338,7 @@ def test_create_get_devices(empty_devices_dir, testrun): # pylint: disable=W0613 # Test that returned devices API endpoint matches expected structure r = requests.get(f"{API}/devices", timeout=5) - all_devices = json.loads(r.text) + all_devices = r.json() pretty_print(all_devices) with open( @@ -414,7 +415,7 @@ def test_delete_device_success(empty_devices_dir, testrun): # pylint: disable=W0 # Test that returned devices API endpoint matches expected structure r = requests.get(f"{API}/devices", timeout=5) - all_devices = json.loads(r.text) + all_devices = r.json() pretty_print(all_devices) with open( @@ -555,7 +556,16 @@ def test_delete_device_testrun_running(testing_devices, testrun): # pylint: disa def test_start_testrun_started_successfully( testing_devices, # pylint: disable=W0613 testrun): # pylint: disable=W0613 - payload = {"device": {"mac_addr": BASELINE_MAC_ADDR, "firmware": "asd"}} + payload = {"device": { + "mac_addr": BASELINE_MAC_ADDR, + "firmware": "asd", + "test_modules": { + "dns": {"enabled": False}, + "connection": {"enabled": True}, + "ntp": {"enabled": False}, + "baseline": {"enabled": False}, + "nmap": {"enabled": False} + }}} r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) assert r.status_code == 200 @@ -584,6 +594,7 @@ def test_start_testrun_already_in_progress( r = requests.post(f"{API}/system/start", data=json.dumps(payload), timeout=10) assert r.status_code == 409 + def test_start_system_not_configured_correctly( empty_devices_dir, # pylint: disable=W0613 testrun): # pylint: disable=W0613 @@ -741,7 +752,7 @@ def test_device_edit_device( new_model = "Alphabet" r = requests.get(f"{API}/devices", timeout=5) - all_devices = json.loads(r.text) + all_devices = r.json() api_device = next(x for x in all_devices if x["mac_addr"] == mac_addr) @@ -771,7 +782,7 @@ def test_device_edit_device( assert r.status_code == 200 r = requests.get(f"{API}/devices", timeout=5) - all_devices = json.loads(r.text) + all_devices = r.json() updated_device_api = next(x for x in all_devices if x["mac_addr"] == mac_addr) assert updated_device_api["model"] == new_model @@ -909,9 +920,10 @@ def test_device_edit_device_with_mac_already_exists( def test_system_latest_version(testrun): # pylint: disable=W0613 r = requests.get(f"{API}/system/version", timeout=5) assert r.status_code == 200 - updated_system_version = json.loads(r.text)["update_available"] + updated_system_version = r.json()["update_available"] assert updated_system_version is False + def test_get_system_config(testrun): # pylint: disable=W0613 r = requests.get(f"{API}/system/config", timeout=5) @@ -921,7 +933,7 @@ def test_get_system_config(testrun): # pylint: disable=W0613 ) as f: local_config = json.load(f) - api_config = json.loads(r.text) + api_config = r.json() # validate structure assert set(dict_paths(api_config)) | set(dict_paths(local_config)) == set( @@ -940,7 +952,7 @@ def test_get_system_config(testrun): # pylint: disable=W0613 def test_invalid_path_get(testrun): # pylint: disable=W0613 r = requests.get(f"{API}/blah/blah", timeout=5) - response = json.loads(r.text) + response = r.json() assert r.status_code == 404 with open( os.path.join(os.path.dirname(__file__), "mockito/invalid_request.json"), @@ -976,7 +988,7 @@ def test_trigger_run(testing_devices, testrun): # pylint: disable=W0613 # Validate response r = requests.get(f"{API}/system/status", timeout=5) - response = json.loads(r.text) + response = r.json() pretty_print(response) # Validate results @@ -1030,14 +1042,14 @@ def test_stop_running_test(testing_devices, testrun): # pylint: disable=W0613 # Validate response r = requests.post(f"{API}/system/stop", timeout=5) - response = json.loads(r.text) + response = r.json() pretty_print(response) assert response == {"success": "Testrun stopped"} time.sleep(1) # Validate response r = requests.get(f"{API}/system/status", timeout=5) - response = json.loads(r.text) + response = r.json() pretty_print(response) assert response["status"] == "Cancelled" @@ -1047,7 +1059,7 @@ def test_stop_running_not_running(testrun): # pylint: disable=W0613 # Validate response r = requests.post(f"{API}/system/stop", timeout=10) - response = json.loads(r.text) + response = r.json() pretty_print(response) assert r.status_code == 404 @@ -1079,7 +1091,7 @@ def test_multiple_runs(testing_devices, testrun): # pylint: disable=W0613 # Validate response r = requests.get(f"{API}/system/status", timeout=5) - response = json.loads(r.text) + response = r.json() pretty_print(response) # Validate results @@ -1136,3 +1148,299 @@ def test_create_invalid_chars(empty_devices_dir, testrun): # pylint: disable=W06 timeout=5) print(r.text) print(r.status_code) + + +# Tests for profile endpoints +def delete_all_profiles(): + """Utility method to delete all profiles from risk_profiles folder""" + profiles_path = Path(PROFILES_DIRECTORY) + + try: + # Check if the profile_path (local/risk_profiles) exists and is a folder + if profiles_path.exists() and profiles_path.is_dir(): + # Iterate over all profiles from risk_profiles folder + for item in profiles_path.iterdir(): + # Check if item is a file + if item.is_file(): + #If True remove file + item.unlink() + else: + # If item is a folder remove it + shutil.rmtree(item) + + except PermissionError: + # Permission related issues + print(f"Permission Denied: {item}") + except OSError as err: + # System related issues + print(f"Error removing {item}: {err}") + +def create_profile(file_name): + """Utility method to create the profile""" + # Load the profile + new_profile = load_profile(file_name) + # Assign the profile name to profile_name + profile_name = new_profile["name"] + + # Exception if the profile already exists + if profile_exists(profile_name): + raise ValueError(f"Profile: {profile_name} exists") + + # Send the post request + r = requests.post(f"{API}/profiles", data=json.dumps(new_profile), timeout=5) + + # Exception if status code is not 201 + if r.status_code != 201: + raise ValueError(f"API request failed with code: {r.status_code}") + + # Return the profile + return new_profile + + +@pytest.fixture() +def reset_profiles(): + """Delete the profiles before and after each test""" + # Delete before the test + delete_all_profiles() + yield + # Delete after the test + delete_all_profiles() + +@pytest.fixture() +def add_profile(): + """Fixture to create profiles during tests.""" + # Returning the reference to create_profile + return create_profile + +def load_profile(file_name): + """Utility method to load the profiles from 'testing/api/profiles' """ + # Construct the file path + file_path = os.path.join(os.path.dirname(__file__), "profiles", file_name) + # Open the file in read mode + with open(file_path, "r", encoding="utf-8") as file: + # Return the file content + return json.load(file) + +def profile_exists(profile_name): + """Utility method to check if profile exists""" + # Send the get request + r = requests.get(f"{API}/profiles", timeout=5) + # Check if status code is not 200 (OK) + if r.status_code != 200: + raise ValueError(f"Api request failed with code: {r.status_code}") + # Parse the JSON response to get the list of profiles + profiles = r.json() + # Return if name is in the list of profiles + return any(p["name"] == profile_name for p in profiles) + +def test_get_profiles_format(testrun): # pylint: disable=W0613 + """Test profiles format""" + # Send the get request + r = requests.get(f"{API}/profiles/format", timeout=5) + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the response + response = r.json() + # Check if the response is a list + assert isinstance(response, list) + + # Check that each item in the response has keys "questions" and "type" + for item in response: + assert "question" in item + assert "type" in item + +def test_get_profiles(testrun, reset_profiles, add_profile): # pylint: disable=W0613 + """Test for get profiles (no profile, one profile, two profiles)""" + + # Test for no profiles + + # Send the get request to "/profiles" endpoint + r = requests.get(f"{API}/profiles", timeout=5) + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the response (profiles) + response = r.json() + # Check if response is a list + assert isinstance(response, list) + # Check if the list is empty + assert len(response) == 0 + + # Test for one profile + + # Load the profile using add_profile fixture + add_profile("new_profile.json") + + # Send get request to the "/profiles" endpoint + r = requests.get(f"{API}/profiles", timeout=5) + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the response (profiles) + response = r.json() + # Check if response is a list + assert isinstance(response, list) + # Check if response contains one profile + assert len(response) == 1 + + # Check that each profile has the expected fields + for profile in response: + # Check if "name" key exists in profile + assert "name" in profile + # Check if "status" key exists in profile + assert "status" in profile + # Check if "created" key exists in profile + assert "created" in profile + # Check if "version" key exists in profile + assert "version" in profile + # Check if "questions" key exists in profile + assert "questions" in profile + + # Check if "questions" value is a list + assert isinstance(profile["questions"], list) + + #check that "questions" value has the expected fields + for element in profile["questions"]: + # Check if each element is dict + assert isinstance(element, dict) + # Check if "question" key is in dict element + assert "question" in element + # Check if "type" key is in dict element + assert "type" in element + # Check if "asnswer" key is in dict element + assert "answer" in element + + # Test for two profiles + + # Load the profile using add_profile fixture + add_profile("new_profile_2.json") + + # Send the get request to "/profiles" endpoint + r = requests.get(f"{API}/profiles", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the response (profiles) + response = r.json() + # Check if response is a list + assert isinstance(response, list) + # Check if response contains two profiles + assert len(response) == 2 + +def test_create_profile(testrun, reset_profiles): # pylint: disable=W0613 + """Test for create profile if not exists""" + + # Load the profile + new_profile = load_profile("new_profile.json") + # Assign the profile name to profile_name + profile_name = new_profile["name"] + + # Check if the profile already exists + if profile_exists(profile_name): + raise ValueError(f"Profile: {profile_name} exists") + + # Send the post request + r = requests.post(f"{API}/profiles", data=json.dumps(new_profile), timeout=5) + + # Check if status code is 201 (Created) + assert r.status_code == 201 + # Parse the response + response = r.json() + # Check if "success" key in response + assert "success" in response + + # Verify profile creation + r = requests.get(f"{API}/profiles", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the response + profiles = r.json() + + # Iterate through all the profiles to find the profile based on the "name" + created_profile = next( + (p for p in profiles if p["name"] == profile_name), None + ) + # Check if profile was created + assert created_profile is not None + +def test_update_profile(testrun, reset_profiles, add_profile): # pylint: disable=W0613 + """test for update profile when exists""" + + # Load the new profile using add_profile fixture + new_profile = add_profile("new_profile.json") + # Load the updated profile using load_profile utility method + updated_profile = load_profile("updated_profile.json") + + # Assign the new_profile name + profile_name = new_profile["name"] + # Assign the updated_profile name + updated_profile_name = updated_profile["rename"] + + # Exception if the profile does't exists + if not profile_exists(profile_name): + raise ValueError(f"Profile: {profile_name} exists") + + # Send the post request to update the profile + r = requests.post( + f"{API}/profiles", + data=json.dumps(updated_profile), + timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the response + response = r.json() + # Check if "success" key in response + assert "success" in response + + # Get request to verify profile update + r = requests.get(f"{API}/profiles", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the response + profiles = r.json() + + # Iterate through the profiles to find the profile based on the updated "name" + updated_profile_check = next( + (p for p in profiles if p["name"] == updated_profile_name), + None + ) + # Check if profile was updated + assert updated_profile_check is not None + +def test_delete_profile(testrun, reset_profiles, add_profile): # pylint: disable=W0613 + """Test for delete profile""" + + # Assign the profile from the fixture + profile_to_delete = add_profile("new_profile.json") + # Assign the profile name + profile_name = profile_to_delete["name"] + + # Delete the profile + r = requests.delete( + f"{API}/profiles", + data=json.dumps(profile_to_delete), + timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the JSON response + response = r.json() + # Check if the response contains "success" key + assert "success" in response + + # Check if the profile has been deleted + r = requests.get(f"{API}/profiles", timeout=5) + + # Check if status code is 200 (OK) + assert r.status_code == 200 + # Parse the JSON response + profiles = r.json() + + # Iterate through the profiles to find the profile based on the "name" + deleted_profile = next( + (p for p in profiles if p["name"] == profile_name), + None + ) + # Check if profile was deleted + assert deleted_profile is None diff --git a/testing/pylint/test_pylint b/testing/pylint/test_pylint index 1f71482e5..7e102c7f8 100755 --- a/testing/pylint/test_pylint +++ b/testing/pylint/test_pylint @@ -14,27 +14,35 @@ # See the License for the specific language governing permissions and # limitations under the License. -ERROR_LIMIT=25 - -sudo cmd/install +# Install python venv +python3 -m venv venv +# Activate the venv source venv/bin/activate -sudo pip3 install pylint==3.0.3 +# Install pylint +pip install pylint==3.2.6 + +# Declare the applicable files files=$(find . -path ./venv -prune -o -name '*.py' -print) +# Define the pylint output file OUT=pylint.out -rm -f $OUT && touch $OUT +# Remove it if it already exists +rm -f $OUT +# Run pylint against the target files +# Change the evaluation to total the number of errors +# Output to the specified output file pylint $files -ry --extension-pkg-allow-list=docker --evaluation="error + warning + refactor + convention" 2>/dev/null | tee -a $OUT -new_errors=$(cat $OUT | grep -oP "(?!=^Your code has been rated at)([0-9]+)(?=\.00/10[ \(]?)" ) +# Obtain the total number of errors from the pylint out file +errors=$(cat $OUT | grep -oP "(?!=^Your code has been rated at)([0-9]+)(?=\.00/10[ \(]?)" ) -echo "$new_errors > $ERROR_LIMIT?" -if (( $new_errors > $ERROR_LIMIT)); then - echo new errors $new_errors > error limit $ERROR_LIMIT - echo failing .. +# Check if any errors exist +if (( $errors > 0 )); then + echo "$errors pylint issues have been identified. These must be resolved before merging." exit 1 fi