diff --git a/src/taskgraph/util/taskcluster.py b/src/taskgraph/util/taskcluster.py
index c0a20d83..ab9f8929 100644
--- a/src/taskgraph/util/taskcluster.py
+++ b/src/taskgraph/util/taskcluster.py
@@ -6,9 +6,10 @@
 import copy
 import datetime
 import functools
+import io
 import logging
 import os
-from typing import Dict, List, Union
+from typing import Any, Dict, List, Union
 
 import requests
 import taskcluster_urls as liburls
@@ -69,13 +70,32 @@ def get_taskcluster_client(service: str):
     return getattr(taskcluster, service[0].upper() + service[1:])(options)
 
 
-def _handle_artifact(path, response):
+def _handle_artifact(
+    path: str, response: Union[requests.Response, Dict[str, Any]]
+) -> Any:
+    if isinstance(response, dict):
+        # When the taskcluster client returns non-JSON responses, it wraps them in {"response": <requests.Response>}
+        if "response" in response and isinstance(
+            response["response"], requests.Response
+        ):
+            response = response["response"]
+        else:
+            # If we already have a dict (parsed JSON), return it directly.
+            return response
+
+    # We have a response object, load the content based on the path extension.
     if path.endswith(".json"):
         return response.json()
 
     if path.endswith(".yml"):
         return yaml.load_stream(response.content)
 
+    # Otherwise return raw response content
+    if hasattr(response.raw, "tell") and response.raw.tell() > 0:
+        # Stream was already read, create a new one from content. This can
+        # happen when mocking with responses.
+        return io.BytesIO(response.content)
+
     response.raw.read = functools.partial(response.raw.read, decode_content=True)
     return response.raw
 
@@ -364,14 +384,14 @@ def get_purge_cache_url(provisioner_id, worker_type):
 
 def purge_cache(provisioner_id, worker_type, cache_name):
     """Requests a cache purge from the purge-caches service."""
+    worker_pool_id = f"{provisioner_id}/{worker_type}"
+
     if testing:
-        logger.info(f"Would have purged {provisioner_id}/{worker_type}/{cache_name}.")
+        logger.info(f"Would have purged {worker_pool_id}/{cache_name}.")
     else:
-        logger.info(f"Purging {provisioner_id}/{worker_type}/{cache_name}.")
+        logger.info(f"Purging {worker_pool_id}/{cache_name}.")
         purge_cache_client = get_taskcluster_client("purgeCache")
-        purge_cache_client.purgeCache(
-            provisioner_id, worker_type, {"cacheName": cache_name}
-        )
+        purge_cache_client.purgeCache(worker_pool_id, {"cacheName": cache_name})
 
 
 def send_email(address, subject, content, link):
diff --git a/test/test_util_taskcluster.py b/test/test_util_taskcluster.py
index c15a92b3..45791e69 100644
--- a/test/test_util_taskcluster.py
+++ b/test/test_util_taskcluster.py
@@ -4,7 +4,6 @@
 
 import datetime
 import os
-import unittest.mock as mock
 
 import pytest
 
@@ -12,29 +11,26 @@ from taskgraph.util import taskcluster as tc
 
 
-@pytest.fixture(autouse=True)
-def mock_environ(monkeypatch):
-    # Ensure user specified environment variables don't interfere with URLs.
-    monkeypatch.setattr(os, "environ", {})
+@pytest.fixture
+def root_url():
+    return "https://tc.example.com"
 
 
 @pytest.fixture(autouse=True)
-def mock_production_taskcluster_root_url(monkeypatch):
-    monkeypatch.setattr(tc, "PRODUCTION_TASKCLUSTER_ROOT_URL", "https://tc.example.com")
-
-
-@pytest.fixture
-def root_url(mock_environ):
-    tc.get_root_url.cache_clear()
-    return tc.get_root_url()
+def mock_environ(monkeypatch, root_url):
+    # Ensure user specified environment variables don't interfere with URLs.
+    monkeypatch.setattr(
+        os,
+        "environ",
+        {
+            "TASKCLUSTER_ROOT_URL": root_url,
+        },
+    )
 
 
-@pytest.fixture
-def proxy_root_url(monkeypatch, mock_environ):
-    monkeypatch.setenv("TASK_ID", "123")
-    monkeypatch.setenv("TASKCLUSTER_PROXY_URL", "https://taskcluster-proxy.net")
-    tc.get_root_url.cache_clear()
-    return tc.get_root_url()
+@pytest.fixture(autouse=True)
+def mock_production_taskcluster_root_url(monkeypatch, root_url):
+    monkeypatch.setattr(tc, "PRODUCTION_TASKCLUSTER_ROOT_URL", root_url)
 
 
 def test_get_root_url(monkeypatch):
@@ -88,57 +84,47 @@ def test_get_artifact_url(monkeypatch):
     assert tc.get_artifact_url(task_id, path) == expected_proxy
 
 
-def test_get_artifact(monkeypatch):
-    tid = 123
-
-    mock_queue = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "queue":
-            return mock_queue
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
-
-    mock_response_txt = mock.MagicMock()
-    mock_response_txt.raw.read.return_value = b"foobar"
-    mock_queue.getLatestArtifact.return_value = mock_response_txt
+def test_get_artifact(responses, root_url):
+    tid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
+    # Test text artifact
+    responses.get(
+        f"{root_url}/api/queue/v1/task/{tid}/artifacts/artifact.txt",
+        body=b"foobar",
+    )
     raw = tc.get_artifact(tid, "artifact.txt")
     assert raw.read() == b"foobar"
-    mock_queue.getLatestArtifact.assert_called_with(tid, "artifact.txt")
 
-    mock_response_json = mock.MagicMock()
-    mock_response_json.json.return_value = {"foo": "bar"}
-    mock_queue.getLatestArtifact.return_value = mock_response_json
+    # Test JSON artifact
+    responses.get(
+        f"{root_url}/api/queue/v1/task/{tid}/artifacts/artifact.json",
+        json={"foo": "bar"},
+    )
     result = tc.get_artifact(tid, "artifact.json")
     assert result == {"foo": "bar"}
 
+    # Test YAML artifact
     expected_result = {"foo": b"\xe2\x81\x83".decode()}
-    mock_response_yml = mock.MagicMock()
-    mock_response_yml.content = b'foo: "\xe2\x81\x83"'
-    mock_queue.getLatestArtifact.return_value = mock_response_yml
+    responses.get(
+        f"{root_url}/api/queue/v1/task/{tid}/artifacts/artifact.yml",
+        body=b'foo: "\xe2\x81\x83"',
+    )
     result = tc.get_artifact(tid, "artifact.yml")
     assert result == expected_result
 
 
-def test_list_artifact(monkeypatch):
-    tid = 123
-
-    mock_queue = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "queue":
-            return mock_queue
-        return mock.MagicMock()
+def test_list_artifact(responses, root_url):
+    tid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
 
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
-
-    mock_queue.task.return_value = {"artifacts": ["file1.txt", "file2.json"]}
+    responses.get(
+        f"{root_url}/api/queue/v1/task/{tid}",
+        json={"artifacts": ["file1.txt", "file2.json"]},
+    )
     result = tc.list_artifacts(tid)
     assert result == ["file1.txt", "file2.json"]
-    mock_queue.task.assert_called_with(tid)
 
 
 def test_get_artifact_path():
@@ -165,33 +151,28 @@ def test_get_artifact_path():
 
 def test_get_index_url(root_url):
     index = "foo"
+    tc.get_root_url.cache_clear()
     assert tc.get_index_url(index) == f"{root_url}/api/index/v1/task/foo"
     assert (
         tc.get_index_url(index, multiple=True) == f"{root_url}/api/index/v1/tasks/foo"
     )
 
 
-def test_find_task_id(monkeypatch):
-    tid = 123
+def test_find_task_id(responses, root_url):
+    tid = "abc123"
     index = "foo"
+    tc.get_taskcluster_client.cache_clear()
 
-    mock_index = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "index":
-            return mock_index
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
-
-    mock_index.findTask.return_value = {"taskId": tid}
+    responses.get(
+        f"{root_url}/api/index/v1/task/{index}",
+        json={"taskId": tid},
+    )
     result = tc.find_task_id(index)
     assert result == tid
-    mock_index.findTask.assert_called_with(index)
 
 
-def test_find_task_id_batched(monkeypatch, responses, root_url):
-    monkeypatch.setattr(os, "environ", {"TASKCLUSTER_ROOT_URL": root_url})
+def test_find_task_id_batched(responses, root_url):
+    tc.get_taskcluster_client.cache_clear()
     responses.post(
         f"{root_url}/api/index/v1/tasks/indexes",
         json={
@@ -211,32 +192,23 @@ def test_find_task_id_batched(monkeypatch, responses, root_url):
     assert result == {"index.abc": "abc", "index.def": "def"}
 
 
-def test_get_artifact_from_index(monkeypatch):
+def test_get_artifact_from_index(responses, root_url):
     index = "foo"
     path = "file.txt"
+    tc.get_taskcluster_client.cache_clear()
 
-    mock_index = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "index":
-            return mock_index
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
-
-    mock_response = mock.MagicMock()
-    mock_response.raw.read.return_value = b"foobar"
-    mock_index.findArtifactFromTask.return_value = mock_response
+    responses.get(
+        f"{root_url}/api/index/v1/task/{index}/artifacts/{path}",
+        body=b"foobar",
+    )
     result = tc.get_artifact_from_index(index, path)
     assert result.read() == b"foobar"
-    mock_index.findArtifactFromTask.assert_called_with(index, path)
 
 
-def test_list_tasks(monkeypatch, responses, root_url):
+def test_list_tasks(responses, root_url):
     index = "foo"
-
-    monkeypatch.setattr(os, "environ", {"TASKCLUSTER_ROOT_URL": root_url})
+    tc.get_taskcluster_client.cache_clear()
     responses.get(
         f"{root_url}/api/index/v1/tasks/{index}",
         json={
@@ -278,63 +250,47 @@ def test_parse_time():
 
 def test_get_task_url(root_url):
     tid = "123"
+    tc.get_root_url.cache_clear()
     assert tc.get_task_url(tid) == f"{root_url}/api/queue/v1/task/{tid}"
 
 
-def test_get_task_definition(monkeypatch):
-    tid = "123"
-
-    mock_queue = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "queue":
-            return mock_queue
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
+def test_get_task_definition(responses, root_url):
+    tid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
 
-    mock_queue.task.return_value = {"payload": "blah"}
+    responses.get(
+        f"{root_url}/api/queue/v1/task/{tid}",
+        json={"payload": "blah"},
+    )
     result = tc.get_task_definition(tid)
     assert result == {"payload": "blah"}
-    mock_queue.task.assert_called_with(tid)
-
-
-def test_cancel_task(monkeypatch):
-    tid = "123"
-
-    mock_queue = mock.MagicMock()
-    def mock_client(service):
-        if service == "queue":
-            return mock_queue
-        return mock.MagicMock()
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
+def test_cancel_task(responses, root_url):
+    tid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
+    responses.post(
+        f"{root_url}/api/queue/v1/task/{tid}/cancel",
+        json={"status": {"taskId": tid, "state": "cancelled"}},
+    )
     tc.cancel_task(tid)
-    mock_queue.cancelTask.assert_called_with(tid)
-
-
-def test_status_task(monkeypatch):
-    tid = "123"
-
-    mock_queue = mock.MagicMock()
-    def mock_client(service):
-        if service == "queue":
-            return mock_queue
-        return mock.MagicMock()
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
+def test_status_task(responses, root_url):
+    tid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
 
-    mock_queue.status.return_value = {"status": {"state": "running"}}
+    responses.get(
+        f"{root_url}/api/queue/v1/task/{tid}/status",
+        json={"status": {"state": "running"}},
+    )
     result = tc.status_task(tid)
     assert result == {"state": "running"}
-    mock_queue.status.assert_called_with(tid)
 
 
-def test_status_task_batched(monkeypatch, responses, root_url):
-    monkeypatch.setattr(os, "environ", {"TASKCLUSTER_ROOT_URL": root_url})
+def test_status_task_batched(responses, root_url):
+    tc.get_taskcluster_client.cache_clear()
     responses.post(
         f"{root_url}/api/queue/v1/tasks/status",
         json={
@@ -367,120 +323,99 @@ def test_status_task_batched(monkeypatch, responses, root_url):
     }
 
 
-def test_state_task(monkeypatch):
-    tid = "123"
+def test_state_task(responses, root_url):
+    tid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
 
-    monkeypatch.setattr(
-        tc, "status_task", mock.MagicMock(return_value={"state": "running"})
+    responses.get(
+        f"{root_url}/api/queue/v1/task/{tid}/status",
+        json={"status": {"state": "running"}},
     )
     result = tc.state_task(tid)
     assert result == "running"
 
 
-def test_rerun_task(monkeypatch):
-    tid = "123"
-
-    mock_queue = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "queue":
-            return mock_queue
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
+def test_rerun_task(responses, root_url):
+    tid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
+    responses.post(
+        f"{root_url}/api/queue/v1/task/{tid}/rerun",
+        json={"status": {"taskId": tid, "state": "unscheduled"}},
+    )
     tc.rerun_task(tid)
-    mock_queue.rerunTask.assert_called_with(tid)
 
 
-def test_get_current_scopes(monkeypatch):
-    mock_auth = mock.MagicMock()
-    def mock_client(service):
-        if service == "auth":
-            return mock_auth
-        return mock.MagicMock()
+def test_get_current_scopes(responses, root_url):
+    tc.get_taskcluster_client.cache_clear()
 
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
-
-    mock_auth.currentScopes.return_value = {"scopes": ["foo", "bar"]}
+    responses.get(
+        f"{root_url}/api/auth/v1/scopes/current",
+        json={"scopes": ["foo", "bar"]},
+    )
     result = tc.get_current_scopes()
     assert result == ["foo", "bar"]
-    mock_auth.currentScopes.assert_called_once()
 
 
-def test_purge_cache(monkeypatch):
+def test_purge_cache(responses, root_url):
     provisioner = "hardware"
     worker_type = "mac"
     cache = "cache"
+    tc.get_taskcluster_client.cache_clear()
 
-    mock_purge_cache = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "purgeCache":
-            return mock_purge_cache
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
+    # Modern Taskcluster API uses workerPoolId format
+    # The "/" in the workerPoolId gets URL-encoded as %2F
+    responses.post(
+        f"{root_url}/api/purge-cache/v1/purge-cache/hardware%2Fmac",
+        json={},
+    )
     tc.purge_cache(provisioner, worker_type, cache)
-    mock_purge_cache.purgeCache.assert_called_with(
-        provisioner, worker_type, {"cacheName": cache}
-    )
 
 
-def test_send_email(monkeypatch):
+def test_send_email(responses, root_url):
     data = {
         "address": "test@example.org",
         "subject": "Hello",
         "content": "Goodbye",
        "link": "https://example.org",
     }
+    tc.get_taskcluster_client.cache_clear()
 
-    mock_notify = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "notify":
-            return mock_notify
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
+    responses.post(
+        f"{root_url}/api/notify/v1/email",
+        json={},
+    )
     tc.send_email(**data)
-    mock_notify.email.assert_called_with(data)
 
 
-def test_list_task_group_incomplete_tasks(monkeypatch):
-    tgid = "123"
+def test_list_task_group_incomplete_tasks(responses, root_url):
+    tgid = "abc123"
+    tc.get_taskcluster_client.cache_clear()
 
-    mock_queue = mock.MagicMock()
-
-    def mock_client(service):
-        if service == "queue":
-            return mock_queue
-        return mock.MagicMock()
-
-    monkeypatch.setattr(tc, "get_taskcluster_client", mock_client)
-
-    mock_queue.listTaskGroup.return_value = {
-        "tasks": [
-            {"status": {"taskId": "1", "state": "pending"}},
-            {"status": {"taskId": "2", "state": "unscheduled"}},
-            {"status": {"taskId": "3", "state": "running"}},
-            {"status": {"taskId": "4", "state": "completed"}},
-        ]
-    }
+    responses.get(
+        f"{root_url}/api/queue/v1/task-group/{tgid}/list",
+        json={
+            "tasks": [
+                {"status": {"taskId": "1", "state": "pending"}},
+                {"status": {"taskId": "2", "state": "unscheduled"}},
+                {"status": {"taskId": "3", "state": "running"}},
+                {"status": {"taskId": "4", "state": "completed"}},
+            ]
+        },
+    )
     result = list(tc.list_task_group_incomplete_tasks(tgid))
     assert result == ["1", "2", "3"]
-    mock_queue.listTaskGroup.assert_called_with(tgid)
 
 
-def test_get_ancestors(monkeypatch):
+def test_get_ancestors(responses, root_url):
     tc.get_task_definition.cache_clear()
     tc._get_deps.cache_clear()
+    tc.get_taskcluster_client.cache_clear()
 
     task_definitions = {
         "fff": {
@@ -509,10 +444,12 @@
         },
     }
 
-    def mock_get_task_definition(task_id):
-        return task_definitions.get(task_id)
-
-    monkeypatch.setattr(tc, "get_task_definition", mock_get_task_definition)
+    # Mock API responses for each task definition
+    for task_id, definition in task_definitions.items():
+        responses.get(
+            f"{root_url}/api/queue/v1/task/{task_id}",
+            json=definition,
+        )
 
     got = tc.get_ancestors(["bbb", "fff"])
     expected = {
@@ -524,9 +461,10 @@ def mock_get_task_definition(task_id):
     assert got == expected, f"got: {got}, expected: {expected}"
 
 
-def test_get_ancestors_string(monkeypatch):
+def test_get_ancestors_string(responses, root_url):
     tc.get_task_definition.cache_clear()
     tc._get_deps.cache_clear()
+    tc.get_taskcluster_client.cache_clear()
 
     task_definitions = {
         "fff": {
@@ -555,10 +493,12 @@ def test_get_ancestors_string(monkeypatch):
         },
     }
 
-    def mock_get_task_definition(task_id):
-        return task_definitions.get(task_id)
-
-    monkeypatch.setattr(tc, "get_task_definition", mock_get_task_definition)
+    # Mock API responses for each task definition
+    for task_id, definition in task_definitions.items():
+        responses.get(
+            f"{root_url}/api/queue/v1/task/{task_id}",
+            json=definition,
+        )
 
     got = tc.get_ancestors("fff")
     expected = {
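Note: the following is an illustrative sketch, not part of the patch, of the dict passthrough branch that the _handle_artifact change above introduces. It assumes the private helper is importable from taskgraph.util.taskcluster as shown in the diff: a dict that does not wrap a requests.Response is treated as already-parsed JSON and returned unchanged, while a wrapped response is unwrapped before the extension-based handling runs.

    # Illustrative sketch only (assumption: _handle_artifact is importable as below).
    from taskgraph.util.taskcluster import _handle_artifact

    # No wrapped requests.Response present, so the dict is returned unchanged.
    assert _handle_artifact("artifact.json", {"foo": "bar"}) == {"foo": "bar"}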