From 445889ce5e5296a619b3970efdf5609cd07be66c Mon Sep 17 00:00:00 2001
From: "Xue, Chendi"
Date: Thu, 15 Feb 2024 19:02:14 +0000
Subject: [PATCH 1/2] Necessary codes fixing and enable openai API in webui

Signed-off-by: Xue, Chendi
---
 .../api_server_openai/query_http_requests.py |  15 +-
 .../api_openai_backend/request_handler.py    |   2 +-
 inference/deepspeed_predictor.py             |   2 +-
 inference/serve.py                           |   2 +-
 tests/inference/test_utils.py                |   2 +-
 ui/start_ui.py                               | 192 ++++++++++++++----
 6 files changed, 161 insertions(+), 54 deletions(-)

diff --git a/examples/inference/api_server_openai/query_http_requests.py b/examples/inference/api_server_openai/query_http_requests.py
index 22d22447d..f0dc24eba 100644
--- a/examples/inference/api_server_openai/query_http_requests.py
+++ b/examples/inference/api_server_openai/query_http_requests.py
@@ -35,17 +35,21 @@
     help="Whether to enable streaming response",
 )
 parser.add_argument(
-    "--max_new_tokens", default=None, help="The maximum numbers of tokens to generate"
+    "--max_new_tokens", default=256, help="The maximum numbers of tokens to generate"
 )
 parser.add_argument(
-    "--temperature", default=None, help="The value used to modulate the next token probabilities"
+    "--temperature", default=0.2, help="The value used to modulate the next token probabilities"
 )
 parser.add_argument(
     "--top_p",
-    default=None,
+    default=0.7,
     help="If set to float < 1, only the smallest set of most probable tokens with probabilities that add up to`Top p` or higher are kept for generation",
 )
-
+parser.add_argument(
+    "--input_text",
+    default="Tell me a long story with many words.",
+    help="question to ask model",
+)
 args = parser.parse_args()
 
 s = requests.Session()
