src/taskgraph/util/taskcluster.py (27 additions & 7 deletions)
@@ -6,9 +6,10 @@
 import copy
 import datetime
 import functools
+import io
 import logging
 import os
-from typing import Dict, List, Union
+from typing import Any, Dict, List, Union
 
 import requests
 import taskcluster_urls as liburls
@@ -69,13 +70,32 @@ def get_taskcluster_client(service: str):
     return getattr(taskcluster, service[0].upper() + service[1:])(options)
 
 
-def _handle_artifact(path, response):
+def _handle_artifact(
+    path: str, response: Union[requests.Response, Dict[str, Any]]
+) -> Any:
+    if isinstance(response, dict):
+        # When the taskcluster client returns a non-JSON response, it wraps it in {"response": <Response>}
+        if "response" in response and isinstance(
+            response["response"], requests.Response
+        ):
+            response = response["response"]
+        else:
+            # If we already have a dict (parsed JSON), return it directly.
+            return response
+
+    # We have a response object; load the content based on the path extension.
     if path.endswith(".json"):
         return response.json()
 
     if path.endswith(".yml"):
         return yaml.load_stream(response.content)
+
+    # Otherwise return the raw response content.
+    if hasattr(response.raw, "tell") and response.raw.tell() > 0:
+        # Stream was already read, create a new one from content. This can
+        # happen when mocking with responses.
+        return io.BytesIO(response.content)
+
     response.raw.read = functools.partial(response.raw.read, decode_content=True)
     return response.raw
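
For context, here is a minimal sketch (not part of this change) of how the reworked _handle_artifact dispatch behaves. The artifact paths and payloads are invented for illustration, and _handle_artifact is a private helper of taskgraph.util.taskcluster, so this only demonstrates the branches rather than a supported API.

import io

import requests

from taskgraph.util.taskcluster import _handle_artifact

# Already-parsed JSON: a plain dict is returned unchanged.
parsed = {"tasks": []}
assert _handle_artifact("public/task-graph.json", parsed) is parsed

# Wrapped response: {"response": <Response>} is unwrapped, then dispatched
# on the path extension (here .json, so the body is parsed as JSON).
resp = requests.Response()
resp.status_code = 200
resp.raw = io.BytesIO(b'{"answer": 42}')
assert _handle_artifact("public/report.json", {"response": resp}) == {"answer": 42}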

@@ -364,14 +384,14 @@ def get_purge_cache_url(provisioner_id, worker_type):
 
 def purge_cache(provisioner_id, worker_type, cache_name):
     """Requests a cache purge from the purge-caches service."""
+    worker_pool_id = f"{provisioner_id}/{worker_type}"
+
     if testing:
-        logger.info(f"Would have purged {provisioner_id}/{worker_type}/{cache_name}.")
+        logger.info(f"Would have purged {worker_pool_id}/{cache_name}.")
     else:
-        logger.info(f"Purging {provisioner_id}/{worker_type}/{cache_name}.")
+        logger.info(f"Purging {worker_pool_id}/{cache_name}.")
         purge_cache_client = get_taskcluster_client("purgeCache")
-        purge_cache_client.purgeCache(
-            provisioner_id, worker_type, {"cacheName": cache_name}
-        )
+        purge_cache_client.purgeCache(worker_pool_id, {"cacheName": cache_name})
 
 
 def send_email(address, subject, content, link):
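
For context, a minimal sketch (not part of this change) of the updated purge-cache call shape: the client is now handed a single "provisionerId/workerType" worker-pool identifier instead of two separate arguments. The names below are illustrative, and a real purge would additionally need Taskcluster credentials configured.

provisioner_id = "proj-example"
worker_type = "ci-linux"
cache_name = "checkouts"

worker_pool_id = f"{provisioner_id}/{worker_type}"
print(worker_pool_id)  # proj-example/ci-linux

# Old call: purge_cache_client.purgeCache(provisioner_id, worker_type, {"cacheName": cache_name})
# New call: purge_cache_client.purgeCache(worker_pool_id, {"cacheName": cache_name})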