diff --git a/devchat/_cli/prompt.py b/devchat/_cli/prompt.py
index 742605f9..a06b6443 100644
--- a/devchat/_cli/prompt.py
+++ b/devchat/_cli/prompt.py
@@ -1,6 +1,8 @@
 import json
+import sys
 from typing import List, Optional
 import rich_click as click
+from devchat.engine import run_command
 from devchat.assistant import Assistant
 from devchat.openai.openai_chat import OpenAIChat, OpenAIChatConfig
 from devchat.store import Store
@@ -24,10 +26,15 @@
               help='Path to a JSON file with functions for the prompt.')
 @click.option('-n', '--function-name',
               help='Specify the function name when the content is the output of a function.')
+@click.option('-ns', '--not-store', is_flag=True, default=False, required=False,
+              help='Do not save the conversation to the store.')
+@click.option('-a', '--auto', is_flag=True, default=False, required=False,
+              help='Answer the question via function calling.')
 def prompt(content: Optional[str], parent: Optional[str], reference: Optional[List[str]],
            instruct: Optional[List[str]], context: Optional[List[str]],
            model: Optional[str], config_str: Optional[str] = None,
-           functions: Optional[str] = None, function_name: Optional[str] = None):
+           functions: Optional[str] = None, function_name: Optional[str] = None,
+           not_store: Optional[bool] = False, auto: Optional[bool] = False):
     """
     This command performs interactions with the specified large language model (LLM)
     by sending prompts and receiving responses.
@@ -82,9 +89,9 @@ def prompt(content: Optional[str], parent: Optional[str], reference: Optional[Li
         openai_config = OpenAIChatConfig(model=model, **parameters_data)

         chat = OpenAIChat(openai_config)
-        store = Store(repo_chat_dir, chat)
+        chat_store = Store(repo_chat_dir, chat)

-        assistant = Assistant(chat, store, config.max_input_tokens)
+        assistant = Assistant(chat, chat_store, config.max_input_tokens, not not_store)

         functions_data = None
         if functions is not None:
@@ -94,5 +101,16 @@ def prompt(content: Optional[str], parent: Optional[str], reference: Optional[Li
                               parent=parent, references=reference,
                               function_name=function_name)

+        click.echo(assistant.prompt.formatted_header())
+        command_result = run_command(
+            model,
+            assistant.prompt.messages,
+            content,
+            parent,
+            context_contents,
+            auto)
+        if command_result is not None:
+            sys.exit(command_result[0])
+
         for response in assistant.iterate_response():
             click.echo(response, nl=False)
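
The CLI now prints the prompt header itself and gives the engine a chance to handle the input before any response is streamed. A minimal sketch of the contract relied on above, as I read the new engine code (not text from the diff):

    # need_store is the positive form of the --not-store flag:
    need_store = not not_store
    # run_command returns None when input_text is neither a "/command" nor
    # eligible for auto function calling; the CLI then falls through to a
    # normal streamed chat. Otherwise it returns (exit_code, stdout) and
    # the process exits with that code.
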
""" if self._chat.config.stream: - first_chunk = True created_time = int(time.time()) config_params = self._chat.config.dict(exclude_unset=True) for chunk in self._chat.stream_response(self._prompt): @@ -114,14 +119,12 @@ def iterate_response(self) -> Iterator[str]: chunk['choices'][0]['delta']['role']='assistant' delta = self._prompt.append_response(json.dumps(chunk)) - if first_chunk: - first_chunk = False - yield self._prompt.formatted_header() yield delta if not self._prompt.responses: raise RuntimeError("No responses returned from the chat API") - self._store.store_prompt(self._prompt) - yield self._prompt.formatted_footer(0) + '\n' + if self._need_store: + self._store.store_prompt(self._prompt) + yield self._prompt.formatted_footer(0) + '\n' for index in range(1, len(self._prompt.responses)): yield self._prompt.formatted_full_response(index) + '\n' else: @@ -129,6 +132,7 @@ def iterate_response(self) -> Iterator[str]: self._prompt.set_response(response_str) if not self._prompt.responses: raise RuntimeError("No responses returned from the chat API") - self._store.store_prompt(self._prompt) + if self._need_store: + self._store.store_prompt(self._prompt) for index in range(len(self._prompt.responses)): yield self._prompt.formatted_full_response(index) + '\n' diff --git a/devchat/engine/__init__.py b/devchat/engine/__init__.py index 7fbcb637..5fb7a41f 100644 --- a/devchat/engine/__init__.py +++ b/devchat/engine/__init__.py @@ -1,11 +1,13 @@ from .command_parser import parse_command, Command, CommandParser from .namespace import Namespace from .recursive_prompter import RecursivePrompter +from .router import run_command __all__ = [ 'parse_command', 'Command', 'CommandParser', 'Namespace', - 'RecursivePrompter' + 'RecursivePrompter', + 'run_command' ] diff --git a/devchat/engine/command_runner.py b/devchat/engine/command_runner.py new file mode 100644 index 00000000..8680653f --- /dev/null +++ b/devchat/engine/command_runner.py @@ -0,0 +1,198 @@ +""" +Run Command with a input text. 
+""" +import os +import sys +import json +import threading +import subprocess +from typing import List +import shlex + +import openai + +from devchat.utils import get_logger +from .command_parser import Command + + +logger = get_logger(__name__) + + +# Equivalent of CommandRun in Python\which executes subprocesses +class CommandRunner: + def __init__(self, model_name: str): + self.process = None + self._model_name = model_name + + def _call_function_by_llm(self, + command_name: str, + command: Command, + history_messages: List[dict]): + """ + command needs multi parameters, so we need parse each + parameter by LLM from input_text + """ + properties = {} + required = [] + for key, value in command.parameters.items(): + properties[key] = {} + for key1, value1 in value.dict().items(): + if key1 not in ['type', 'description', 'enum'] or value1 is None: + continue + properties[key][key1] = value1 + required.append(key) + + tools = [ + { + "type": "function", + "function": { + "name": command_name, + "description": command.description, + "parameters": { + "type": "object", + "properties": properties, + "required": required, + }, + } + } + ] + + client = openai.OpenAI( + api_key=os.environ.get("OPENAI_API_KEY", None), + base_url=os.environ.get("OPENAI_API_BASE", None) + ) + + connection_error = '' + for _1 in range(3): + try: + response = client.chat.completions.create( + messages=history_messages, + model="gpt-3.5-turbo-16k", + stream=False, + tools=tools, + tool_choice={"type": "function", "function": {"name": command_name}} + ) + + respose_message = response.dict()["choices"][0]["message"] + if not respose_message['tool_calls']: + return None + tool_call = respose_message['tool_calls'][0]['function'] + if tool_call['name'] != command_name: + return None + parameters = json.loads(tool_call['arguments']) + return parameters + except (ConnectionError, openai.APIConnectionError) as err: + connection_error = err + continue + except Exception as err: + print("Exception:", err, file=sys.stderr, flush=True) + logger.exception("Call command by LLM error: %s", err) + return None + print("Connect Error:", connection_error, file=sys.stderr, flush=True) + return None + + + def run_command(self, + command_name: str, + command: Command, + history_messages: List[dict], + input_text: str, + parent_hash: str, + context_contents: List[str]): + """ + if command has parameters, then generate command parameters from input by LLM + if command.input is "required", and input is null, then return error + """ + if command.parameters and len(command.parameters) > 0: + if not self._model_name.startswith("gpt-"): + return None + + arguments = self._call_function_by_llm(command_name, command, history_messages) + if not arguments: + print("No valid parameters generated by LLM", file=sys.stderr, flush=True) + return (-1, "") + return self.run_command_with_parameters( + command, + { + "input": input_text.strip().replace(f'/{command_name}', ''), + **arguments + }, + parent_hash, + context_contents) + + return self.run_command_with_parameters( + command, + { + "input": input_text.strip().replace(f'/{command_name}', '') + }, + parent_hash, + context_contents) + + + def run_command_with_parameters(self, + command: Command, + parameters: dict[str, str], + parent_hash: str, + context_contents: List[str]): + """ + replace $xxx in command.steps[0].run with parameters[xxx] + then run command.steps[0].run + """ + def pipe_reader(pipe, out_data, out_flag): + while pipe: + data = pipe.read(1) + if data == '': + break + out_data['out'] += 
data + print(data, end='', file=out_flag, flush=True) + + try: + # add environment variables to parameters + if parent_hash: + os.environ['PARENT_HASH'] = parent_hash + if context_contents: + os.environ['CONTEXT_CONTENTS'] = json.dumps(context_contents) + for env_var in os.environ: + parameters[env_var] = os.environ[env_var] + parameters["command_python"] = os.environ['command_python'] + + command_run = command.steps[0]["run"] + # Replace parameters in command run + for parameter in parameters: + command_run = command_run.replace('$' + parameter, str(parameters[parameter])) + + # Run command_run + env = os.environ.copy() + if 'PYTHONPATH' in env: + del env['PYTHONPATH'] + # result = subprocess.run(command_run, shell=True, env=env) + # return result + with subprocess.Popen( + shlex.split(command_run), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + text=True + ) as process: + + stdout_data = {'out': ''} + stderr_data = {'out': ''} + + stdout_thread = threading.Thread( + target=pipe_reader, + args=(process.stdout, stdout_data, sys.stdout)) + stderr_thread = threading.Thread( + target=pipe_reader, + args=(process.stderr, stderr_data, sys.stderr)) + + stdout_thread.start() + stderr_thread.start() + + stdout_thread.join() + stderr_thread.join() + exit_code = process.wait() + return (exit_code, stdout_data["out"]) + return (-1, "") + except Exception as err: + print("Exception:", type(err), err, file=sys.stderr, flush=True) + return (-1, "") diff --git a/devchat/engine/router.py b/devchat/engine/router.py new file mode 100644 index 00000000..b13d0baf --- /dev/null +++ b/devchat/engine/router.py @@ -0,0 +1,237 @@ +import os +import json +from typing import List, Iterable +import openai +from devchat._cli.utils import init_dir +from .namespace import Namespace +from .command_parser import CommandParser, Command +from .command_runner import CommandRunner + + +def _load_command(command: str): + _, user_chat_dir = init_dir() + workflows_dir = os.path.join(user_chat_dir, 'workflows') + if not os.path.exists(workflows_dir): + return None + if not os.path.isdir(workflows_dir): + return None + + namespace = Namespace(workflows_dir) + commander = CommandParser(namespace) + + cmd = commander.parse(command) + if not cmd: + return None + return cmd + + +def _load_commands() -> List[Command]: + _, user_chat_dir = init_dir() + workflows_dir = os.path.join(user_chat_dir, 'workflows') + if not os.path.exists(workflows_dir): + return None + if not os.path.isdir(workflows_dir): + return None + + namespace = Namespace(workflows_dir) + commander = CommandParser(namespace) + command_names = namespace.list_names("", True) + + commands = [] + for name in command_names: + cmd = commander.parse(name) + if not cmd: + continue + commands.append((name, cmd)) + + return commands + + +def _create_tool(command_name:str, command: Command) -> dict: + properties = {} + required = [] + if command.parameters: + for key, value in command.parameters.items(): + properties[key] = {} + for key1, value1 in value.dict().items(): + if key1 not in ['type', 'description', 'enum'] or value1 is None: + continue + properties[key][key1] = value1 + required.append(key) + elif command.steps[0]['run'].find('$input') > 0: + properties['input'] = { + "type": "string", + "description": "input text" + } + required.append('input') + + return { + "type": "function", + "function": { + "name": command_name, + "description": command.description, + "parameters": { + "type": "object", + "properties": properties, + "required": required, + }, 
+ } + } + + +def _create_tools() -> List[dict]: + commands = _load_commands() + return [_create_tool(command[0], command[1]) for command in commands if command[1].steps] + + +def _call_gpt(messages: List[dict], # messages passed to GPT + model_name: str, # GPT model name + use_function_calling: bool) -> dict: # whether to use function calling + client = openai.OpenAI( + api_key=os.environ.get("OPENAI_API_KEY", None), + base_url=os.environ.get("OPENAI_API_BASE", None) + ) + + tools = [] if not use_function_calling else _create_tools() + + for try_times in range(3): + try: + response: Iterable = client.chat.completions.create( + messages=messages, + model=model_name, + stream=True, + tools=tools + ) + + response_result = {'content': None, 'function_name': None, 'parameters': ""} + for chunk in response: # pylint: disable=E1133 + chunk = chunk.dict() + delta = chunk["choices"][0]["delta"] + if 'tool_calls' in delta and delta['tool_calls']: + tool_call = delta['tool_calls'][0]['function'] + if tool_call.get('name', None): + response_result["function_name"] = tool_call["name"] + if tool_call.get("arguments", None): + response_result["parameters"] += tool_call["arguments"] + if delta.get('content', None): + if response_result["content"]: + response_result["content"] += delta["content"] + else: + response_result["content"] = delta["content"] + print(delta["content"], end='', flush=True) + if response_result["function_name"]: + print("``` command_run") + function_call = { + 'name': response_result["function_name"], + 'arguments': response_result["parameters"]} + print(json.dumps(function_call, indent=4)) + print("```", flush=True) + return response_result + except (ConnectionError, openai.APIConnectionError) as err: + if try_times == 2: + print("Connect Exception:", err) + print(err.strerror) + return {'content': None, 'function_name': None, 'parameters': ""} + continue + except Exception as err: + print("Exception Error:", err) + return {'content': None, 'function_name': None, 'parameters': ""} + return {'content': None, 'function_name': None, 'parameters': ""} + + +def _create_messages(): + return [] + + +def _call_function(function_name: str, parameters: str, model_name: str): + """ + call function by function_name and parameters + """ + parameters = json.loads(parameters) + command_obj = _load_command(function_name) + runner = CommandRunner(model_name) + return runner.run_command_with_parameters(command_obj, parameters, "", []) + + +def _auto_function_calling(history_messages: List[dict], model_name:str): + """ + 通过function calling方式来回答当前问题。 + function最多被调用4次,必须进行最终答复。 + """ + function_call_times = 0 + + response = _call_gpt(history_messages, model_name, True) + while True: + if response['function_name']: + # run function + function_call_times += 1 + print("do function calling", end='\n\n', flush=True) + function_result = _call_function( + response['function_name'], + response['parameters'], + model_name) + history_messages.append({ + 'role': 'function', + 'content': f'exit code: {function_result[0]} stdout: {function_result[1]}', + 'name': response['function_name']}) + print("after functon call.", end='\n\n', flush=True) + + # send function result to gpt + if function_call_times < 5: + response = _call_gpt(history_messages, model_name, True) + else: + response = _call_gpt(history_messages, model_name, False) + else: + return response + + +def _auto_route(history_messages, model_name:str): + """ + select which command to run + """ + response = _call_gpt(history_messages, model_name, True) + if 
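
Taken together, run_command dispatches three ways: empty input is a no-op, "/name ..." runs that workflow directly, and plain text is auto-routed through function calling when -a is set and the model is a gpt-* model. A usage sketch (the "echo" workflow name is hypothetical):

    from devchat.engine import run_command

    # Returns None if the text is not handled as a command, otherwise
    # (exit_code, stdout) from the workflow's step.
    result = run_command("gpt-3.5-turbo", [], "/echo hello", "", [], False)
    if result is not None:
        print("exit code:", result[0])
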
diff --git a/devchat/openai/openai_prompt.py b/devchat/openai/openai_prompt.py
index ab4916c8..72d6d613 100644
--- a/devchat/openai/openai_prompt.py
+++ b/devchat/openai/openai_prompt.py
@@ -239,8 +239,7 @@ def _timestamp_from_dict(self, response_data: dict):
         if not self._timestamp:
             self._timestamp = response_data['created']
         elif self._timestamp != response_data['created']:
-            raise ValueError(f"Time mismatch: expected {self._timestamp}, "
-                             f"got {response_data['created']}")
+            self._timestamp = response_data['created']

     def _id_from_dict(self, response_data: dict):
         if self._id is None:
diff --git a/devchat/prompt.py b/devchat/prompt.py
index 3cb1ea5e..51bfe109 100644
--- a/devchat/prompt.py
+++ b/devchat/prompt.py
@@ -1,6 +1,7 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field, asdict
 import hashlib
+from datetime import datetime
 import sys
 from typing import Dict, List
 from devchat.message import Message
@@ -224,7 +225,7 @@ def formatted_header(self) -> str:
         formatted_str = f"User: {user_id(self.user_name, self.user_email)[0]}\n"

         if not self._timestamp:
-            raise ValueError(f"Prompt lacks timestamp for formatting header: {self.request}")
+            self._timestamp = datetime.timestamp(datetime.now())

         local_time = unix_to_local_datetime(self._timestamp)
         formatted_str += f"Date: {local_time.strftime('%a %b %d %H:%M:%S %Y %z')}\n\n"
@@ -267,7 +268,7 @@ def formatted_full_response(self, index: int) -> str:
                          index, self.request, self.responses)
             return None

-        formatted_str = self.formatted_header()
+        formatted_str = ""

         if self.responses[index].content:
             formatted_str += self.responses[index].content
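
The prompt.py changes above replace two hard failures with permissive behavior: a missing timestamp is now backfilled instead of raising, and formatted_full_response no longer repeats the header, since the CLI prints it exactly once. The backfill produces a float Unix timestamp, which is what unix_to_local_datetime consumes; a quick standalone check:

    from datetime import datetime

    # datetime.timestamp(datetime.now()) yields a float Unix epoch value
    ts = datetime.timestamp(datetime.now())
    print(datetime.fromtimestamp(ts).strftime('%a %b %d %H:%M:%S %Y %z'))
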