From f219600bae405a6c3f3cc71a9bc1d17d4f637ef7 Mon Sep 17 00:00:00 2001 From: Chynna Syas Date: Tue, 6 Aug 2024 10:04:14 -0700 Subject: [PATCH 1/3] - Added asynchronous HttpRequest class - Modified the authentication to integrate asynchronous functionality - Modified the where HttpRequest is called to AsyncHttpRequest class Co-authored-by: Kyongsu Kang Co-authored-by: Lizzy Zhou --- googleapiclient/_auth.py | 103 ++++++---- googleapiclient/discovery.py | 3 +- googleapiclient/http.py | 382 +++++++++++++++++++++++++++++++++++ 3 files changed, 445 insertions(+), 43 deletions(-) diff --git a/googleapiclient/_auth.py b/googleapiclient/_auth.py index 065b2ecd307..2982412ce4e 100644 --- a/googleapiclient/_auth.py +++ b/googleapiclient/_auth.py @@ -16,19 +16,18 @@ import httplib2 +import aiohttp #Team_CKL ADDED CODE +import asyncio #Team_CKL ADDED CODE + try: import google.auth import google.auth.credentials + from google.auth.transport.requests import Request as GoogleAuthRequest #Team_CKL ADDED CODE HAS_GOOGLE_AUTH = True except ImportError: # pragma: NO COVER HAS_GOOGLE_AUTH = False -try: - import google_auth_httplib2 -except ImportError: # pragma: NO COVER - google_auth_httplib2 = None - try: import oauth2client import oauth2client.client @@ -38,7 +37,7 @@ HAS_OAUTH2CLIENT = False -def credentials_from_file(filename, scopes=None, quota_project_id=None): +async def credentials_from_file(filename, scopes=None, quota_project_id=None): #Team_CKL MODIFIED """Returns credentials loaded from a file.""" if HAS_GOOGLE_AUTH: credentials, _ = google.auth.load_credentials_from_file( @@ -51,7 +50,7 @@ def credentials_from_file(filename, scopes=None, quota_project_id=None): ) -def default_credentials(scopes=None, quota_project_id=None): +async def default_credentials(scopes=None, quota_project_id=None): #Team_CKL MODIFIED """Returns Application Default Credentials.""" if HAS_GOOGLE_AUTH: credentials, _ = google.auth.default( @@ -72,7 +71,7 @@ def default_credentials(scopes=None, 
quota_project_id=None): ) -def with_scopes(credentials, scopes): +async def with_scopes(credentials, scopes): #Team_CKL MODIFIED """Scopes the credentials if necessary. Args: @@ -97,7 +96,7 @@ def with_scopes(credentials, scopes): return credentials -def authorized_http(credentials): +async def authorized_http(credentials): #Team_CKL MODIFIED """Returns an http client that is authorized with the given credentials. Args: @@ -106,45 +105,60 @@ def authorized_http(credentials): oauth2client.client.Credentials]): The credentials to use. Returns: - Union[httplib2.Http, google_auth_httplib2.AuthorizedHttp]: An - authorized http client. + aiohttp.ClientSession: An authorized aiohttp client session. """ - from googleapiclient.http import build_http - if HAS_GOOGLE_AUTH and isinstance(credentials, google.auth.credentials.Credentials): - if google_auth_httplib2 is None: - raise ValueError( - "Credentials from google.auth specified, but " - "google-api-python-client is unable to use these credentials " - "unless google-auth-httplib2 is installed. Please install " - "google-auth-httplib2." - ) - return google_auth_httplib2.AuthorizedHttp(credentials, http=build_http()) + return aiohttp.ClientSession(auth=GoogleAuthRequest(credentials)) else: - return credentials.authorize(build_http()) + headers = await apply_credentials(credentials, {}) + return aiohttp.ClientSession(headers=headers) -def refresh_credentials(credentials): - # Refresh must use a new http instance, as the one associated with the - # credentials could be a AuthorizedHttp or an oauth2client-decorated - # Http instance which would cause a weird recursive loop of refreshing - # and likely tear a hole in spacetime. - refresh_http = httplib2.Http() +async def refresh_credentials(credentials): #Team_CKL MODIFIED + """Refreshes the credentials. + + Args: + credentials (Union[ + google.auth.credentials.Credentials, + oauth2client.client.Credentials]): The credentials to refresh. 
+ """ if HAS_GOOGLE_AUTH and isinstance(credentials, google.auth.credentials.Credentials): - request = google_auth_httplib2.Request(refresh_http) - return credentials.refresh(request) + request = GoogleAuthRequest() + await credentials.refresh(request) else: - return credentials.refresh(refresh_http) + refresh_http = aiohttp.ClientSession() + await credentials.refresh(refresh_http) + await refresh_http.close() + +async def apply_credentials(credentials, headers): #Team_CKL MODIFIED + """Applies the credentials to the request headers. + + Args: + credentials (Union[ + google.auth.credentials.Credentials, + oauth2client.client.Credentials]): The credentials to apply. + headers (dict): The request headers. -def apply_credentials(credentials, headers): - # oauth2client and google-auth have the same interface for this. - if not is_valid(credentials): - refresh_credentials(credentials) + Returns: + dict: The updated headers with credentials applied. + """ + if not await is_valid(credentials): + await refresh_credentials(credentials) return credentials.apply(headers) -def is_valid(credentials): +async def is_valid(credentials): #Team_CKL MODIFIED + """Checks if the credentials are valid. + + Args: + credentials (Union[ + google.auth.credentials.Credentials, + oauth2client.client.Credentials]): The credentials to check. + + Returns: + bool: True if the credentials are valid, False otherwise. + """ if HAS_GOOGLE_AUTH and isinstance(credentials, google.auth.credentials.Credentials): return credentials.valid else: @@ -154,14 +168,19 @@ def is_valid(credentials): ) -def get_credentials_from_http(http): +async def get_credentials_from_http(http): #Team_CKL MODIFIED + """Gets the credentials from the http client session. + + Args: + http (aiohttp.ClientSession): The http client session. + + Returns: + google.auth.credentials.Credentials: The credentials. 
+ """ if http is None: return None - elif hasattr(http.request, "credentials"): - return http.request.credentials - elif hasattr(http, "credentials") and not isinstance( - http.credentials, httplib2.Credentials - ): + elif hasattr(http, "credentials"): return http.credentials else: return None + \ No newline at end of file diff --git a/googleapiclient/discovery.py b/googleapiclient/discovery.py index f7bbd77763c..65dee5fc2f4 100644 --- a/googleapiclient/discovery.py +++ b/googleapiclient/discovery.py @@ -76,6 +76,7 @@ HttpMock, HttpMockSequence, HttpRequest, + AsyncHttpRequest, #Team_CKL CODE added MediaFileUpload, MediaUpload, build_http, @@ -435,7 +436,7 @@ def _retrieve_discovery_doc( # Execute this request with retries build into HttpRequest # Note that it will already raise an error if we don't get a 2xx response - req = HttpRequest(http, HttpRequest.null_postproc, actual_url) + req = AsyncHttpRequest(http, AsyncHttpRequest.null_postproc, actual_url) #Team_CKL Code modified resp, content = req.execute(num_retries=num_retries) try: diff --git a/googleapiclient/http.py b/googleapiclient/http.py index 187f6f5dac8..0ec8d4779cc 100644 --- a/googleapiclient/http.py +++ b/googleapiclient/http.py @@ -35,6 +35,13 @@ import urllib import uuid +import aiohttp #Team_CKL CODE ADDED +import asyncio #Team_CKL CODE ADDED +from google.oauth2 import service_account #Team_CKL CODE ADDED +from google.auth.transport.requests import Request #Team_CKL CODE ADDED +from googleapiclient.errors import HttpError, ResumableUploadError #Team_CKL CODE ADDED +from googleapiclient.http import MediaUploadProgress #Team_CKL CODE ADDED + import httplib2 # TODO(issue 221): Remove this conditional import jibbajabba. 
@@ -1161,6 +1168,381 @@ def from_json(s, http, postproc): def null_postproc(resp, contents): return resp, contents +class AsyncHttpRequest(object): #Team_CKL ADDED CODE + """Encapsulates a single HTTP request with async capabilities.""" + + def __init__( + self, + http, + postproc, + uri, + method="GET", + body=None, + headers=None, + methodId=None, + resumable=None, + ): + """Constructor for an AsyncHttpRequest. + + Args: + http: aiohttp.ClientSession, the transport object to use to make a request + postproc: callable, called on the HTTP response and content to transform + it into a data object before returning, or raising an exception + on an error. + uri: string, the absolute URI to send the request to + method: string, the HTTP method to use + body: string, the request body of the HTTP request, + headers: dict, the HTTP request headers + methodId: string, a unique identifier for the API method being called. + resumable: MediaUpload, None if this is not a resumbale request. + """ + self.uri = uri + self.method = method + self.body = body + self.headers = headers or {} + self.methodId = methodId + self.http = http + self.postproc = postproc + self.resumable = resumable + self.response_callbacks = [] + self._in_error_state = False + + # The size of the non-media part of the request. + self.body_size = len(self.body or "") + + # The resumable URI to send chunks to. + self.resumable_uri = None + + # The bytes that have been uploaded. + self.resumable_progress = 0 + + # Stubs for testing. + self._rand = random.random + self._sleep = time.sleep + + async def execute(self, http=None, num_retries=0): + """Execute the request asynchronously. + + Args: + http: aiohttp.ClientSession, a http object to be used in place of the + one the AsyncHttpRequest request object was constructed with. + num_retries: Integer, number of times to retry with randomized + exponential backoff. If all retries fail, the raised HttpError + represents the last request. 
If zero (default), we attempt the + request only once. + + Returns: + A deserialized object model of the response body as determined + by the postproc. + + Raises: + googleapiclient.errors.HttpError if the response was not a 2xx. + aiohttp.ClientError if a transport error has occurred. + """ + if http is None: + http = self.http + + if self.resumable: + body = None + while body is None: + _, body = await self.next_chunk(http=http, num_retries=num_retries) + return body + + # Non-resumable case. + + if "content-length" not in self.headers: + self.headers["content-length"] = str(self.body_size) + # If the request URI is too long then turn it into a POST request. + # Assume that a GET request never contains a request body. + if len(self.uri) > MAX_URI_LENGTH and self.method == "GET": + self.method = "POST" + self.headers["x-http-method-override"] = "GET" + self.headers["content-type"] = "application/x-www-form-urlencoded" + parsed = urllib.parse.urlparse(self.uri) + self.uri = urllib.parse.urlunparse( + (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None, None) + ) + self.body = parsed.query + self.headers["content-length"] = str(len(self.body)) + + # Handle retries for server-side errors. + resp, content = await self._retry_request( + http, + num_retries, + "request", + self._sleep, + self._rand, + str(self.uri), + method=str(self.method), + body=self.body, + headers=self.headers, + ) + + for callback in self.response_callbacks: + callback(resp) + if resp.status >= 300: + raise HttpError(resp, content, uri=self.uri) + return self.postproc(resp, content) + + async def add_response_callback(self, cb): + """add_response_headers_callback + + Args: + cb: Callback to be called on receiving the response headers, of signature: + + def cb(resp): + # Where resp is an instance of aiohttp.ClientResponse + """ + self.response_callbacks.append(cb) + + async def next_chunk(self, http=None, num_retries=0): + """Execute the next step of a resumable upload asynchronously. 
+ + Can only be used if the method being executed supports media uploads and + the MediaUpload object passed in was flagged as using resumable upload. + + Example: + + media = MediaFileUpload('cow.png', mimetype='image/png', + chunksize=1000, resumable=True) + request = farm.animals().insert( + id='cow', + name='cow.png', + media_body=media) + + response = None + while response is None: + status, response = request.next_chunk() + if status: + print "Upload %d%% complete." % int(status.progress() * 100) + Args: + session: aiohttp.ClientSession, a http object to be used in place of the + one the AsyncHttpRequest request object was constructed with. + num_retries: Integer, number of times to retry with randomized + exponential backoff. If all retries fail, the raised HttpError + represents the last request. If zero (default), we attempt the + request only once. + + Returns: + (status, body): (ResumableMediaStatus, object) + The body will be None until the resumable media is fully uploaded. + + Raises: + googleapiclient.errors.HttpError if the response was not a 2xx. + aiohttp.ClientError if a transport error has occurred. 
+ """ + if http is None: + http = self.http + + if self.resumable.size() is None: + size = "*" + else: + size = str(self.resumable.size()) + + if self.resumable_uri is None: + start_headers = copy.copy(self.headers) + start_headers["X-Upload-Content-Type"] = self.resumable.mimetype() + if size != "*": + start_headers["X-Upload-Content-Length"] = size + start_headers["content-length"] = str(self.body_size) + + resp, content = await self._retry_request( + http, + num_retries, + "resumable URI request", + self._sleep, + self._rand, + self.uri, + method=self.method, + body=self.body, + headers=start_headers, + ) + + if resp.status == 200 and "location" in resp.headers: + self.resumable_uri = resp.headers["location"] + else: + raise ResumableUploadError(resp, content) + elif self._in_error_state: + # If we are in an error state then query the server for current state of + # the upload by sending an empty PUT and reading the 'range' header in + # the response. + headers = {"Content-Range": "bytes */%s" % size, "content-length": "0"} + async with http.put(self.resumable_uri, headers=headers) as resp: + content = await resp.text() + status, body = self._process_response(resp, content) + if body: + # The upload was complete. + return (status, body) + + if self.resumable.has_stream(): + data = self.resumable.stream() + if self.resumable.chunksize() == -1: + data.seek(self.resumable_progress) + chunk_end = self.resumable.size() - self.resumable_progress - 1 + else: + # Doing chunking with a stream, so wrap a slice of the stream. + data = _StreamSlice( + data, self.resumable_progress, self.resumable.chunksize() + ) + chunk_end = min( + self.resumable_progress + self.resumable.chunksize() - 1, + self.resumable.size() - 1, + ) + else: + data = self.resumable.getbytes( + self.resumable_progress, self.resumable.chunksize() + ) + + # A short read implies that we are at EOF, so finish the upload. 
+ if len(data) < self.resumable.chunksize(): + size = str(self.resumable_progress + len(data)) + + chunk_end = self.resumable_progress + len(data) - 1 + + headers = { + # Must set the content-length header here because aiohttp can't + # calculate the size when working with _StreamSlice. + "Content-Length": str(chunk_end - self.resumable_progress + 1), + } + + # An empty file results in chunk_end = -1 and size = 0 + # sending "bytes 0--1/0" results in an invalid request + # Only add header "Content-Range" if chunk_end != -1 + if chunk_end != -1: + headers["Content-Range"] = "bytes %d-%d/%s" % ( + self.resumable_progress, + chunk_end, + size, + ) + + for retry_num in range(num_retries + 1): + if retry_num > 0: + await asyncio.sleep(random.random() * 2**retry_num) + print( + "Retry #%d for media upload: %s %s, following status: %d" + % (retry_num, self.method, self.uri, resp.status) + ) + + try: + async with http.put(self.resumable_uri, method="PUT", body=data, headers=headers) as resp: + content = await resp.text() + break + except: + self._in_error_state = True + raise + if not self._should_retry_response(resp.status, content): + break + + return self._process_response(resp, content) + + def _process_response(self, resp, content): + """Process the response from a single chunk upload. + + Args: + resp: aiohttp.ClientResponse, the response object. + content: string, the content of the response. + + Returns: + (status, body): (ResumableMediaStatus, object) + The body will be None until the resumable media is fully uploaded. + + Raises: + googleapiclient.errors.HttpError if the response was not a 2xx or a 308. + """ + if resp.status in [200, 201]: + self._in_error_state = False + return None, self.postproc(resp, content) + elif resp.status == 308: + self._in_error_state = False + # A "308 Resume Incomplete" indicates we are not done. 
+ try: + self.resumable_progress = int(resp["range"].split("-")[1]) + 1 + except KeyError: + # If resp doesn't contain range header, resumable progress is 0 + self.resumable_progress = 0 + if "location" in resp: + self.resumable_uri = resp.headers["location"] + else: + self._in_error_state = True + raise HttpError(resp, content, uri=self.uri) + + return ( + MediaUploadProgress(self.resumable_progress, self.resumable.size()), + None, + ) + + async def _retry_request_(self, http, num_retries, request_type, url, method="GET", body=None, headers=None): + """Retry request with exponential backoff. + + Args: + http: aiohttp.ClientSession, the session to use to make the request. + num_retries: Integer, number of times to retry with randomized + exponential backoff. + request_type: String, type of the request for logging purposes. + url: String, the URL to send the request to. + method: String, the HTTP method to use. + body: String, the request body. + headers: Dict, the request headers. + + Returns: + Tuple of (resp, content): response object and response content. + + Rasies: + googleapiclient.errors.HttpError if the response was not a 2xx. + aiohttp.ClientError if a transport error has occurred. 
+ """ + + for retry_num in range(num_retries + 1): + if retry_num > 0: + await asyncio.sleep(random.random() * 2**retry_num) + print( + "Retry #%d for %s: %s %s" + % (retry_num, request_type, method, url) + ) + try: + async with http.request(method, url, data=body, headers=headers) as resp: + content = await resp.text() + if resp.status < 300: + return resp, content + except aiohttp.ClientError as e: + if retry_num == num_retries: + raise e + + + + + def to_json(self): + """Returns a JSON representation of the HttpRequest.""" + d = copy.copy(self.__dict__) + if d["resumable"] is not None: + d["resumable"] = self.resumable.to_json() + del d["http"] + del d["postproc"] + del d["_sleep"] + del d["_rand"] + + return json.dumps(d) + + @staticmethod + def from_json(s, http, postproc): + """Returns an HttpRequest populated with info from a JSON object.""" + d = json.loads(s) + if d["resumable"] is not None: + d["resumable"] = MediaUpload.new_from_json(d["resumable"]) + return AsyncHttpRequest( + http, + postproc, + uri=d["uri"], + method=d["method"], + body=d["body"], + headers=d["headers"], + methodId=d["methodId"], + resumable=d["resumable"], + ) + + @staticmethod + def null_postproc(resp, contents): + return resp, contents class BatchHttpRequest(object): """Batches multiple HttpRequest objects into a single HTTP request. 
From 9b7dd43b86a17f054a4fde38cbbd100ab50820be Mon Sep 17 00:00:00 2001 From: ChynnaSyas Date: Tue, 6 Aug 2024 13:52:54 -0700 Subject: [PATCH 2/3] Modified the requestBuilder to call the AsyncHttpRequest class instead Co-authored-by: Kyongsu Kang Co-authored-by: Lizzy Zhou --- googleapiclient/discovery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/googleapiclient/discovery.py b/googleapiclient/discovery.py index 65dee5fc2f4..e74b88ea0d3 100644 --- a/googleapiclient/discovery.py +++ b/googleapiclient/discovery.py @@ -198,7 +198,7 @@ def build( discoveryServiceUrl=None, developerKey=None, model=None, - requestBuilder=HttpRequest, + requestBuilder=AsyncHttpRequest, #Team_CKL code modified to async credentials=None, cache_discovery=True, cache=None, @@ -469,7 +469,7 @@ def build_from_document( http=None, developerKey=None, model=None, - requestBuilder=HttpRequest, + requestBuilder=AsyncHttpRequest, #Team_CKL code modified to async credentials=None, client_options=None, adc_cert_path=None, From 7601cb0f2d2eb56f1651da48a5416272ea240d2a Mon Sep 17 00:00:00 2001 From: ChynnaSyas Date: Sun, 25 Aug 2024 16:24:04 -0700 Subject: [PATCH 3/3] Updated an AsyncHttpRequest class function name Co-authored-by: Kyongsu Kang Co-authored-by: Lizzy Zhou --- googleapiclient/http.py | 27 +++++++++++++++------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/googleapiclient/http.py b/googleapiclient/http.py index 0ec8d4779cc..15bb5c7d0db 100644 --- a/googleapiclient/http.py +++ b/googleapiclient/http.py @@ -1266,7 +1266,7 @@ async def execute(self, http=None, num_retries=0): self.headers["content-length"] = str(len(self.body)) # Handle retries for server-side errors.
- resp, content = await self._retry_request( + resp, content = await self._retry_request_( http, num_retries, "request", @@ -1346,7 +1346,7 @@ async def next_chunk(self, http=None, num_retries=0): start_headers["X-Upload-Content-Length"] = size start_headers["content-length"] = str(self.body_size) - resp, content = await self._retry_request( + resp, content = await self._retry_request_( http, num_retries, "resumable URI request", @@ -1358,8 +1358,8 @@ async def next_chunk(self, http=None, num_retries=0): headers=start_headers, ) - if resp.status == 200 and "location" in resp.headers: - self.resumable_uri = resp.headers["location"] + if resp.status == 200 and "location" in resp.headers: #Potential Tech Issue problem + self.resumable_uri = resp.headers["location"] #Potential Tech Issue problem else: raise ResumableUploadError(resp, content) elif self._in_error_state: @@ -1367,7 +1367,7 @@ async def next_chunk(self, http=None, num_retries=0): # the upload by sending an empty PUT and reading the 'range' header in # the response. 
headers = {"Content-Range": "bytes */%s" % size, "content-length": "0"} - async with http.put(self.resumable_uri, headers=headers) as resp: + async with http.put(self.resumable_uri, headers=headers) as resp: #Potential Tech Issue problem content = await resp.text() status, body = self._process_response(resp, content) if body: @@ -1424,16 +1424,19 @@ async def next_chunk(self, http=None, num_retries=0): ) try: - async with http.put(self.resumable_uri, method="PUT", body=data, headers=headers) as resp: + async with http.put(self.resumable_uri, method="PUT", body=data, headers=headers) as resp: #Potential Tech Issue problem content = await resp.text() - break + if not self._should_retry_response(resp.status, content): + break except: self._in_error_state = True raise - if not self._should_retry_response(resp.status, content): - break + - return self._process_response(resp, content) + if 'resp' in locals() and 'content' in locals(): + return self._process_response(resp, content) + else: + raise RuntimeError("Failed to upload after multiple retries") def _process_response(self, resp, content): """Process the response from a single chunk upload. @@ -1461,7 +1464,7 @@ def _process_response(self, resp, content): # If resp doesn't contain range header, resumable progress is 0 self.resumable_progress = 0 if "location" in resp: - self.resumable_uri = resp.headers["location"] + self.resumable_uri = resp.headers["location"] #Potential Tech Issue problem else: self._in_error_state = True raise HttpError(resp, content, uri=self.uri) @@ -1471,7 +1474,7 @@ def _process_response(self, resp, content): None, ) - async def _retry_request_(self, http, num_retries, request_type, url, method="GET", body=None, headers=None): + async def _retry_request_(self, http, num_retries, request_type, url, method="GET", body=None, headers=None): #Potential Tech Issue problem """Retry request with exponential backoff. Args: