diff --git a/cli/cli.py b/cli/cli.py index e6997383..737074e1 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -32,7 +32,9 @@ if operation == 'download': download(config, args.num_images) - elif operation == 'onboard': + elif operation == 'onboard' and not args.folder: + print ("--folder arg required for onboard operation") + elif operation == 'onboard' and args.folder: onboard(config, args.folder) else: upload(config) diff --git a/cli/operations.py b/cli/operations.py index 13319c80..fd7ae237 100644 --- a/cli/operations.py +++ b/cli/operations.py @@ -6,7 +6,8 @@ import pathlib import json import copy -from azure.storage.blob import BlockBlobService, ContentSettings +from azure.storage.blob.blockblobservice import BlockBlobService +from azure.storage.file import ContentSettings FUNCTIONS_SECTION = 'FUNCTIONS' FUNCTIONS_KEY = 'FUNCTIONS_KEY' @@ -38,7 +39,7 @@ class MissingConfigException(Exception): class ImageLimitException(Exception): pass - +#TODO: Verify the storage account is correct. Currently we get an unhelpful error message if you have a typo in Storage Name def get_azure_storage_client(config): # Todo: Move away from global client. 
global azure_storage_client @@ -53,11 +54,12 @@ def get_azure_storage_client(config): return azure_storage_client - +#TODO We should create the container if it does not exist def onboard(config, folder_name): blob_storage = get_azure_storage_client(config) uri = 'https://' + config.get("storage_account") + '.blob.core.windows.net/' + config.get("storage_container") + '/' functions_url = config.get('url') + '/api/onboarding' + user_name = config.get("tagging_user") images = [] for image in os.listdir(folder_name): if image.lower().endswith('.png') or image.lower().endswith('.jpg') or image.lower().endswith('.jpeg') or image.lower().endswith('.gif'): @@ -73,11 +75,17 @@ def onboard(config, folder_name): data['imageUrls'] = images headers = {'content-type': 'application/json'} query = { - "code": config.get('key') + "code": config.get('key'), + "userName": user_name } + #TODO: Ensure we don't get 4xx or 5xx return codes response = requests.post(functions_url, data=json.dumps(data), headers=headers, params=query) - print("Images successfully uploaded. \n" + response.text) + json_resp = response.json() + count = len(json_resp['imageUrls']) + print("Successfully uploaded " + str(count) + " images.") + for url in json_resp['imageUrls']: + print(url) def _download_bounds(num_images): @@ -95,9 +103,11 @@ def _download_bounds(num_images): def download(config, num_images, strategy=None): # TODO: better/more proper URI handling. functions_url = config.get("url") + "/api/download" + user_name = config.get("tagging_user") images_to_download = _download_bounds(num_images) query = { - "imageCount": images_to_download + "imageCount": images_to_download, + "userName": user_name } response = requests.get(functions_url, params=query) @@ -123,9 +133,12 @@ def download(config, num_images, strategy=None): exist_ok=True ) - download_images(config, data_dir, json_resp) - print("Downloaded files. 
Ready to tag!") - return images_to_download + local_images = download_images(config, data_dir, json_resp) + count = len(local_images) + print("Successfully downloaded " + str(count) + " images.") + for image_path in local_images: + print(image_path) + print("Ready to tag!") def download_images(config, image_dir, json_resp): @@ -135,25 +148,23 @@ def download_images(config, image_dir, json_resp): write_vott_data(image_dir, json_resp) urls = json_resp['imageUrls'] - dummy = "https://cdn.pixabay.com/photo/2017/02/20/18/03/cat-2083492_960_720.jpg" - + downloaded_file_paths = [] for index in range(len(urls)): url = urls[index] - # file will look something like - # https://csehackstorage.blob.core.windows.net/image-to-tag/image4.jpeg - # need to massage it to get the last portion. - file_name = url.split('/')[-1] - # todo: change this when we get actual data. - response = requests.get(dummy) + #TODO: We will download an empty file if we get a permission error on the blob store URL + # We should raise an exception. For now the blob store must be publicly accessible + response = requests.get(url) file_path = pathlib.Path(image_dir / file_name) with open(str(file_path), "wb") as file: for chunk in response.iter_content(chunk_size=128): file.write(chunk) file.close() + downloaded_file_paths.append(file_path) + return downloaded_file_paths def write_vott_data(image_dir, json_resp): @@ -197,6 +208,7 @@ def prepend_file_paths(image_dir, vott_json): def upload(config): functions_url = config.get("url") + "/api/upload" + user_name = config.get("tagging_user") tagging_location = pathlib.Path( os.path.expanduser(config.get("tagging_location")) ) @@ -210,7 +222,11 @@ def upload(config): # Munge the vott json file. 
munged_json = trim_file_paths(json_data) - response = requests.post(functions_url, json=munged_json) + query = { + "userName": user_name + } + + response = requests.post(functions_url, json=munged_json, params=query) response.raise_for_status() resp_json = response.json() @@ -229,6 +245,8 @@ def trim_file_paths(json_data): munged_visited_frames = [] for frame_path in visited_frames: + #TODO: This line assumes that the visited frames name is a full path. + # Centralize this business logic in the codebase. It probably exists in shared code too munged_visited_frames.append( pathlib.Path(frame_path).name ) diff --git a/functions/pipeline/onboarding/__init__.py b/functions/pipeline/onboarding/__init__.py index 01429488..5b1cb4f2 100644 --- a/functions/pipeline/onboarding/__init__.py +++ b/functions/pipeline/onboarding/__init__.py @@ -7,6 +7,7 @@ from ..shared.db_access import ImageTagDataAccess, ImageInfo from ..shared.onboarding import copy_images_to_permanent_storage from azure.storage.blob import BlockBlobService +DEFAULT_RETURN_HEADER= { "content-type": "application/json"} def main(req: func.HttpRequest) -> func.HttpResponse: logging.info('Python HTTP trigger function processed a request.') @@ -16,16 +17,15 @@ def main(req: func.HttpRequest) -> func.HttpResponse: if not user_name: return func.HttpResponse( status_code=401, - headers={ "content-type": "application/json"}, + headers=DEFAULT_RETURN_HEADER, body=json.dumps({"error": "invalid userName given or omitted"}) ) try: req_body = req.get_json() - logging.error(req.get_json()) + logging.debug(req.get_json()) raw_url_list = req_body["imageUrls"] except ValueError: - logging.error("ERROR: Unable to decode JSON body") return func.HttpResponse("ERROR: Unable to decode POST body", status_code=400) if not raw_url_list: @@ -34,50 +34,49 @@ def main(req: func.HttpRequest) -> func.HttpResponse: # Check to ensure image URLs sent by client are all unique. 
url_list = set(raw_url_list) - # Get list of image objects to pass to DAL for insertion into DB. try: image_object_list = build_objects_from_url_list(url_list) except Exception as e: logging.error("ERROR: Could not build image object list. Exception: " + str(e)) return func.HttpResponse("ERROR: Could not build image object list.", status_code=401) - # Connect to database. try: - logging.info("Now connecting to database...") data_access = ImageTagDataAccess(get_postgres_provider()) - logging.info("Connected.") + user_id= data_access.create_user(user_name) + + logging.debug("Add new images to the database, and retrieve a dictionary ImageId's mapped to ImageUrl's") + image_id_url_map = data_access.add_new_images(image_object_list,user_id) + + copy_source = os.getenv('SOURCE_CONTAINER_NAME') + copy_destination = os.getenv('DESTINATION_CONTAINER_NAME') + + # Create blob service for storage account + blob_service = BlockBlobService(account_name=os.getenv('STORAGE_ACCOUNT_NAME'), account_key=os.getenv('STORAGE_ACCOUNT_KEY')) + + # Copy images to permanent storage and get a dictionary of images for which to update URLs in DB. + # TODO: Prefer to have this function return a JSON blob as a string containing a list of successes + # and a list of failures. If the list of failures contains any items, return a status code other than 200. + update_urls_dictionary = copy_images_to_permanent_storage(image_id_url_map, copy_source, copy_destination, blob_service) + + # If the dictionary of images is empty, this means a failure occurred in a copy/delete operation. + # Otherwise, dictionary contains permanent image URLs for each image ID that was successfully copied. + if not update_urls_dictionary: + return func.HttpResponse("ERROR: Image copy/delete operation failed. 
Check state of images in storage.", status_code=401) + else: + logging.debug("Now updating permanent URLs in the DB...") + data_access.update_image_urls(update_urls_dictionary, user_id) + + content = json.dumps({"imageUrls":list(update_urls_dictionary.values())}) + return func.HttpResponse( + status_code=200, + headers=DEFAULT_RETURN_HEADER, + body=content + ) + except Exception as e: - logging.error("ERROR: Database connection failed. Exception: " + str(e)) - return func.HttpResponse("ERROR: Unable to connect to database", status_code=503) - - # Create/get user id - user_id_number = data_access.create_user(user_id) - logging.info("User id for {0} is {1}".format(user_id, str(user_id_number))) - - # Add new images to the database, and retrieve a dictionary ImageId's mapped to ImageUrl's - image_id_url_map = data_access.add_new_images(image_object_list,user_id) - - copy_source = os.getenv('SOURCE_CONTAINER_NAME') - copy_destination = os.getenv('DESTINATION_CONTAINER_NAME') - - # Create blob service for storage account - blob_service = BlockBlobService(account_name=os.getenv('STORAGE_ACCOUNT_NAME'), account_key=os.getenv('STORAGE_ACCOUNT_KEY')) - - # Copy images to permanent storage and get a dictionary of images for which to update URLs in DB. - # TODO: Prefer to have this function return a JSON blob as a string containing a list of successes - # and a list of failures. If the list of failures contains any items, return a status code other than 200. - update_urls_dictionary = copy_images_to_permanent_storage(image_id_url_map, copy_source, copy_destination, blob_service) - - # If the dictionary of images is empty, this means a faiure occurred in a copy/delete operation. - # Otherwise, dictionary contains permanent image URLs for each image ID that was successfully copied. - if not update_urls_dictionary: - return func.HttpResponse("ERROR: Image copy/delete operation failed. 
Check state of images in storage.", status_code=401) - else: - logging.info("Now updating permanent URLs in the DB...") - data_access.update_image_urls(update_urls_dictionary, user_id_number) - logging.info("Done.") - # Return string containing list of URLs to images in permanent blob storage - return func.HttpResponse("Images were successfully added to the database and copied to permanent storage.", status_code=200) + logging.error("Exception: " + str(e)) + return func.HttpResponse("Internal error occured", status_code=503) + # Given a list ofnimage URL's, build an ImageInfo object for each, and return a list of these image objects. def build_objects_from_url_list(url_list): @@ -91,4 +90,3 @@ def build_objects_from_url_list(url_list): # Append image object to the list image_object_list.append(image) return image_object_list - \ No newline at end of file diff --git a/functions/pipeline/requirements.txt b/functions/pipeline/requirements.txt index 41de3ac2..1ee794f5 100644 --- a/functions/pipeline/requirements.txt +++ b/functions/pipeline/requirements.txt @@ -1,6 +1,7 @@ azure-functions==1.0.0a5 azure-functions-worker==1.0.0a6 azure-storage-blob==1.4.0 +azure-storage-file==1.4.0 grpcio==1.14.2 grpcio-tools==1.14.2 protobuf==3.6.1 diff --git a/functions/pipeline/shared/db_access/db_access_v2.py b/functions/pipeline/shared/db_access/db_access_v2.py index ed020ec3..c174f57b 100644 --- a/functions/pipeline/shared/db_access/db_access_v2.py +++ b/functions/pipeline/shared/db_access/db_access_v2.py @@ -2,6 +2,7 @@ import string # import os # import time +import logging import random from enum import IntEnum, unique import getpass @@ -44,9 +45,9 @@ def test_connection(self): cursor = conn.cursor() cursor.execute('select * from tagstate') row = cursor.fetchone() - print() + logging.info('') while row: - print(str(row[0]) + " " + str(row[1])) + logging.info(str(row[0]) + " " + str(row[1])) row = cursor.fetchone() def create_user(self,user_name): @@ -63,7 +64,7 @@ def 
create_user(self,user_name): conn.commit() finally: cursor.close() except Exception as e: - print("An error occured creating a user: {0}".format(e)) + logging.error("An error occured creating a user: {0}".format(e)) raise finally: conn.close() return user_id @@ -82,12 +83,12 @@ def get_new_images(self, number_of_images, user_id): "a.createddtim DESC limit {0}") cursor.execute(query.format(number_of_images, ImageTagState.READY_TO_TAG, ImageTagState.INCOMPLETE_TAG)) for row in cursor: - print('Image Id: {0} \t\tImage Name: {1} \t\tTag State: {2}'.format(row[0], row[1], row[2])) + logging.debug('Image Id: {0} \t\tImage Name: {1} \t\tTag State: {2}'.format(row[0], row[1], row[2])) selected_images_to_tag[str(row[0])] = str(row[1]) self._update_images(selected_images_to_tag,ImageTagState.TAG_IN_PROGRESS, user_id, conn) finally: cursor.close() except Exception as e: - print("An errors occured getting images: {0}".format(e)) + logging.error("An errors occured getting images: {0}".format(e)) raise finally: conn.close() return selected_images_to_tag.values() @@ -111,9 +112,9 @@ def add_new_images(self,list_of_image_infos, user_id): url_to_image_id_map[img.image_location] = new_img_id conn.commit() finally: cursor.close() - print("Inserted {0} images to the DB".format(len(url_to_image_id_map))) + logging.debug("Inserted {0} images to the DB".format(len(url_to_image_id_map))) except Exception as e: - print("An errors occured getting image ids: {0}".format(e)) + logging.error("An errors occured getting image ids: {0}".format(e)) raise finally: conn.close() return url_to_image_id_map @@ -121,12 +122,12 @@ def add_new_images(self,list_of_image_infos, user_id): def update_incomplete_images(self, list_of_image_ids, user_id): #TODO: Make sure the image ids are in a TAG_IN_PROGRESS state self._update_images(list_of_image_ids,ImageTagState.INCOMPLETE_TAG,user_id, self._db_provider.get_connection()) - print("Updated {0} image(s) to the state 
{1}".format(len(list_of_image_ids),ImageTagState.INCOMPLETE_TAG.name)) + logging.debug("Updated {0} image(s) to the state {1}".format(len(list_of_image_ids),ImageTagState.INCOMPLETE_TAG.name)) def update_completed_untagged_images(self,list_of_image_ids, user_id): #TODO: Make sure the image ids are in a TAG_IN_PROGRESS state self._update_images(list_of_image_ids,ImageTagState.COMPLETED_TAG,user_id, self._db_provider.get_connection()) - print("Updated {0} image(s) to the state {1}".format(len(list_of_image_ids),ImageTagState.COMPLETED_TAG.name)) + logging.debug("Updated {0} image(s) to the state {1}".format(len(list_of_image_ids),ImageTagState.COMPLETED_TAG.name)) def _update_images(self, list_of_image_ids, new_image_tag_state, user_id, conn): if not isinstance(new_image_tag_state, ImageTagState): @@ -150,9 +151,9 @@ def _update_images(self, list_of_image_ids, new_image_tag_state, user_id, conn): conn.commit() finally: cursor.close() else: - print("No images to update") + logging.debug("No images to update") except Exception as e: - print("An errors occured updating images: {0}".format(e)) + logging.error("An errors occured updating images: {0}".format(e)) raise def update_image_urls(self,image_id_to_url_map, user_id): @@ -169,12 +170,12 @@ def update_image_urls(self,image_id_to_url_map, user_id): query = "UPDATE Image_Info SET ImageLocation = '{0}', ModifiedDtim = now() WHERE ImageId = {1}" cursor.execute(query.format(new_url,image_id)) conn.commit() - print("Updated ImageId: {0} to new ImageLocation: {1}".format(image_id,new_url)) + logging.debug("Updated ImageId: {0} to new ImageLocation: {1}".format(image_id,new_url)) self._update_images([image_id],ImageTagState.READY_TO_TAG, user_id,conn) - print("ImageId: {0} to has a new state: {1}".format(image_id,ImageTagState.READY_TO_TAG.name)) + logging.debug("ImageId: {0} to has a new state: {1}".format(image_id,ImageTagState.READY_TO_TAG.name)) finally: cursor.close() except Exception as e: - print("An errors occured 
updating image urls: {0}".format(e)) + logging.error("An errors occured updating image urls: {0}".format(e)) raise finally: conn.close() @@ -208,10 +209,10 @@ def update_tagged_images(self,list_of_image_tags, user_id): cursor.execute(query.format(img_tag.image_id,img_tag.x_min,img_tag.x_max,img_tag.y_min,img_tag.y_max,user_id,classifications)) self._update_images([img_id],ImageTagState.COMPLETED_TAG,user_id,conn) conn.commit() - print("Updated {0} image tags".format(len(list_of_image_tags))) + logging.debug("Updated {0} image tags".format(len(list_of_image_tags))) finally: cursor.close() except Exception as e: - print("An errors occured updating tagged image: {0}".format(e)) + logging.error("An errors occured updating tagged image: {0}".format(e)) raise finally: conn.close() @@ -232,7 +233,7 @@ def main(): db_config = DatabaseInfo("","","","") data_access = ImageTagDataAccess(PostGresProvider(db_config)) user_id = data_access.create_user(getpass.getuser()) - print("The user id for '{0}' is {1}".format(getpass.getuser(),user_id)) + logging.info("The user id for '{0}' is {1}".format(getpass.getuser(),user_id)) list_of_image_infos = generate_test_image_infos(5) url_to_image_id_map = data_access.add_new_images(list_of_image_infos,user_id) @@ -269,4 +270,8 @@ def id_generator(size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for _ in range(size)) if __name__ == "__main__": + #Log to console when run locally + console = logging.StreamHandler() + log = logging.getLogger() + log.addHandler(console) main() diff --git a/functions/pipeline/shared/onboarding/__init__.py b/functions/pipeline/shared/onboarding/__init__.py index 59b814c3..3d0a58d0 100644 --- a/functions/pipeline/shared/onboarding/__init__.py +++ b/functions/pipeline/shared/onboarding/__init__.py @@ -15,9 +15,9 @@ def copy_images_to_permanent_storage(image_id_url_map, copy_source, copy_destina new_blob_name = (str(image_id) + file_extension) # Verbose logging for testing - 
logging.info("Original image name: " + original_blob_name) - logging.info("Image ID: " + str(image_id)) - logging.info("New blob name: " + new_blob_name) + logging.debug("Original image name: " + original_blob_name) + logging.debug("Image ID: " + str(image_id)) + logging.debug("New blob name: " + new_blob_name) # Create the blob URLs source_blob_path = blob_service.make_blob_url(copy_source, original_blob_name) @@ -25,20 +25,20 @@ def copy_images_to_permanent_storage(image_id_url_map, copy_source, copy_destina # Copy blob from temp storage to permanent storage try: - logging.info("Now copying file from temporary to permanent storage...") - logging.info("Source path: " + source_blob_path) - logging.info("Destination path: " + destination_blob_path) + logging.debug("Now copying file from temporary to permanent storage...") + logging.debug("Source path: " + source_blob_path) + logging.debug("Destination path: " + destination_blob_path) blob_service.copy_blob(copy_destination, new_blob_name, source_blob_path) - logging.info("Done.") + logging.debug("Done.") # Add ImageId and permanent storage url to new dictionary to be sent to update function update_urls_dictionary[image_id] = destination_blob_path # Delete the file from temp storage once it's been copied - logging.info("Now deleting image " + original_blob_name + " from temp storage container.") + logging.debug("Now deleting image " + original_blob_name + " from temp storage container.") try: blob_service.delete_blob(copy_source, original_blob_name) - logging.info("Blob " + original_blob_name + " has been deleted successfully") + logging.debug("Blob " + original_blob_name + " has been deleted successfully") except Exception as e: logging.error("ERROR: Deletion of blob " + original_blob_name + " failed. 
Exception: " + str(e)) update_urls_dictionary.clear() diff --git a/functions/pipeline/upload/__init__.py b/functions/pipeline/upload/__init__.py index 9fd4a701..f229224b 100644 --- a/functions/pipeline/upload/__init__.py +++ b/functions/pipeline/upload/__init__.py @@ -1,5 +1,5 @@ import json - +import logging from ..shared.vott_parser import process_vott_json from ..shared.db_provider import get_postgres_provider from ..shared.db_access import ImageTag, ImageTagDataAccess @@ -40,13 +40,13 @@ def main(req: func.HttpRequest) -> func.HttpResponse: if ids_to_tags[image_id]: all_imagetags.extend(__create_ImageTag_list(image_id, ids_to_tags[image_id])) - # Update all visited images with tags and set state to completed + logging.info("Update all visited images with tags and set state to completed") data_access.update_tagged_images(all_imagetags, user_id) - # Update visited but no tags identified images + logging.info("Update visited but no tags identified images") data_access.update_completed_untagged_images(upload_data["imagesVisitedNoTag"], user_id) - # Update unvisited/incomplete images + logging.info("Update unvisited/incomplete images") data_access.update_incomplete_images(upload_data["imagesNotVisited"], user_id) return func.HttpResponse( diff --git a/requirements.txt b/requirements.txt index b6871c9c..6251e5a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ azure-functions==1.0.0a5 azure-functions-worker==1.0.0a6 azure-storage-blob==1.4.0 +azure-storage-file==1.4.0 grpcio==1.14.2 grpcio-tools==1.14.2 protobuf==3.6.1