diff --git a/functions/pipeline/onboarding/__init__.py b/functions/pipeline/onboarding/__init__.py index 0961e3f2..01429488 100644 --- a/functions/pipeline/onboarding/__init__.py +++ b/functions/pipeline/onboarding/__init__.py @@ -5,12 +5,9 @@ from ..shared.db_provider import get_postgres_provider from ..shared.db_access import ImageTagDataAccess, ImageInfo +from ..shared.onboarding import copy_images_to_permanent_storage from azure.storage.blob import BlockBlobService -# TODO: User id as param to function - holding off until further discussion -# regarding whether user ID should be generated/looked up by the CLI or -# from within this function - def main(req: func.HttpRequest) -> func.HttpResponse: logging.info('Python HTTP trigger function processed a request.') @@ -28,103 +25,70 @@ def main(req: func.HttpRequest) -> func.HttpResponse: logging.error(req.get_json()) raw_url_list = req_body["imageUrls"] except ValueError: - print("Unable to decode JSON body") - return func.HttpResponse("Unable to decode POST body", status_code=400) - - logging.error(req_body) + logging.error("ERROR: Unable to decode JSON body") + return func.HttpResponse("ERROR: Unable to decode POST body", status_code=400) - # Build list of image objects to pass to DAL for insertion into DB. - image_object_list = [] - image_name_list = [] + if not raw_url_list: + return func.HttpResponse("ERROR: URL list empty.", status_code=401) # Check to ensure image URLs sent by client are all unique. 
url_list = set(raw_url_list) - # TODO: Encapsulate this loop in a method - # TODO: Wrap method in try/catch, send an appropriate http response in the event of an error - for url in url_list: - # Split original image name from URL - original_filename = url.split("/")[-1] - image_name_list.append(original_filename) - # Create ImageInfo object (def in db_access.py) - # Note: For testing, default image height/width are set to 50x50 - # TODO: Figure out where actual height/width need to come from - image = ImageInfo(original_filename, url, 50, 50) - # Append image object to the list - image_object_list.append(image) + # Get list of image objects to pass to DAL for insertion into DB. + try: + image_object_list = build_objects_from_url_list(url_list) + except Exception as e: + logging.error("ERROR: Could not build image object list. Exception: " + str(e)) + return func.HttpResponse("ERROR: Could not build image object list.", status_code=401) - # TODO: Wrap db access section in try/catch, send an appropriate http response in the event of an error - data_access = ImageTagDataAccess(get_postgres_provider()) - user_id = data_access.create_user(user_name) + # Connect to database. + try: + logging.info("Now connecting to database...") + data_access = ImageTagDataAccess(get_postgres_provider()) + logging.info("Connected.") + except Exception as e: + logging.error("ERROR: Database connection failed. 
Exception: " + str(e)) + return func.HttpResponse("ERROR: Unable to connect to database", status_code=503) + + # Create/get user id + user_id_number = data_access.create_user(user_id) + logging.info("User id for {0} is {1}".format(user_id, str(user_id_number))) # Add new images to the database, and retrieve a dictionary ImageId's mapped to ImageUrl's image_id_url_map = data_access.add_new_images(image_object_list,user_id) - # Print out dictionary for debugging - logging.info("Image ID and URL map dictionary:") - logging.info(image_id_url_map) - - # Copy over images to permanent blob store and save URLs in a list - permanent_url_list = [] - update_urls_dictionary = {} - - # TODO: Add check to make sure image exists in temp storage before attempting these operations - # TODO: Put blob storage manipulation into a separate function and add to shared codebase - # TODO: Try/catch to distinguish among errors - for key, value in image_id_url_map.items(): + copy_source = os.getenv('SOURCE_CONTAINER_NAME') + copy_destination = os.getenv('DESTINATION_CONTAINER_NAME') - # Verbose logging for testing - logging.info("Key: " + key) - logging.info("Value: " + str(value)) + # Create blob service for storage account + blob_service = BlockBlobService(account_name=os.getenv('STORAGE_ACCOUNT_NAME'), account_key=os.getenv('STORAGE_ACCOUNT_KEY')) - original_image_url = key - original_blob_name = original_image_url.split("/")[-1] - file_extension = os.path.splitext(original_image_url)[1] - image_id = value - new_blob_name = (str(image_id) + file_extension) - copy_from_container = os.getenv('SOURCE_CONTAINER_NAME') - copy_to_container = os.getenv('DESTINATION_CONTAINER_NAME') - permanent_storage_path = "https://{0}.blob.core.windows.net/{0}/{1}".format(copy_from_container, new_blob_name) + # Copy images to permanent storage and get a dictionary of images for which to update URLs in DB. 
+ # TODO: Prefer to have this function return a JSON blob as a string containing a list of successes + # and a list of failures. If the list of failures contains any items, return a status code other than 200. + update_urls_dictionary = copy_images_to_permanent_storage(image_id_url_map, copy_source, copy_destination, blob_service) - # Verbose logging for testing - logging.info("Original image URL: " + original_image_url) - logging.info("Original image name: " + original_blob_name) - logging.info("File extension: " + file_extension) - logging.info("Image ID: " + str(image_id)) - logging.info("New blob name: " + new_blob_name) - logging.info("Now copying file from temporary to permanent storage...") - logging.info("Permanent image URL: " + permanent_storage_path) - - blob_service = BlockBlobService(account_name=os.getenv('STORAGE_ACCOUNT_NAME'), account_key=os.getenv('STORAGE_ACCOUNT_KEY')) - source_blob_url = blob_service.make_blob_url(copy_from_container, original_blob_name) - - # TODO: Exception handling in case blob cannot be copied for some reason. - blob_service.copy_blob(copy_to_container, new_blob_name, source_blob_url) + # If the dictionary of images is empty, this means a failure occurred in a copy/delete operation. + # Otherwise, dictionary contains permanent image URLs for each image ID that was successfully copied. + if not update_urls_dictionary: + return func.HttpResponse("ERROR: Image copy/delete operation failed. Check state of images in storage.", status_code=401) + else: + logging.info("Now updating permanent URLs in the DB...") + data_access.update_image_urls(update_urls_dictionary, user_id_number) logging.info("Done.") + # Return string containing list of URLs to images in permanent blob storage + return func.HttpResponse("Images were successfully added to the database and copied to permanent storage.", status_code=200) - # Delete the file from temp storage once it's been copied - # Note: The following delete code works. 
Commenting out for testing of other functions. - ''' - logging.info("Now deleting image " + original_blob_name + " from temp storage container.") - try: - blob_service.delete_blob(copy_from_container, original_blob_name) - print("Blob " + original_blob_name + " has been deleted successfully") - except: - print("Blob " + original_blob_name + " deletion failed") - ''' - - # Add image to the list of images to be returned in the response - permanent_url_list.append(permanent_storage_path) - # Add ImageId and permanent storage url to new dictionary to be sent to update function - update_urls_dictionary[image_id] = permanent_storage_path - - logging.info("Now updating permanent URLs in the DB...") - data_access.update_image_urls(update_urls_dictionary, user_id) - logging.info("Done.") - - # Construct response string of permanent URLs - permanent_url_string = (", ".join(permanent_url_list)) - - # Return string containing list of URLs to images in permanent blob storage - return func.HttpResponse("The following images should now be added to the DB and exist in permanent blob storage: \n" - + permanent_url_string, status_code=200) +# Given a list of image URLs, build an ImageInfo object for each, and return a list of these image objects. +def build_objects_from_url_list(url_list): + image_object_list = [] + for url in url_list: + # Split original image name from URL + original_filename = url.split("/")[-1] + # Create ImageInfo object (def in db_access.py) + # TODO: Figure out where actual height/width need to come from. Values are hard-coded for testing. 
+ image = ImageInfo(original_filename, url, 50, 50) + # Append image object to the list + image_object_list.append(image) + return image_object_list + \ No newline at end of file diff --git a/functions/pipeline/onboarding/onboarding-client.py b/functions/pipeline/onboarding/onboarding-client.py deleted file mode 100644 index bb7243a8..00000000 --- a/functions/pipeline/onboarding/onboarding-client.py +++ /dev/null @@ -1,39 +0,0 @@ -import requests -import json -import pg8000 - -# The following mock client imitates the CLI during the onboarding scenario for new images. -# The expectation is that the CLI uploads images to a temporary blob store, then gets a list -# of URLs to those images and passes the list to an HTTP trigger function in the format of -# a JSON string. The HTTP trigger function creates rows in the database for the images, -# retrieves the ImageId's for them, and then copies the images, each renamed as "ImageId.extension", -# into a permanent blob storage container. The HTTP function returns the list of URLs to -# the images in permanent blob storage. 
- -print("\nTest client for CLI Onboarding scenario") -print('-' * 40) - -# functionURL = "http://localhost:7071/api/onboarding?userId=aka" -functionURL = "https://onboardinghttptrigger.azurewebsites.net/api/onboarding?userId=aka" - -urlList = { "imageUrls": ["https://akaonboardingstorage.blob.core.windows.net/aka-temp-source-container/puppies1.jpg", - "https://akaonboardingstorage.blob.core.windows.net/aka-temp-source-container/puppies2.jpg", - "https://akaonboardingstorage.blob.core.windows.net/aka-temp-source-container/puppies3.jpg", - "https://akaonboardingstorage.blob.core.windows.net/aka-temp-source-container/puppies2.jpg"] } - -headers = {"Content-Type": "application/json"} - -print("Now executing POST request to onboard images...to:") -print("Function URL: " + functionURL) -print("Headers:") -for key, value in headers.items(): - print("\t" + key + ": " + value) -response = requests.post(url=functionURL, headers=headers, json=urlList) -print("Completed POST request.") - -raw_response = response.text -response_array = raw_response.split(", ") -response_output = "\n".join(response_array) - -print(f"Response status code: {response.status_code}") -print(f"Response string: {response_output}") diff --git a/functions/pipeline/shared/onboarding/__init__.py b/functions/pipeline/shared/onboarding/__init__.py new file mode 100644 index 00000000..59b814c3 --- /dev/null +++ b/functions/pipeline/shared/onboarding/__init__.py @@ -0,0 +1,52 @@ +import os +import logging + +# TODO: Modify this function to return a JSON string that contains a "succeeded" list and a "failed" list. +def copy_images_to_permanent_storage(image_id_url_map, copy_source, copy_destination, blob_service): + # Create a dictionary to store map of new permanent image URLs to image ID's + update_urls_dictionary = {} + + # Copy images from temporary to permanent storage and delete them. 
+ for key, value in image_id_url_map.items(): + original_image_url = key + original_blob_name = original_image_url.split("/")[-1] + file_extension = os.path.splitext(original_image_url)[1] + image_id = value + new_blob_name = (str(image_id) + file_extension) + + # Verbose logging for testing + logging.info("Original image name: " + original_blob_name) + logging.info("Image ID: " + str(image_id)) + logging.info("New blob name: " + new_blob_name) + + # Create the blob URLs + source_blob_path = blob_service.make_blob_url(copy_source, original_blob_name) + destination_blob_path = blob_service.make_blob_url(copy_destination, new_blob_name) + + # Copy blob from temp storage to permanent storage + try: + logging.info("Now copying file from temporary to permanent storage...") + logging.info("Source path: " + source_blob_path) + logging.info("Destination path: " + destination_blob_path) + blob_service.copy_blob(copy_destination, new_blob_name, source_blob_path) + logging.info("Done.") + + # Add ImageId and permanent storage url to new dictionary to be sent to update function + update_urls_dictionary[image_id] = destination_blob_path + + # Delete the file from temp storage once it's been copied + logging.info("Now deleting image " + original_blob_name + " from temp storage container.") + try: + blob_service.delete_blob(copy_source, original_blob_name) + logging.info("Blob " + original_blob_name + " has been deleted successfully") + except Exception as e: + logging.error("ERROR: Deletion of blob " + original_blob_name + " failed. Exception: " + str(e)) + update_urls_dictionary.clear() + return update_urls_dictionary + + except Exception as e: + logging.error("ERROR: Copy of blob " + original_blob_name + " failed. Exception: " + str(e)) + update_urls_dictionary.clear() + return update_urls_dictionary + + return update_urls_dictionary