@@ -54,8 +58,7 @@
 body = {
     "model": args.model_name,
     "messages": [
-        {"role": "assistant", "content": "You are a helpful assistant."},
-        {"role": "user", "content": "Tell me a long story with many words."},
+        {"role": "user", "content": args.input_text},
     ],
     "stream": args.streaming_response,
     "max_tokens": args.max_new_tokens,
diff --git a/inference/api_openai_backend/request_handler.py b/inference/api_openai_backend/request_handler.py
index 126943e22..dd5a1189d 100644
--- a/inference/api_openai_backend/request_handler.py
+++ b/inference/api_openai_backend/request_handler.py
@@ -38,7 +38,7 @@
 from fastapi import status, HTTPException, Request
 from starlette.responses import JSONResponse
 from pydantic import ValidationError as PydanticValidationError
-from logger import get_logger
+from inference.logger import get_logger
 from .openai_protocol import Prompt, ModelResponse, ErrorResponse, FinishReason
 
 logger = get_logger(__name__)
diff --git a/inference/deepspeed_predictor.py b/inference/deepspeed_predictor.py
index 464c81506..ef75c6118 100644
--- a/inference/deepspeed_predictor.py
+++ b/inference/deepspeed_predictor.py
@@ -13,7 +13,7 @@
 from typing import List
 import os
 from predictor import Predictor
-from utils import get_torch_dtype
+from inference.utils import get_torch_dtype
 from inference.inference_config import (
     InferenceConfig,
     GenerateResult,
diff --git a/inference/serve.py b/inference/serve.py
index e73397a79..47d54c6ba 100644
--- a/inference/serve.py
+++ b/inference/serve.py
@@ -16,7 +16,7 @@
 import ray
 import sys
 
-from utils import get_deployment_actor_options
+from inference.utils import get_deployment_actor_options
 from pydantic_yaml import parse_yaml_raw_as
 from api_server_simple import serve_run
 from api_server_openai import openai_serve_run
diff --git a/tests/inference/test_utils.py b/tests/inference/test_utils.py
index c0bb43a08..37b16d677 100644
--- a/tests/inference/test_utils.py
+++ b/tests/inference/test_utils.py
@@ -1,7 +1,7 @@
 import pytest
 import torch
 
-from utils import (
+from inference.utils import (
     get_deployment_actor_options,
     StoppingCriteriaSub,
     max_input_len,
diff --git a/ui/start_ui.py b/ui/start_ui.py
index 4377531b7..60a9dcce2 100644
--- a/ui/start_ui.py
+++ b/ui/start_ui.py
@@ -48,7 +48,10 @@
     RAGTextFix,
 )
 from pyrecdp.primitives.document.reader import _default_file_readers
+from pyrecdp.core.cache_utils import RECDP_MODELS_CACHE
 
+if not os.environ['RECDP_CACHE_HOME']:
+    os.environ['RECDP_CACHE_HOME'] = os.getcwd()
 
 class CustomStopper(Stopper):
     def __init__(self):
@@ -64,6 +67,19 @@ def stop_all(self) -> bool:
     def stop(self, flag):
         self.should_stop = flag
 
+def convert_openai_output(chunk):
+    print(chunk)
+    try:
+        ret = chunk["choices"][0]["delta"]["content"]
+    except:
+        ret = ""
+    return ret
+
+def is_simple_api(request_url, model_name):
+    if model_name is None:
+        return True
+    return model_name in request_url
+
 
 @ray.remote
 class Progress_Actor:
@@ -144,8 +160,6 @@ def __init__(
         self.finetune_status = False
         self.default_rag_path = default_rag_path
         self.embedding_model_name = "sentence-transformers/all-mpnet-base-v2"
-        self.embeddings = HuggingFaceEmbeddings(model_name=self.embedding_model_name)
-
         self._init_ui()
 
     @staticmethod
@@ -164,13 +178,14 @@ def history_to_messages(history):
                     "role": "assistant",
                     "content": bot_text,
                 }
-            )
+                )
         return messages
 
     @staticmethod
     def add_knowledge(prompt, enhance_knowledge):
         description = "Known knowledge: {knowledge}. Then please answer the question based on follow conversation: {conversation}."
-        return description.format(knowledge=enhance_knowledge, conversation=prompt)
+        prompt[-1]['content'] = description.format(knowledge=enhance_knowledge, conversation=prompt[-1]['content'])
+        return prompt
 
     def clear(self):
         return (
@@ -188,34 +203,59 @@ def reset(self, id):
     def user(self, user_message, history):
         return "", history + [[user_message, None]]
 
-    def model_generate(self, prompt, request_url, config):
-        print("prompt: ", prompt)
-
-        sample_input = {"text": prompt, "config": config, "stream": True}
+    def model_generate(self, prompt, request_url, model_name, config, simple_api=True):
+        if simple_api:
+            prompt = self.process_tool.get_prompt(prompt)
+            sample_input = {"text": prompt, "config": config, "stream": True}
+        else:
+            sample_input = {
+                "model": model_name,
+                "messages": prompt,
+                "stream": True,
+                "max_tokens": config["max_new_tokens"],
+                "temperature": config["temperature"],
+                "top_p": config["top_p"],
+                "top_k": config["top_k"]
+            }
         proxies = {"http": None, "https": None}
+        print(sample_input)
         outputs = requests.post(request_url, proxies=proxies, json=sample_input, stream=True)
         outputs.raise_for_status()
         for output in outputs.iter_content(chunk_size=None, decode_unicode=True):
             # remove context
-            if prompt in output:
-                output = output[len(prompt) :]
+            if simple_api:
+                if prompt in output:
+                    output = output[len(prompt) :]
+            else:
+                import json
+                import re
+                try:
+                    output = re.sub("^data: ", "", output)
+                    output = json.loads(output)
+                    print(f"After load as json: {output}")
+                except:
+                    output = ""
             yield output
 
     def bot(
         self,
         history,
+        deploy_model_endpoint,
         model_endpoint,
         Max_new_tokens,
         Temperature,
         Top_p,
         Top_k,
+        model_name=None,
         enhance_knowledge=None,
     ):
+        request_url = model_endpoint if model_endpoint != "" else deploy_model_endpoint
+        simple_api = is_simple_api(request_url, model_name)
         prompt = self.history_to_messages(history)
-        prompt = self.process_tool.get_prompt(prompt)
+
         if enhance_knowledge:
             prompt = self.add_knowledge(prompt, enhance_knowledge)
-        request_url = model_endpoint
+
         time_start = time.time()
         token_num = 0
         config = {
@@ -225,16 +265,18 @@ def bot(
             "top_p": Top_p,
             "top_k": Top_k,
         }
-        outputs = self.model_generate(prompt=prompt, request_url=request_url, config=config)
+        outputs = self.model_generate(prompt=prompt, request_url=request_url, model_name=model_name, config=config, simple_api=simple_api)
+        if history[-1][1] is None:
+            history[-1][1] = ""
         for output in outputs:
             if len(output) != 0:
                 time_end = time.time()
-                if history[-1][1] is None:
-                    history[-1][1] = output
-                else:
+                if isinstance(output, str):
                     history[-1][1] += output
-                history[-1][1] = self.process_tool.convert_output(history[-1][1])
+                    history[-1][1] = self.process_tool.convert_output(history[-1][1])
+                else:
+                    history[-1][1] += convert_openai_output(output)
                 time_spend = round(time_end - time_start, 3)
                 token_num += 1
                 new_token_latency = f"""
@@ -254,10 +296,11 @@ def bot_test(
         Temperature,
         Top_p,
         Top_k,
+        model_name=None
     ):
-        prompt = self.history_to_messages(history)
-        prompt = self.process_tool.get_prompt(prompt)
         request_url = model_endpoint
+        simple_api = is_simple_api(request_url, model_name)
+        prompt = self.history_to_messages(history)
         time_start = time.time()
         config = {
             "max_new_tokens": Max_new_tokens,
@@ -266,16 +309,18 @@ def bot_test(
             "top_p": Top_p,
             "top_k": Top_k,
         }
-        outputs = self.model_generate(prompt=prompt, request_url=request_url, config=config)
+        outputs = self.model_generate(prompt=prompt, request_url=request_url, model_name=model_name, config=config, simple_api=simple_api)
+        history[-1][1] = ""
         for output in outputs:
             if len(output) != 0:
                 time_end = time.time()
-                if history[-1][1] is None:
-                    history[-1][1] = output
-                else:
+                if isinstance(output, str):
                     history[-1][1] += output
-                history[-1][1] = self.process_tool.convert_output(history[-1][1])
+                    history[-1][1] = self.process_tool.convert_output(history[-1][1])
+                else:
+                    history[-1][1] += convert_openai_output(output)
+
         time_spend = time_end - time_start
         bot_queue.put([queue_id, history, time_spend])
         bot_queue.put([queue_id, "", ""])
@@ -283,6 +328,7 @@ def bot_test(
     def bot_rag(
         self,
         history,
+        deploy_model_endpoint,
         model_endpoint,
         Max_new_tokens,
         Temperature,
@@ -291,6 +337,7 @@ def bot_rag(
         rag_selector,
         rag_path,
         returned_k,
+        model_name=None
     ):
         enhance_knowledge = None
         if os.path.isabs(rag_path):
@@ -304,6 +351,14 @@ def bot_rag(
         question = history[-1][0]
         print("history: ", history)
         print("question: ", question)
+
+        if not hasattr(self, "embeddings"):
+            local_embedding_model_path = os.path.join(RECDP_MODELS_CACHE, self.embedding_model_name)
+            if os.path.exists(local_embedding_model_path):
+                self.embeddings = HuggingFaceEmbeddings(model_name=local_embedding_model_path)
+            else:
+                self.embeddings = HuggingFaceEmbeddings(model_name=self.embedding_model_name)
+
         vectorstore = FAISS.load_local(load_dir, self.embeddings, index_name="knowledge_db")
         sim_res = vectorstore.similarity_search(question, k=int(returned_k))
         enhance_knowledge = ""
@@ -312,12 +367,14 @@ def bot_rag(
         bot_generator = self.bot(
             history,
+            deploy_model_endpoint,
             model_endpoint,
             Max_new_tokens,
             Temperature,
             Top_p,
             Top_k,
-            enhance_knowledge,
+            model_name=model_name,
+            enhance_knowledge=enhance_knowledge,
         )
         for output in bot_generator:
             yield output
 
@@ -380,10 +437,16 @@ def regenerate(
             "separators": ["\n\n", "\n", " ", ""],
         }
         embeddings_type = "HuggingFaceEmbeddings"
-        embeddings_args = {"model_name": embedding_model}
-        if embedding_model != self.embedding_model_name:
-            self.embedding_model_name = embedding_model
+
+        self.embedding_model_name = embedding_model
+        local_embedding_model_path = os.path.join(RECDP_MODELS_CACHE, self.embedding_model_name)
+        if os.path.exists(local_embedding_model_path):
+            self.embeddings = HuggingFaceEmbeddings(model_name=local_embedding_model_path)
+            embeddings_args = {"model_name": local_embedding_model_path}
+        else:
             self.embeddings = HuggingFaceEmbeddings(model_name=self.embedding_model_name)
+            embeddings_args = {"model_name": self.embedding_model_name}
+
         pipeline = TextPipeline()
         ops = [loader]
@@ -472,7 +535,10 @@ def finetune(
         if "CPU" not in ray_resources or cpus_per_worker_ftn * worker_num + 1 > int(
             ray.available_resources()["CPU"]
         ):
-            raise gr.Error("Resources are not meeting the demand")
+            num_req = cpus_per_worker_ftn * worker_num + 1
+            num_act = int(ray.available_resources()['CPU'])
+            error_msg = f"Resources are not meeting the demand, required num_cpu is {num_req}, actual num_cpu is {num_act}"
+            raise gr.Error(error_msg)
         if (
             worker_num != exist_worker
             or cpus_per_worker_ftn != exist_cpus_per_worker_ftn
@@ -677,6 +743,8 @@ def get_ray_cluster(self):
         command = "conda activate " + self.conda_env_name + "; ray status"
         stdin, stdout, stderr = self.ssh_connect[-1].exec_command(command)
         out = stdout.read().decode("utf-8")
+        #print(f"stderr is {stderr.read().decode('utf-8')}")
+        #print(f"out is {out}")
         out_words = [word for word in out.split("\n") if "CPU" in word][0]
         cpu_info = out_words.split(" ")[1].split("/")
         total_core = int(float(cpu_info[1]))
@@ -795,16 +863,19 @@ def _init_ui(self):
         for index in range(len(self.ray_nodes)):
             if "node:__internal_head__" in ray.nodes()[index]["Resources"]:
                 mark_alive = index
-            node_ip = self.ray_nodes[index]["NodeName"]
-            self.ssh_connect[index] = paramiko.SSHClient()
-            self.ssh_connect[index].load_system_host_keys()
-            self.ssh_connect[index].set_missing_host_key_policy(paramiko.RejectPolicy())
-            self.ssh_connect[index].connect(
-                hostname=node_ip, port=self.node_port, username=self.user_name
-            )
+            node_ip = self.ray_nodes[index]["NodeName"]
+            self.ssh_connect[index] = paramiko.SSHClient()
+            self.ssh_connect[index].load_system_host_keys()
+            self.ssh_connect[index].set_missing_host_key_policy(paramiko.AutoAddPolicy())
+            self.ssh_connect[index].connect(
+                hostname=node_ip, port=self.node_port, username=self.user_name
+            )
+        if mark_alive is None:
+            print("No alive ray worker found! Exit")
Exit") + return self.ssh_connect[-1] = paramiko.SSHClient() self.ssh_connect[-1].load_system_host_keys() - self.ssh_connect[-1].set_missing_host_key_policy(paramiko.RejectPolicy()) + self.ssh_connect[-1].set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_connect[-1].connect( hostname=self.ray_nodes[mark_alive]["NodeName"], port=self.node_port, @@ -889,7 +960,7 @@ def _init_ui(self): worker_num = gr.Slider( 1, 8, - 2, + 1, step=1, interactive=True, label="Worker Number", @@ -955,7 +1026,7 @@ def _init_ui(self): with gr.Accordion("Parameters", open=False, visible=True): replica_num = gr.Slider( - 1, 8, 4, step=1, interactive=True, label="Model Replica Number" + 1, 8, 1, step=1, interactive=True, label="Model Replica Number" ) cpus_per_worker_deploy = gr.Slider( 1, @@ -991,7 +1062,7 @@ def _init_ui(self): max_new_tokens = gr.Slider( 1, 2000, - 128, + 256, step=1, interactive=True, label="Max New Tokens", @@ -1000,7 +1071,7 @@ def _init_ui(self): Temperature = gr.Slider( 0, 1, - 0.7, + 0.2, step=0.01, interactive=True, label="Temperature", @@ -1009,7 +1080,7 @@ def _init_ui(self): Top_p = gr.Slider( 0, 1, - 1.0, + 0.7, step=0.01, interactive=True, label="Top p", @@ -1034,6 +1105,18 @@ def _init_ui(self): with gr.Row(): with gr.Column(scale=0.8): + with gr.Row(): + endpoint_value = "http://127.0.0.1:8000/v1/chat/completions" + model_endpoint = gr.Text( + label="Model Endpoint", + value=endpoint_value, + scale = 1 + ) + model_name = gr.Text( + label="Model Name", + value="llama-2-7b-chat-hf", + scale = 1 + ) msg = gr.Textbox( show_label=False, container=False, @@ -1098,7 +1181,7 @@ def _init_ui(self): max_new_tokens_rag = gr.Slider( 1, 2000, - 128, + 256, step=1, interactive=True, label="Max New Tokens", @@ -1229,6 +1312,18 @@ def _init_ui(self): with gr.Row(): with gr.Column(scale=0.8): + with gr.Row(): + endpoint_value = "http://127.0.0.1:8000/v1/chat/completions" + rag_model_endpoint = gr.Text( + label="Model Endpoint", + value=endpoint_value, + scale = 1 + ) + rag_model_name = gr.Text( + label="Model Name", + value="llama-2-7b-chat-hf", + scale = 1 + ) msg_rag = gr.Textbox( show_label=False, container=False, @@ -1364,10 +1459,12 @@ def _init_ui(self): [ chatbot, deployed_model_endpoint, + model_endpoint, max_new_tokens, Temperature, Top_p, Top_k, + model_name, ], [chatbot, latency_status], ) @@ -1378,10 +1475,12 @@ def _init_ui(self): [ chatbot, deployed_model_endpoint, + model_endpoint, max_new_tokens, Temperature, Top_p, Top_k, + model_name, ], [chatbot, latency_status], ) @@ -1417,6 +1516,7 @@ def _init_ui(self): [ chatbot_rag, deployed_model_endpoint, + rag_model_endpoint, max_new_tokens_rag, Temperature_rag, Top_p_rag, @@ -1424,6 +1524,7 @@ def _init_ui(self): rag_selector, rag_path, returned_k, + rag_model_name, ], [chatbot_rag, latency_status_rag], ) @@ -1434,6 +1535,7 @@ def _init_ui(self): [ chatbot_rag, deployed_model_endpoint, + rag_model_endpoint, max_new_tokens_rag, Temperature_rag, Top_p_rag, @@ -1441,6 +1543,7 @@ def _init_ui(self): rag_selector, rag_path, returned_k, + rag_model_name ], [chatbot_rag, latency_status_rag], ) @@ -1602,13 +1705,14 @@ def _init_ui(self): } }, "address": "auto", - "_node_ip_address": "127.0.0.1", } accelerate_env_vars = get_accelerate_environment_variable( finetune_config["Training"]["accelerate_mode"], config=None ) ray_init_config["runtime_env"]["env_vars"].update(accelerate_env_vars) + print("Start to init Ray connection") context = ray.init(**ray_init_config) + print("Ray connected") head_node_ip = 
context.get("address").split(":")[0] finetune_model_path = args.finetune_model_path From e47113b1a3194358626209a2508d30b88765dd0e Mon Sep 17 00:00:00 2001 From: "Xue, Chendi" Date: Thu, 15 Feb 2024 19:23:15 +0000 Subject: [PATCH 2/2] Fix ruff error Signed-off-by: Xue, Chendi --- ui/start_ui.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ui/start_ui.py b/ui/start_ui.py index 60a9dcce2..dd45e0e17 100644 --- a/ui/start_ui.py +++ b/ui/start_ui.py @@ -68,10 +68,9 @@ def stop(self, flag): self.should_stop = flag def convert_openai_output(chunk): - print(chunk) try: ret = chunk["choices"][0]["delta"]["content"] - except: + except KeyError: ret = "" return ret @@ -232,8 +231,7 @@ def model_generate(self, prompt, request_url, model_name, config, simple_api=Tru try: output = re.sub("^data: ", "", output) output = json.loads(output) - print(f"After load as json: {output}") - except: + except ValueError: output = "" yield output