diff --git a/.gitignore b/.gitignore index d4551a9..58e5297 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,8 @@ .idea learning_orchestra_client/.idea -learning_orchestra_client/__pycache__ -learning_orchestra_client/dataset/__pycache__ -learning_orchestra_client/transform/__pycache__ -learning_orchestra_client/main.py -learning_orchestra_client/explore/__pycache__ -learning_orchestra_client/builder/__pycache__ +__pycache__ docs +venv sentiment_analysis_output.py mnist_output.py mnist_treatment.py \ No newline at end of file diff --git a/learning_orchestra_client/observe/observe.py b/learning_orchestra_client/observe/observe.py index 08e3ca7..7acb7ac 100644 --- a/learning_orchestra_client/observe/observe.py +++ b/learning_orchestra_client/observe/observe.py @@ -1,84 +1,122 @@ -from pymongo import MongoClient, change_stream - +import requests +from learning_orchestra_client._util._response_treat import ResponseTreat class Observer: + debug = True + __TIMEOUT_TIME_MULTIPLICATION = 1000 + __INPUT_NAME = "filename" + + __FILENAME_REQUEST_FIELD = 'filename' + __OBSERVER_TYPE_REQUEST_FIELD = 'observe_type' + __TIMEOUT_REQUEST_FIELD = 'timeout' + __OBSERVER_NAME_REQUEST_FIELD = 'observer_name' + __OBSERVER_PIPELINE_REQUEST_FIELD = 'pipeline' + __MICROSERVICE_PORT = '5010' + def __init__(self, cluster_ip: str): - cluster_ip = cluster_ip.replace("http://", "") - mongo_url = f'mongodb://root:owl45%2321@{cluster_ip}' - mongo_client = MongoClient( - mongo_url - ) + self.__api_path = "/api/learningOrchestra/v1/observer" + self.__service_base_url = f'{cluster_ip}:{self.__MICROSERVICE_PORT}' + self.__service_url = f'{self.__service_base_url}{self.__api_path}' + self.cluster_ip = cluster_ip.replace("http://", "") + self.__response_treat = ResponseTreat() - self.__database = mongo_client.database + def wait(self, name: str, timeout: int=0, + observer_name:str='', pretty_response: bool = False) -> dict: - def wait(self, name: str, timeout: int = None) -> dict: """ - :description: Observe the end of a pipe for a timeout seconds or + Observe the end of a pipe for a timeout seconds or until the pipe finishes its execution. - name: Represents the pipe name. Any tune, train, predict service can - wait its finish with a - wait method call. - timeout: the maximum time to wait the observed step, in seconds. + :param name: Represents the pipe name. Any tune, train, predict service can wait its finish with a wait method call. + :param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout + :param observer_name: the name of the observer (default: observer_) + + :return: Returns a dictionary with the content of a mongo collection, representing any pipe result + """ + + return self.watch(name=name, + timeout=timeout, + type="wait", + observer_name=observer_name, + pretty_response=pretty_response) + + def watch(self, name: str, timeout: int=0, type:str="wait", + observer_name:str='',pipeline:[]=None, + pretty_response: bool = False) -> dict: + + """ + Observe the pipe for a timeout seconds or + until the pipe finishes its execution. It is a more complete method, + you can use it to configure your own pipelines if you wish. For more + simplistic uses, try the methods "wait" and "start_observing_pipe" + + :param name: the name of the pipe to be observed. A train, predict, explore, transform or any other pipe can be observed. + :param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout + :param type: type of the observation, it can be "wait" to observe the end of the pipe, "observe" to observe until the pipe change it's content or "custom" if you wish to provide your own mongo pipeline + :param observer_name: the name of the observer (default observer_) + :param pipeline: the custom pipeline that you wish to use on the observer. It is only used if type is set to "custom" - :return: If True it returns a String. Otherwise, it returns - a dictionary with the content of a mongo collection, representing - any pipe result + :return: Returns a dictionary with the content of a mongo collection, representing any pipe result """ - dataset_collection = self.__database[name] - metadata_query = {"_id": 0} - dataset_metadata = dataset_collection.find_one(metadata_query) - - if dataset_metadata["finished"]: - return dataset_metadata - - observer_query = [ - {'$match': { - '$and': - [ - {'operationType': 'update'}, - {'fullDocument.finished': {'$eq': True}} - ] - }} - ] - return dataset_collection.watch( - observer_query, - full_document='updateLookup', - max_await_time_ms=timeout * self.__TIMEOUT_TIME_MULTIPLICATION - ).next()['fullDocument'] - - def observe_pipe(self, name: str, timeout: int = None) -> \ - change_stream.CollectionChangeStream: + if type == "all" or type == "wait" or type == '1': + type = "wait" + elif type == "finish" or type == "observe" or type == '2': + type = "observe" + else: + raise NameError("Invalid type parameter: " + type) + + request_url = f'{self.__service_url}' + request_body = { + self.__FILENAME_REQUEST_FIELD: name, + self.__OBSERVER_TYPE_REQUEST_FIELD: type, + self.__TIMEOUT_REQUEST_FIELD: timeout, + self.__OBSERVER_NAME_REQUEST_FIELD: observer_name, + self.__OBSERVER_PIPELINE_REQUEST_FIELD: pipeline + } + + observer_uri = requests.post(url=f'{request_url}', + json=request_body) + + if(observer_uri.status_code >= 200 and observer_uri.status_code < 400): + url = f"{self.__service_base_url}{observer_uri.json()['result']}" + + response = requests.get(url=url) + else: + raise Exception(observer_uri.json()['result']) + + if response.status_code >= 200 and response.status_code < 400: + response = self.__response_treat.treatment(response,pretty_response) + else: + if response.status_code == 408: + raise TimeoutError(response.json()['result']) + + raise Exception(response.json()['result']) + + delete_resp = requests.delete(url=url) + return response + + + def start_observing_pipe(self, name: str, timeout: int=0, + observer_name:str='', + pretty_response: bool = False) -> dict: """ - :description: It waits until a pipe change its content + It waits until a pipe change its content (replace, insert, update and delete mongoDB collection operation types), so it is a bit different from wait method with a timeout and a finish explicit condition. - :name: the name of the pipe to be observed. A train, predict, explore, - transform or any - other pipe can be observed. - timeout: the maximum time to wait the observed step, in milliseconds. + :param name: the name of the pipe to be observed. A train, predict, explore, transform or any other pipe can be observed. + :param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout + :param observer_name: the name of the observer (default observer_) - :return: A pymongo CollectionChangeStream object. You must use the - builtin next() method to iterate over changes. + :returns: a dictionary with the content of a mongo collection, representing any pipe result """ - observer_query = [ - {'$match': { - '$or': [ - {'operationType': 'replace'}, - {'operationType': 'insert'}, - {'operationType': 'update'}, - {'operationType': 'delete'} - - ] - }} - ] - return self.__database[name].watch( - observer_query, - max_await_time_ms=timeout * self.__TIMEOUT_TIME_MULTIPLICATION, - full_document='updateLookup') + return self.watch(name=name, + timeout=timeout, + type="observe", + observer_name=observer_name, + pretty_response=pretty_response) \ No newline at end of file diff --git a/pipeline/imdb.py b/pipeline/imdb.py index 5c54e16..a07040b 100644 --- a/pipeline/imdb.py +++ b/pipeline/imdb.py @@ -4,7 +4,7 @@ from learning_orchestra_client.train.scikitlearn import TrainScikitLearn from learning_orchestra_client.predict.scikitlearn import PredictScikitLearn -CLUSTER_IP = "http://34.123.167.241" +CLUSTER_IP = "http://35.247.203.13" dataset_csv = DatasetCsv(CLUSTER_IP) diff --git a/pipeline/mnist.py b/pipeline/mnist.py index b097480..efbe991 100644 --- a/pipeline/mnist.py +++ b/pipeline/mnist.py @@ -5,9 +5,10 @@ from learning_orchestra_client.predict.tensorflow import PredictTensorflow from learning_orchestra_client.evaluate.tensorflow import EvaluateTensorflow -CLUSTER_IP = "http://35.224.50.116" +CLUSTER_IP = "http://35.247.197.191" dataset_generic = DatasetGeneric(CLUSTER_IP) + dataset_generic.insert_dataset_async( dataset_name="mnist_train_images", url="https://drive.google.com/u/0/uc?" diff --git a/pipeline/titanic.py b/pipeline/titanic.py index 789735b..ff0b02a 100644 --- a/pipeline/titanic.py +++ b/pipeline/titanic.py @@ -3,16 +3,16 @@ from learning_orchestra_client.transform.data_type import TransformDataType from learning_orchestra_client.builder.builder import BuilderSparkMl -CLUSTER_IP = "http://35.193.116.104" +CLUSTER_IP = "http://35.247.197.191" dataset_csv = DatasetCsv(CLUSTER_IP) dataset_csv.insert_dataset_async( - url="https://filebin.net/boniydu54k710l54/train.csv?t=s350xryf", + url="https://filebin.net/48b0fwidk4amp7fa/train.csv", dataset_name="titanic_training", ) dataset_csv.insert_dataset_async( - url="https://filebin.net/udtf7eogfgasqnx5/test.csv?t=h79pcy0l", + url="https://filebin.net/1ewibio2rziv6lrm/test.csv", dataset_name="titanic_testing" )