diff --git a/hud/agents/__init__.py b/hud/agents/__init__.py index 7470adb3..55a531ca 100644 --- a/hud/agents/__init__.py +++ b/hud/agents/__init__.py @@ -4,10 +4,12 @@ from .claude import ClaudeAgent from .openai import OperatorAgent from .openai_chat_generic import GenericOpenAIChatAgent +from .openrouter import OpenRouterAgent __all__ = [ "ClaudeAgent", "GenericOpenAIChatAgent", "MCPAgent", "OperatorAgent", + "OpenRouterAgent", ] diff --git a/hud/agents/openrouter.py b/hud/agents/openrouter.py new file mode 100644 index 00000000..58bc9019 --- /dev/null +++ b/hud/agents/openrouter.py @@ -0,0 +1,589 @@ +"""OpenRouter agent facade plus shared tooling helpers.""" + +from __future__ import annotations + +import base64 +import json +import re +import uuid +from importlib import import_module +import importlib.util +from pathlib import Path +from io import BytesIO +from typing import Any, Dict, Type +from abc import abstractmethod + +import mcp.types as types +from PIL import Image + +import litellm + +from hud.agents.base import MCPAgent +from hud.tools.computer.settings import computer_settings +from hud.types import MCPToolCall, MCPToolResult, AgentResponse +from hud import instrument +import logging +logger = logging.getLogger(__name__) + +from hud.settings import settings +import os + +# Shared helper utilities for computer-use adapters +def _random_id() -> str: + return f"call_{uuid.uuid4().hex[:8]}" + +def _make_reasoning_item(reasoning: str) -> dict[str, Any]: + return { + "id": _random_id(), + "type": "reasoning", + "summary": [{"type": "summary_text", "text": reasoning}], + } + +def _make_output_text_item(content: str) -> dict[str, Any]: + return { + "id": _random_id(), + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": content, "annotations": []}], + } + +def _make_computer_call_item(action: dict[str, Any], call_id: str | None = None) -> dict[str, Any]: + call_id = call_id or _random_id() + return { + "id": _random_id(), + "call_id": call_id, + "type": "computer_call", + "status": "completed", + "pending_safety_checks": [], + "action": action, + } + +def _make_click_item(x: int, y: int, button: str = "left", call_id: str | None = None) -> dict[str, Any]: + return _make_computer_call_item({"type": "click", "x": x, "y": y, "button": button}, call_id) + +def _make_double_click_item(x: int, y: int, call_id: str | None = None) -> dict[str, Any]: + return _make_computer_call_item({"type": "double_click", "x": x, "y": y}, call_id) + +def _make_drag_item(path: list[dict[str, int]], call_id: str | None = None) -> dict[str, Any]: + return _make_computer_call_item({"type": "drag", "path": path}, call_id) + +def _make_keypress_item(keys: list[str], call_id: str | None = None) -> dict[str, Any]: + return _make_computer_call_item({"type": "keypress", "keys": keys}, call_id) + +def _make_type_item(text: str, call_id: str | None = None) -> dict[str, Any]: + return _make_computer_call_item({"type": "type", "text": text}, call_id) + +def _make_scroll_item( + x: int, + y: int, + scroll_x: int, + scroll_y: int, + call_id: str | None = None, +) -> dict[str, Any]: + action = {"type": "scroll", "x": x, "y": y, "scroll_x": scroll_x, "scroll_y": scroll_y} + return _make_computer_call_item(action, call_id) + +def _make_wait_item(call_id: str | None = None) -> dict[str, Any]: + return _make_computer_call_item({"type": "wait"}, call_id) + +def _make_screenshot_item(call_id: str) -> dict[str, Any]: + return _make_computer_call_item({"type": 
"screenshot"}, call_id) + +def _make_failed_tool_call_items( + tool_name: str, + tool_kwargs: dict[str, Any], + error_message: str, + call_id: str, +) -> list[dict[str, Any]]: + call = _make_computer_call_item({"type": tool_name, **tool_kwargs}, call_id) + call["status"] = "failed" + failure_text = _make_output_text_item(f"Tool {tool_name} failed: {error_message}") + failure_text["role"] = "assistant" + return [call, failure_text] + +def _coerce_to_pixel_coordinates( + x_val: Any, + y_val: Any, + *, + width: int, + height: int, +) -> tuple[int, int] | None: + try: + x_float = float(x_val) + y_float = float(y_val) + except (TypeError, ValueError): + return None + + def clamp(value: int, maximum: int) -> int: + return max(0, min(maximum - 1, value)) + + abs_x = abs(x_float) + abs_y = abs(y_float) + if abs_x <= 1.0 and abs_y <= 1.0: + px = int(x_float * width) + py = int(y_float * height) + elif abs_x <= 999.0 and abs_y <= 999.0: + px = int((x_float / 999.0) * width) + py = int((y_float / 999.0) * height) + else: + px = int(x_float) + py = int(y_float) + + return clamp(px, width), clamp(py, height) + +def _parse_coordinate_box(value: Any) -> tuple[float, float] | None: + if isinstance(value, (list, tuple)) and len(value) >= 2: + try: + return float(value[0]), float(value[1]) + except (TypeError, ValueError): + return None + + if isinstance(value, str): + stripped = value.strip() + try: + loaded = json.loads(stripped) + except Exception: + matches = re.findall(r"-?\d+(?:\.\d+)?", stripped) + if len(matches) >= 2: + return float(matches[0]), float(matches[1]) + else: + if isinstance(loaded, (list, tuple)) and len(loaded) >= 2: + try: + return float(loaded[0]), float(loaded[1]) + except (TypeError, ValueError): + return None + return None + +def _coerce_box_to_pixels( + box: Any, + *, + width: int, + height: int, +) -> tuple[int, int] | None: + coords = _parse_coordinate_box(box) + if not coords: + return None + return _coerce_to_pixel_coordinates(coords[0], coords[1], width=width, height=height) + +def _parse_json_action_string(action_text: str) -> dict[str, Any] | None: + candidate = action_text.strip() + if not (candidate.startswith("{") and candidate.endswith("}")): + return None + + attempts = [candidate] + if "\\" in candidate: + try: + attempts.append(candidate.encode("utf-8").decode("unicode_escape")) + except Exception: + pass + attempts.append(candidate.replace("\\\"", '"')) + + for attempt in attempts: + try: + return json.loads(attempt) + except Exception: + continue + + return None + +def _convert_json_action_to_items( + json_action: dict[str, Any], + call_id: str, + image_width: int, + image_height: int, +) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + + action_type = str(json_action.get("type", json_action.get("action_type", ""))).lower() + if not action_type: + return items + + if action_type in {"type", "text", "input_text"}: + text_value = json_action.get("content") or json_action.get("text") or "" + if text_value: + items.append(_make_type_item(str(text_value), call_id=call_id)) + elif action_type in {"click", "left_click", "right_click"}: + # Handle both "start_box" and the new "start_x"/"start_y" format + start_box = json_action.get("start_box") or json_action.get("startBox") + coords = _coerce_box_to_pixels(start_box, width=image_width, height=image_height) + if not coords: + coords = _coerce_to_pixel_coordinates( + json_action.get("start_x") or json_action.get("x"), + json_action.get("start_y") or json_action.get("y"), + width=image_width, + 
height=image_height, + ) + if coords: + button = str(json_action.get("button", "left") or "left").lower() + items.append(_make_click_item(coords[0], coords[1], button=button, call_id=call_id)) + elif action_type in {"double_click", "left_double_click"}: + start_box = json_action.get("start_box") or json_action.get("startBox") + coords = _coerce_box_to_pixels(start_box, width=image_width, height=image_height) + if not coords: + coords = _coerce_to_pixel_coordinates( + json_action.get("start_x") or json_action.get("x"), + json_action.get("start_y") or json_action.get("y"), + width=image_width, + height=image_height, + ) + if coords: + items.append(_make_double_click_item(coords[0], coords[1], call_id=call_id)) + elif action_type in {"drag", "left_drag"}: + start_box = json_action.get("start_box") or json_action.get("startBox") + end_box = json_action.get("end_box") or json_action.get("endBox") + start_coords = _coerce_box_to_pixels(start_box, width=image_width, height=image_height) + end_coords = _coerce_box_to_pixels(end_box, width=image_width, height=image_height) + if not start_coords: + start_coords = _coerce_to_pixel_coordinates( + json_action.get("start_x") or json_action.get("x"), + json_action.get("start_y") or json_action.get("y"), + width=image_width, + height=image_height, + ) + if not end_coords: + end_coords = _coerce_to_pixel_coordinates( + json_action.get("end_x"), + json_action.get("end_y"), + width=image_width, + height=image_height, + ) + if start_coords and end_coords: + path = [ + {"x": start_coords[0], "y": start_coords[1]}, + {"x": end_coords[0], "y": end_coords[1]}, + ] + items.append(_make_drag_item(path, call_id=call_id)) + elif action_type == "scroll": + start_box = json_action.get("start_box") or json_action.get("startBox") + coords = _coerce_box_to_pixels(start_box, width=image_width, height=image_height) + if not coords: + coords = _coerce_to_pixel_coordinates( + json_action.get("start_x") or json_action.get("x"), + json_action.get("start_y") or json_action.get("y"), + width=image_width, + height=image_height, + ) + direction = str(json_action.get("direction", "")).lower() + step = int(json_action.get("step", 5) or 5) + if coords: + scroll_x = 0 + scroll_y = 0 + if direction == "up": + scroll_y = -abs(step) + elif direction == "down": + scroll_y = abs(step) + elif direction == "left": + scroll_x = -abs(step) + elif direction == "right": + scroll_x = abs(step) + items.append( + _make_scroll_item(coords[0], coords[1], scroll_x, scroll_y, call_id=call_id) + ) + # hover/move dropped in minimal action surface + elif action_type in {"keypress", "key", "key_press"}: + keys = json_action.get("keys") + key_list: list[str] = [] + if isinstance(keys, str): + key_list = [segment.strip() for segment in keys.split("+") if segment.strip()] + elif isinstance(keys, list): + key_list = [str(segment).strip() for segment in keys if str(segment).strip()] + if key_list: + items.append(_make_keypress_item(key_list, call_id=call_id)) + elif action_type == "wait": + items.append(_make_wait_item(call_id=call_id)) + + return items + + +def _decode_image_dimensions(image_b64: str) -> tuple[int, int]: + try: + data = base64.b64decode(image_b64) + with Image.open(BytesIO(data)) as img: + return img.size + except Exception: # pragma: no cover - defensive fallback + return computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT + + +def _extract_user_instruction(messages: list[dict[str, Any]]) -> str: + for message in messages: + if not isinstance(message, dict): + 
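+            # Skip non-dict entries; only response-format message dicts can carry the user instruction.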
continue + if message.get("type") == "message" and message.get("role") == "user": + content = message.get("content") or [] + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") in {"text", "input_text"}: + text = block.get("text") + if isinstance(text, str) and text.strip(): + return text.strip() + return "" + + +def get_last_image_from_messages(messages: list[dict[str, Any]]) -> str | None: + for message in reversed(messages): + if not isinstance(message, dict): + continue + msg_type = message.get("type") + if msg_type == "computer_call_output": + output = message.get("output") or {} + if isinstance(output, dict): + image_url = output.get("image_url") + if isinstance(image_url, str) and image_url.startswith("data:image/"): + return image_url.split(",", 1)[1] + if msg_type == "message" and message.get("role") == "user": + content = message.get("content") + if isinstance(content, list): + for item in reversed(content): + if isinstance(item, dict) and item.get("type") == "image_url": + url_obj = item.get("image_url") + if isinstance(url_obj, dict): + url = url_obj.get("url") + if isinstance(url, str) and url.startswith("data:image/"): + return url.split(",", 1)[1] + return None + +class OpenRouterBaseAgent(MCPAgent): + """Base class for OpenRouter vision-language agents with shared formatting logic.""" + + def __init__(self, completion_kwargs: dict[str, Any] | None = None, **kwargs: Any) -> None: + super().__init__(**kwargs) + self.completion_kwargs = completion_kwargs or {} + + async def format_blocks(self, blocks: list[types.ContentBlock]) -> list[dict[str, Any]]: + """Format MCP content blocks into message items.""" + content_items: list[dict[str, Any]] = [] + text_parts: list[str] = [] + for block in blocks: + if isinstance(block, types.TextContent): + if block.text: + text_parts.append(block.text) + elif isinstance(block, types.ImageContent): + content_items.append( + { + "type": "message", + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": f"data:{getattr(block, 'mimeType', 'image/png')};base64,{block.data}", + }, + } + ], + } + ) + + if text_parts: + content_items.insert( + 0, + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "\n".join(text_parts)}], + }, + ) + + return content_items + + async def format_tool_results( + self, tool_calls: list[MCPToolCall], tool_results: list[MCPToolResult] + ) -> list[dict[str, Any]]: + """Format tool execution results into message items.""" + import mcp.types as types # noqa: PLC0415 + + rendered: list[dict[str, Any]] = [] + for call, result in zip(tool_calls, tool_results, strict=False): + call_args = call.arguments or {} + if result.isError: + error_text = "".join( + c.text for c in result.content if isinstance(c, types.TextContent) + ) + rendered.extend( + _make_failed_tool_call_items( + tool_name=call_args.get("type", call.name), + tool_kwargs=call_args, + error_message=error_text or "Unknown error", + call_id=call.id, + ) + ) + continue + + screenshot_found = False + for content in result.content: + if isinstance(content, types.ImageContent): + rendered.append( + { + "type": "computer_call_output", + "call_id": call.id, + "output": { + "type": "input_image", + "image_url": f"data:{content.mimeType};base64,{content.data}", + }, + } + ) + screenshot_found = True + break + + text_parts = [ + c.text for c in result.content if isinstance(c, types.TextContent) and c.text + ] + if text_parts: + rendered.append( + { + "type": 
"message", + "role": "user", + "content": [{"type": "input_text", "text": "\n".join(text_parts)}], + } + ) + + if not screenshot_found and not text_parts: + rendered.append( + { + "type": "computer_call_output", + "call_id": call.id, + "output": {"type": "input_text", "text": "Tool executed"}, + } + ) + + return rendered + + @abstractmethod + async def build_prompt(self, messages: list[dict[str, Any]], instruction: str, screenshot_b64: str) -> list[dict[str, Any]]: + """Subclass hook to build model-specific prompt/messages.""" + pass + + @abstractmethod + async def parse_response(self, response: Any, messages: list[dict[str, Any]], screenshot_b64: str) -> AgentResponse: + """Subclass hook to parse model response into AgentResponse.""" + pass + + @instrument( + span_type="agent", + record_args=False, + record_result=True, + ) + + async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse: + instruction = _extract_user_instruction(messages) + + screenshot_b64 = get_last_image_from_messages(messages) + if not screenshot_b64: + call_id = _random_id() + messages.append(_make_screenshot_item(call_id)) + return AgentResponse( + content="capturing initial screenshot", + tool_calls=[MCPToolCall(id=call_id, name="openai_computer", arguments={"type": "screenshot"})], + done=False, + ) + + litellm_messages = await self.build_prompt(messages, instruction, screenshot_b64) + + api_kwargs: dict[str, Any] = { + "model": self.model_name, + "messages": litellm_messages, + } + if "openrouter" in self.model_name.lower(): + api_kwargs["api_key"] = settings.openrouter_api_key or os.getenv("OPENROUTER_API_KEY") + api_kwargs.update(self.completion_kwargs) + + try: + response = await litellm.acompletion(**api_kwargs) + except Exception as exc: + logger.exception(f"{self.__class__.__name__} completion failed: %s", exc) + return AgentResponse( + content=f"{self.__class__.__name__} request failed: {exc}", + tool_calls=[], + done=True, + isError=True, + ) + + return await self.parse_response(response, messages, screenshot_b64) + + +# Adapter dispatch +_ADAPTER_REGISTRY: Dict[str, str] = { + "z-ai/glm-4.5v": "hud.agents.openrouter.models.glm45v.glm45v:Glm45vAgent", + "huggingface/bytedance-seed/ui-tars-1.5-7b": "hud.agents.openrouter.models.uitars.uitars:UITarsAgent", +} + +def _load_adapter(path: str) -> Type[MCPAgent]: + module_name, class_name = path.split(":", 1) + try: + module = import_module(module_name) + except ModuleNotFoundError: + here = Path(__file__).resolve() + # e.g., models/glm45v/glm45v.py + parts = module_name.split(".models.") + if len(parts) == 2: + rel = parts[1].replace(".", "/") + ".py" + candidate = here.with_name("openrouter") / "models" / Path(rel) + if candidate.exists(): + spec = importlib.util.spec_from_file_location("hud.agents._adapter", str(candidate)) + if spec and spec.loader: + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return getattr(mod, class_name) + raise + return getattr(module, class_name) + +class OpenRouterAgent: + """Dispatch wrapper that selects the correct OpenRouter adapter by model.""" + + def __init__(self, *, model_name: str = "z-ai/glm-4.5v", **kwargs: Any) -> None: + normalized = self._normalize_model_name(model_name) + try: + adapter_path = _ADAPTER_REGISTRY[normalized] + except KeyError as exc: # pragma: no cover - defensive + raise ValueError(f"Unsupported OpenRouter model: {model_name}") from exc + + adapter_cls = _load_adapter(adapter_path) + canonical_model = f"openrouter/{normalized}" + self.model_name = 
canonical_model + self._adapter = adapter_cls(model_name=canonical_model, **kwargs) + + @staticmethod + def _normalize_model_name(raw_model: str | None) -> str: + if not raw_model: + raise ValueError("Model name must be provided for OpenRouterAgent") + key = raw_model.strip() + if key.startswith("openrouter/"): + key = key[len("openrouter/") :] + key = key.lower() + if key in _ADAPTER_REGISTRY: + return key + raise ValueError(f"Unknown OpenRouter model: {raw_model}") + + def __getattr__(self, item: str) -> Any: + return getattr(self._adapter, item) + + def __dir__(self) -> list[str]: + base_dir = set(super().__dir__()) + base_dir.update(self.__dict__.keys()) + base_dir.update(dir(self._adapter)) + return sorted(base_dir) + +__all__ = [ + "OpenRouterAgent", + "OpenRouterBaseAgent", + "_random_id", + "_make_reasoning_item", + "_make_output_text_item", + "_make_computer_call_item", + "_make_click_item", + "_make_double_click_item", + "_make_drag_item", + "_make_keypress_item", + "_make_type_item", + "_make_scroll_item", + "_make_screenshot_item", + "_make_failed_tool_call_items", + "_coerce_to_pixel_coordinates", + "_parse_coordinate_box", + "_coerce_box_to_pixels", + "_parse_json_action_string", + "_convert_json_action_to_items", + "_decode_image_dimensions", + "_extract_user_instruction", + "get_last_image_from_messages", +] diff --git a/hud/agents/openrouter/models/glm45v/action_space.txt b/hud/agents/openrouter/models/glm45v/action_space.txt new file mode 100644 index 00000000..908cd718 --- /dev/null +++ b/hud/agents/openrouter/models/glm45v/action_space.txt @@ -0,0 +1,188 @@ +### left_click + +Call rule: `left_click(start_box='[x,y]')` +{ + 'name': 'left_click', + 'description': 'Perform a left mouse click at the specified coordinates on the screen.', + 'parameters': { + 'type': 'object', + 'properties': { + 'start_box': { + 'type': 'array', + 'items': { + 'type': 'integer' + }, + 'description': 'Coordinates [x,y] where to perform the click, normalized to 0-999 range.' + } + }, + 'required': ['start_box'] + } +} + +### left_double_click + +Call rule: `left_double_click(start_box='[x,y]', element_info='')` +{ + 'name': 'left_double_click', + 'description': 'Perform a left mouse double-click at the specified coordinates on the screen.', + 'parameters': { + 'type': 'object', + 'properties': { + 'start_box': { + 'type': 'array', + 'items': { + 'type': 'integer' + }, + 'description': 'Coordinates [x,y] where to perform the double-click, normalized to 0-999 range.' + }, + 'element_info': { + 'type': 'string', + 'description': 'Optional text description of the UI element being double-clicked.' + } + }, + 'required': ['start_box'] + } +} + +### left_drag + +Call rule: `left_drag(start_box='[x1,y1]', end_box='[x2,y2]', element_info='')` +{ + 'name': 'left_drag', + 'description': 'Drag the mouse from starting coordinates to ending coordinates while holding the left mouse button.', + 'parameters': { + 'type': 'object', + 'properties': { + 'start_box': { + 'type': 'array', + 'items': { + 'type': 'integer' + }, + 'description': 'Starting coordinates [x1,y1] for the drag operation, normalized to 0-999 range.' + }, + 'end_box': { + 'type': 'array', + 'items': { + 'type': 'integer' + }, + 'description': 'Ending coordinates [x2,y2] for the drag operation, normalized to 0-999 range.' + }, + 'element_info': { + 'type': 'string', + 'description': 'Optional text description of the UI element being dragged.' 
+ } + }, + 'required': ['start_box', 'end_box'] + } +} + +### key + +Call rule: `key(keys='')` +{ + 'name': 'key', + 'description': 'Simulate pressing a single key or combination of keys on the keyboard.', + 'parameters': { + 'type': 'object', + 'properties': { + 'keys': { + 'type': 'string', + 'description': "The key or key combination to press. Use '+' to separate keys in combinations (e.g., 'ctrl+c', 'alt+tab')." + } + }, + 'required': ['keys'] + } +} + +### type + +Call rule: `type(content='')` +{ + 'name': 'type', + 'description': 'Type text content into the currently focused text input field. This action only performs typing and does not handle field activation or clearing.', + 'parameters': { + 'type': 'object', + 'properties': { + 'content': { + 'type': 'string', + 'description': 'The text content to be typed into the active text field.' + } + }, + 'required': ['content'] + } +} + +### scroll + +Call rule: `scroll(start_box='[x,y]', direction='', step=5, element_info='')` +{ + 'name': 'scroll', + 'description': 'Scroll an element at the specified coordinates in the specified direction by a given number of wheel steps.', + 'parameters': { + 'type': 'object', + 'properties': { + 'start_box': { + 'type': 'array', + 'items': { + 'type': 'integer' + }, + 'description': 'Coordinates [x,y] of the element or area to scroll, normalized to 0-999 range.' + }, + 'direction': { + 'type': 'string', + 'enum': ['down', 'up'], + 'description': "The direction to scroll: 'down' or 'up'." + }, + 'step': { + 'type': 'integer', + 'default': 5, + 'description': 'Number of wheel steps to scroll, default is 5.' + }, + 'element_info': { + 'type': 'string', + 'description': 'Optional text description of the UI element being scrolled.' + } + }, + 'required': ['start_box', 'direction'] + } +} + +### WAIT + +Call rule: `WAIT()` +{ + 'name': 'WAIT', + 'description': 'Wait for 5 seconds before proceeding to the next action.', + 'parameters': { + 'type': 'object', + 'properties': {}, + 'required': [] + } +} + +### DONE + +Call rule: `DONE()` +{ + 'name': 'DONE', + 'description': 'Indicate that the current task has been completed successfully and no further actions are needed.', + 'parameters': { + 'type': 'object', + 'properties': {}, + 'required': [] + } +} + +### FAIL + +Call rule: `FAIL()` +{ + 'name': 'FAIL', + 'description': 'Indicate that the current task cannot be completed or is impossible to accomplish.', + 'parameters': { + 'type': 'object', + 'properties': {}, + 'required': [] + } +} + diff --git a/hud/agents/openrouter/models/glm45v/glm45v.py b/hud/agents/openrouter/models/glm45v/glm45v.py new file mode 100644 index 00000000..993c06c2 --- /dev/null +++ b/hud/agents/openrouter/models/glm45v/glm45v.py @@ -0,0 +1,408 @@ +"""glm-4.5v computer-use agent backed by litellm + openrouter.""" + +from __future__ import annotations + +import json +import logging +import re +from typing import Any, ClassVar +from pathlib import Path + +from litellm.types.utils import ModelResponse + +from hud.settings import settings +from hud.tools.computer.settings import computer_settings +from hud.types import AgentResponse, MCPToolCall +from hud.agents.openrouter import ( + OpenRouterBaseAgent, + _convert_json_action_to_items, + _decode_image_dimensions, + _make_output_text_item, + _make_reasoning_item, + _parse_json_action_string, + _random_id, +) + +logger = logging.getLogger(__name__) + +def _load_text_resource(path: str | Path) -> str | None: + try: + p = Path(path) + with p.open("r", encoding="utf-8") as f: + return 
f.read() + except Exception: + return None + +_BASE_DIR = Path(__file__).resolve().parent +_ACTION_SPACE_PATH = _BASE_DIR / "action_space.txt" + +GLM_ACTION_SPACE = _load_text_resource(_ACTION_SPACE_PATH) or "" +if not GLM_ACTION_SPACE.strip(): + raise RuntimeError(f"Missing action space file at {_ACTION_SPACE_PATH}") + +def convert_responses_items_to_glm45v_pc_prompt( + messages: list[dict[str, Any]], + task: str, + memory: str = "[]", +) -> list[dict[str, Any]]: + head_text = ( + "You are a GUI Agent, and your primary task is to respond accurately to user" + " requests or questions. In addition to directly answering the user's queries," + " you can also use tools or perform GUI operations directly until you fulfill" + " the user's request or provide a correct answer. You should carefully read and" + " understand the images and questions provided by the user, and engage in" + " thinking and reflection when appropriate. The coordinates involved are all" + " represented in thousandths (0-999)." + "\n\n# Task:\n" + f"{task}\n\n# Task Platform\nUbuntu\n\n# Action Space\n{GLM_ACTION_SPACE}\n\n" + "# Historical Actions and Current Memory\nHistory:" + ) + + tail_text = ( + "\nMemory:\n" + f"{memory}\n" + "# Output Format\nPlain text explanation with action(param='...')\n" + "Memory:\n[{\"key\": \"value\"}, ...]\n\n# Some Additional Notes\n" + "- I'll give you the most recent history screenshots(shrunked to 50%*50%) along with the historical action steps.\n" + "- You should put the key information you *have to remember* in a seperated memory part and I'll give it to you in the next round." + " The content in this part should be a dict list. If you no longer need some given information, you should remove it from the memory." + " Even if you don't need to remember anything, you should also output an empty list.\n" + "- If elevated privileges are needed, credentials are referenced as .\n" + "- For any mail account interactions, credentials are referenced as .\n\n" + "Current Screenshot:\n" + ) + + history: list[dict[str, Any]] = [] + history_images: list[str] = [] + current_step: list[dict[str, Any]] = [] + step_num = 0 + + # Optimization: Limit history to last 10 messages to improve performance + for message in messages[-10:]: + if not isinstance(message, dict): + continue + msg_type = message.get("type") + + if msg_type in {"reasoning", "message", "computer_call", "computer_call_output"}: + current_step.append(message) + + if msg_type == "computer_call_output" and current_step: + step_num += 1 + + bot_thought = "" + action_text = "" + for item in current_step: + if item.get("type") == "message" and item.get("role") == "assistant": + content = item.get("content") or [] + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "output_text": + bot_thought = block.get("text", "") + break + if item.get("type") == "computer_call": + action_text = json.dumps(item.get("action", {})) + + history.append({ + "step_num": step_num, + "bot_thought": bot_thought, + "action_text": action_text, + }) + + output = message.get("output") or {} + if isinstance(output, dict) and output.get("type") == "input_image": + url = output.get("image_url") + if isinstance(url, str): + history_images.append(url) + + current_step = [] + + content: list[dict[str, Any]] = [] + current_text = head_text + + total_steps = len(history) + image_tail = min(2, len(history_images)) + + for idx, step in enumerate(history): + step_no = step["step_num"] + bot_thought = step["bot_thought"] + 
action_text = step["action_text"] + + if idx < total_steps - image_tail: + current_text += ( + f"\nstep {step_no}: Screenshot:(Omitted in context.)" + f" Thought: {bot_thought}\nAction: {action_text}" + ) + else: + current_text += f"\nstep {step_no}: Screenshot:" + content.append({"type": "text", "text": current_text}) + image_idx = idx - (total_steps - image_tail) + if 0 <= image_idx < len(history_images): + content.append({"type": "image_url", "image_url": {"url": history_images[image_idx]}}) + current_text = f" Thought: {bot_thought}\nAction: {action_text}" + + current_text += tail_text + content.append({"type": "text", "text": current_text}) + return content + +def _parse_string_action_to_dict(action: str) -> dict[str, Any]: + """Converts GLM's string-based action output to a structured dictionary.""" + if action.startswith("left_click"): + match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) + if match: return {"type": "click", "button": "left", "start_box": [match.group(1), match.group(2)]} + elif action.startswith("right_click"): + match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) + if match: return {"type": "click", "button": "right", "start_box": [match.group(1), match.group(2)]} + elif action.startswith("left_double_click"): + match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) + if match: return {"type": "double_click", "start_box": [match.group(1), match.group(2)]} + elif action.startswith("left_drag"): + start_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) + end_match = re.search(r"end_box='?\[(\d+),\s*(\d+)\]'?", action) + if start_match and end_match: + return { + "type": "drag", + "start_box": [start_match.group(1), start_match.group(2)], + "end_box": [end_match.group(1), end_match.group(2)], + } + elif action.startswith("key"): + key_match = re.search(r"keys='([^']+)'", action) + if key_match: + keys = key_match.group(1) + key_list = keys.split("+") if "+" in keys else [keys] + return {"type": "keypress", "keys": key_list} + elif action.startswith("type"): + content_match = re.search(r"content='([^']*)'", action) + if content_match: return {"type": "type", "content": content_match.group(1)} + elif action.startswith("scroll"): + coord_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) + direction_match = re.search(r"direction='([^']+)'", action) + if coord_match and direction_match: + return { + "type": "scroll", + "start_box": [coord_match.group(1), coord_match.group(2)], + "direction": direction_match.group(1), + } + elif action == "WAIT()": + return {"type": "wait"} + return {} + + +def convert_glm_completion_to_responses_items( + response: ModelResponse, + image_width: int, + image_height: int, + parsed_response: dict[str, str] | None = None, +) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + + if not getattr(response, "choices", None): + return items + + choice = response.choices[0] + message = getattr(choice, "message", None) + if not message: + return items + + content = getattr(message, "content", "") or "" + reasoning_content = getattr(message, "reasoning_content", None) + + if reasoning_content: + items.append(_make_reasoning_item(str(reasoning_content))) + + parsed = parsed_response or parse_glm_response(content) + action = parsed.get("action", "") + action_text = parsed.get("action_text", "") + + if action_text: + clean_text = action_text + if action: + clean_text = clean_text.replace(action, "").strip() + clean_text = re.sub(r"Memory:\s*\[.*?\]\s*$", "", clean_text, 
flags=re.DOTALL).strip() + if clean_text: + items.append(_make_output_text_item(clean_text)) + + if action: + call_id = _random_id() + + json_action = _parse_json_action_string(action) + if not json_action: + json_action = _parse_string_action_to_dict(action) + + if json_action: + json_entries = _convert_json_action_to_items( + json_action, + call_id=call_id, + image_width=image_width, + image_height=image_height, + ) + if json_entries: + items.extend(json_entries) + + return items + + +def parse_glm_response(response: str) -> dict[str, str]: + json_match = re.search(r'(\{.*\})', response) + if json_match: + action = json_match.group(1).strip() + else: + box_match = re.search(r"<\|begin_of_box\|>(.*?)<\|end_of_box\|>", response) + if box_match: + action = box_match.group(1).strip() + else: + action_pattern = r"[\w_]+\([^)]*\)" + matches = re.findall(action_pattern, response) + action = matches[0] if matches else "" + + memory_pattern = r"Memory:(.*?)$" + memory_match = re.search(memory_pattern, response, re.DOTALL) + memory = memory_match.group(1).strip() if memory_match else "[]" + + action_text_pattern = r"^(.*?)Memory:" + action_text_match = re.search(action_text_pattern, response, re.DOTALL) + action_text = action_text_match.group(1).strip() if action_text_match else response + if action_text: + action_text = action_text.replace("<|begin_of_box|>", "").replace("<|end_of_box|>", "") + + return { + "action": action or "", + "action_text": action_text, + "memory": memory, + } + +class Glm45vAgent(OpenRouterBaseAgent): + """LiteLLM-backed GLM-4.5V agent that speaks MCP.""" + + metadata: ClassVar[dict[str, Any]] = { + "display_width": computer_settings.OPENAI_COMPUTER_WIDTH, + "display_height": computer_settings.OPENAI_COMPUTER_HEIGHT, + } + + required_tools: ClassVar[list[str]] = ["openai_computer"] + + def __init__( + self, + *, + model_name: str = "z-ai/glm-4.5v", + completion_kwargs: dict[str, Any] | None = None, + system_prompt: str | None = None, + **agent_kwargs: Any, + ) -> None: + super().__init__(**agent_kwargs) + self.model_name = model_name + self.completion_kwargs = completion_kwargs or {} + if system_prompt: + self.system_prompt = system_prompt + else: + self.system_prompt = "" + self._memory = "[]" + self._last_instruction = "" + self._task_description = "" + + async def get_system_messages(self) -> list[Any]: + return [] + + def _glm_tool_call_to_mcp(self, item: dict[str, Any]) -> MCPToolCall: + call_id = item.get("call_id") or _random_id() + action = item.get("action") or {} + action_type = action.get("type", "") + + arguments: dict[str, Any] = {"type": action_type} + for key in ("x", "y", "scroll_x", "scroll_y"): + if key in action: + arguments[key] = action[key] + if "button" in action: + arguments["button"] = action["button"] + if "keys" in action: + arguments["keys"] = action["keys"] + if "text" in action: + arguments["text"] = action["text"] + if "path" in action: + arguments["path"] = action["path"] + + return MCPToolCall(id=call_id, name="openai_computer", arguments=arguments) + + async def build_prompt(self, messages: list[dict[str, Any]], instruction: str, screenshot_b64: str) -> list[dict[str, Any]]: + # Original prompt building logic from get_response + if instruction: + self._last_instruction = instruction + self._task_description = instruction + task_instruction = self._task_description or getattr(self, "_last_instruction", "") + + self.console.debug(f"glm45v task instruction: {task_instruction}") + self.console.debug(f"glm45v memory (pre-step): 
{self._memory}") + + prompt_content = convert_responses_items_to_glm45v_pc_prompt( + messages=messages, + task=task_instruction, + memory=self._memory, + ) + prompt_content.append( + {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{screenshot_b64}"}} + ) + + litellm_messages: list[dict[str, Any]] = [] + if getattr(self, "system_prompt", None): + litellm_messages.append({"role": "system", "content": self.system_prompt}) + litellm_messages.append({"role": "user", "content": prompt_content}) + + return litellm_messages + + async def parse_response(self, response: Any, messages: list[dict[str, Any]], screenshot_b64: str) -> AgentResponse: + # Original parsing logic from get_response + choice = response.choices[0] + message = getattr(choice, "message", None) + response_content = getattr(message, "content", "") if message else "" + parsed = parse_glm_response(response_content or "") if response_content else { + "memory": self._memory, + } + if parsed.get("memory"): + self._memory = parsed["memory"] + + image_width, image_height = _decode_image_dimensions(screenshot_b64) + response_items = convert_glm_completion_to_responses_items( + response, + image_width=image_width, + image_height=image_height, + parsed_response=parsed, + ) + + messages.extend(response_items) + + text_parts: list[str] = [] + reasoning_parts: list[str] = [] + tool_calls: list[MCPToolCall] = [] + + for item in response_items: + if not isinstance(item, dict): + continue + if item.get("type") == "message" and item.get("role") == "assistant": + for block in item.get("content", []) or []: + if isinstance(block, dict) and block.get("type") == "output_text": + text = block.get("text") + if isinstance(text, str): + text_parts.append(text) + elif item.get("type") == "reasoning": + summary = item.get("summary", []) + for block in summary: + if isinstance(block, dict) and block.get("text"): + reasoning_parts.append(block["text"]) + elif item.get("type") == "computer_call": + tool_calls.append(self._glm_tool_call_to_mcp(item)) + + content_text = "\n".join(text_parts).strip() + reasoning_text = "\n".join(reasoning_parts).strip() + + return AgentResponse( + content=content_text or None, + reasoning=reasoning_text or None, + tool_calls=tool_calls, + done=not tool_calls, + raw=response, + ) + + async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse: + return await super().get_response(messages) + + +__all__ = ["Glm45vAgent"] diff --git a/hud/agents/openrouter/models/uitars/uitars.py b/hud/agents/openrouter/models/uitars/uitars.py new file mode 100644 index 00000000..ba68e50b --- /dev/null +++ b/hud/agents/openrouter/models/uitars/uitars.py @@ -0,0 +1,512 @@ +"""UITARS adapter rebuilt using official parser utilities""" + +from __future__ import annotations + +import ast +import base64 +import logging +import math +import os +import re +from io import BytesIO +from typing import Any, ClassVar + +from PIL import Image + +from hud.agents.openrouter import ( + OpenRouterBaseAgent, + _convert_json_action_to_items, + _decode_image_dimensions, + _make_screenshot_item, + _random_id, +) +from hud.tools.computer.settings import computer_settings +from hud.types import AgentResponse, MCPToolCall + +logger = logging.getLogger(__name__) + +IMAGE_FACTOR = 28 +MIN_PIXELS = 100 * 28 * 28 +MAX_PIXELS = 16384 * 28 * 28 +MAX_RATIO = 200 +def _resolve_provider_model_name(model_name: str) -> str: + key = (model_name or "").strip() + if key.startswith("openrouter/"): + key = key[len("openrouter/") :] + lowered = key.lower() 
+ if lowered in {"huggingface/bytedance-seed/ui-tars-1.5-7b", "bytedance-seed/ui-tars-1.5-7b"}: + return "ByteDance-Seed/UI-TARS-1.5-7B" + return key + +COMPUTER_USE_DOUBAO = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task. + +## Output Format +``` +Thought: ... +Action: ... +``` + +## Action Space + +click(point='x1 y1') +left_double(point='x1 y1') +right_single(point='x1 y1') +drag(start_point='x1 y1', end_point='x2 y2') +hotkey(key='ctrl c') # Split keys with a space and use lowercase. Also, do not use more than 3 keys in one hotkey action. +type(content='xxx') # Use escape characters \\', \\\", and \\n in content part to ensure we can parse the content in normal python string format. If you want to submit your input, use \\n at the end of content. +scroll(point='x1 y1', direction='down or up or right or left') # Show more information on the `direction` side. +wait() #Sleep for 5s and take a screenshot to check for any changes. +finished(content='xxx') # Use escape characters \\', \\", and \\n in content part to ensure we can parse the content in normal python string format. + + +## Note +- Use {language} in `Thought` part. +- Write a small plan and finally summarize your next action (with its target element) in one sentence in `Thought` part. + +## User Instruction +{instruction} +""" + +def convert_point_to_coordinates(text: str, is_answer: bool = False) -> str: + pattern = r"(\d+)\s+(\d+)" + + def replace_match(match: re.Match[str]) -> str: + x1, y1 = map(int, match.groups()) + return f"({x1},{y1})" + + text = re.sub(r"\[EOS\]", "", text) + return re.sub(pattern, replace_match, text).strip() + + +def parse_action(action_str: str) -> dict[str, Any] | None: + try: + node = ast.parse(action_str, mode="eval") + if not isinstance(node, ast.Expression): + raise ValueError("Not an expression") + call = node.body + if not isinstance(call, ast.Call): + raise ValueError("Not a function call") + + if isinstance(call.func, ast.Name): + func_name = call.func.id + elif isinstance(call.func, ast.Attribute): + func_name = call.func.attr + else: + func_name = None + + kwargs: dict[str, Any] = {} + for kw in call.keywords: + key = kw.arg + if key is None: + # Skip unpacked kwargs like **extra + continue + if isinstance(kw.value, ast.Constant): + value = kw.value.value + elif isinstance(kw.value, ast.Str): # compatibility + value = kw.value.s + else: + value = None + kwargs[key] = value + + return {"function": func_name, "args": kwargs} + except Exception as exc: + logger.debug("Failed to parse action '%s': %s", action_str, exc) + return None + +def escape_single_quotes(text: str) -> str: + pattern = r"(? 
<!\\)'"
+    return re.sub(pattern, r"\\'", text)
+
+
+def round_by_factor(number: int, factor: int) ->
int: + return round(number / factor) * factor + + +def ceil_by_factor(number: int, factor: int) -> int: + return math.ceil(number / factor) * factor + + +def floor_by_factor(number: int, factor: int) -> int: + return math.floor(number / factor) * factor + + +def smart_resize(height: int, width: int, factor: int = IMAGE_FACTOR, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS) -> tuple[int, int]: + if max(height, width) / min(height, width) > MAX_RATIO: + raise ValueError( + f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}" + ) + h_bar = max(factor, round_by_factor(height, factor)) + w_bar = max(factor, round_by_factor(width, factor)) + if h_bar * w_bar > max_pixels: + beta = math.sqrt((height * width) / max_pixels) + h_bar = floor_by_factor(int(height / beta), factor) + w_bar = floor_by_factor(int(width / beta), factor) + elif h_bar * w_bar < min_pixels: + beta = math.sqrt(min_pixels / (height * width)) + h_bar = ceil_by_factor(int(height * beta), factor) + w_bar = ceil_by_factor(int(width * beta), factor) + return h_bar, w_bar + + +def _preprocess_text_for_parsing(text: str) -> str: + if "" in text: + text = convert_point_to_coordinates(text) + if "start_point=" in text: + text = text.replace("start_point=", "start_box=") + if "end_point=" in text: + text = text.replace("end_point=", "end_box=") + if "point=" in text: + text = text.replace("point=", "start_box=") + return text + + +def parse_action_to_structure_output( + text: str, + factor: int, + origin_resized_height: int, + origin_resized_width: int, + model_type: str = "qwen25vl", + max_pixels: int = MAX_PIXELS, + min_pixels: int = MIN_PIXELS, +) -> list[dict[str, Any]]: + text = _preprocess_text_for_parsing(text.strip()) + + # Thought/Action extraction + if text.startswith("Thought:"): + thought_pattern = r"Thought: (.+?)(?=\s*Action: |$)" + elif text.startswith("Reflection:"): + thought_pattern = r"Reflection: (.+?)Action_Summary: (.+?)(?=\s*Action: |$)" + elif text.startswith("Action_Summary:"): + thought_pattern = r"Action_Summary: (.+?)(?=\s*Action: |$)" + else: + thought_pattern = r"Thought: (.+?)(?=\s*Action: |$)" + + reflection, thought = None, None + thought_match = re.search(thought_pattern, text, re.DOTALL) + if thought_match: + if len(thought_match.groups()) == 1: + thought = thought_match.group(1).strip() + elif len(thought_match.groups()) == 2: + thought = thought_match.group(2).strip() + reflection = thought_match.group(1).strip() + + if "Action:" not in text: + return [] + action_str_full = text.split("Action: ")[-1] + + # Split multiple actions if present (rare; we expect exactly one) + raw_actions: list[str] = [] + for seg in action_str_full.split(")\n\n"): + act = seg.strip() + if not act: + continue + if not act.endswith(")"): + act += ")" + # Handle type(content='...') with quotes inside + if "type(content" in act: + def _unbox(m: re.Match[str]) -> str: + return m.group(1) + pat = r"type\(content='(.*?)'\)" + if re.search(pat, act): + inner = re.sub(pat, _unbox, act) + inner = escape_single_quotes(inner) + act = "type(content='" + inner + "')" + raw_actions.append(act) + + parsed_actions = [parse_action(a.replace("\n", "\\n").lstrip()) for a in raw_actions] + + actions: list[dict[str, Any]] = [] + for action_instance, raw_str in zip(parsed_actions, raw_actions): + if not action_instance: + raise ValueError(f"Action can't parse: {raw_str}") + action_type = action_instance["function"] + params = action_instance["args"] + + action_inputs: dict[str, 
Any] = {} + for param_name, param in params.items(): + if param == "": + continue + if isinstance(param, str): + param = param.lstrip() + action_inputs[param_name.strip()] = param + + if "start_box" in param_name or "end_box" in param_name: + ori_box = str(param) + numbers = ori_box.replace("(", "").replace(")", "").split(",") + + # qwen25vl branch -> absolute pixel coords relative to processed dims -> normalize to 0..1f + if model_type == "qwen25vl": + float_numbers: list[float] = [] + for idx, num in enumerate(numbers): + val = float(num) + if (idx + 1) % 2 == 0: + float_numbers.append(val / float(origin_resized_height or 1)) + else: + float_numbers.append(val / float(origin_resized_width or 1)) + else: + # Otherwise assume factor-based normalization (e.g., 1000) + float_numbers = [float(num) / float(factor or 1) for num in numbers] + + if len(float_numbers) == 2: + float_numbers = [float_numbers[0], float_numbers[1], float_numbers[0], float_numbers[1]] + action_inputs[param_name.strip()] = str(float_numbers) + + actions.append( + { + "reflection": reflection, + "thought": thought, + "action_type": action_type, + "action_inputs": action_inputs, + "text": text, + } + ) + return actions + +def _pil_to_data_uri(img: Image.Image) -> str: + buf = BytesIO() + img.save(buf, format="PNG") + return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode("utf-8") + +def _resize_for_model(img: Image.Image) -> tuple[Image.Image, int, int]: + w, h = img.size + new_h, new_w = smart_resize(h, w) + if (new_w, new_h) != (w, h): + img = img.resize((new_w, new_h)) + if img.mode != "RGB": + img = img.convert("RGB") + return img, new_w, new_h + +def _format_action_to_doubao_string(action: dict[str, Any], width: int, height: int) -> str | None: + a_type = (action.get("type") or "").lower() + + if a_type in {"click", "left_click"}: + x = action.get("x", 0) + y = action.get("y", 0) + return f"click(start_box='({x},{y})')" + elif a_type == "double_click": + x = action.get("x", 0) + y = action.get("y", 0) + return f"left_double(start_box='({x},{y})')" + elif a_type == "drag": + path = action.get("path", []) + if len(path) >= 2: + sx, sy = path[0].get("x", 0), path[0].get("y", 0) + ex, ey = path[-1].get("x", 0), path[-1].get("y", 0) + return f"drag(start_point='({sx},{sy})', end_point='({ex},{ey})')" + elif a_type == "keypress": + keys = " ".join(action.get("keys", [])) + return f"hotkey(key='{keys}')" + elif a_type == "type": + content = action.get("text", "") + return f"type(content='{content}')" + elif a_type == "scroll": + x = action.get("x", 0) + y = action.get("y", 0) + direction = action.get("scroll_y", 0) + dir_str = "down" if direction > 0 else "up" + return f"scroll(point='({x},{y})', direction='{dir_str}')" + return None + +def _parse_to_json_action(actions: list[dict[str, Any]]) -> dict[str, Any] | None: + if not actions: + return None + act = actions[0] + a_type = (act.get("action_type") or "").lower() + inputs = act.get("action_inputs") or {} + + def _coerce_box(value: Any) -> list[float] | None: + try: + if isinstance(value, str): + nums = re.findall(r"-?\d+(?:\.\d+)?", value) + if len(nums) >= 2: + return [float(nums[0]), float(nums[1])] + elif isinstance(value, (list, tuple)) and len(value) >= 2: + return [float(value[0]), float(value[1])] + except Exception: + return None + return None + + if a_type in {"click", "left_single"}: + sb = _coerce_box(inputs.get("start_box")) + if sb: + return {"type": "click", "button": "left", "start_box": sb} + if a_type in {"left_double", 
"double_click"}: + sb = _coerce_box(inputs.get("start_box")) + if sb: + return {"type": "double_click", "start_box": sb} + if a_type in {"right_single", "right_click"}: + sb = _coerce_box(inputs.get("start_box")) + if sb: + return {"type": "click", "button": "right", "start_box": sb} + if a_type in {"drag", "select", "left_drag"}: + s = _coerce_box(inputs.get("start_box")) + e = _coerce_box(inputs.get("end_box")) + if s and e: + return {"type": "drag", "start_box": s, "end_box": e} + if a_type in {"hotkey", "key", "keydown", "keypress"}: + key_str = inputs.get("key") or inputs.get("hotkey") or inputs.get("keys") or "" + key_str = str(key_str) + # Normalize arrow aliases and spacing + key_str = key_str.replace("arrowleft", "left").replace("arrowright", "right").replace("arrowup", "up").replace("arrowdown", "down") + keys = [seg for seg in re.split(r"[+\s]+", key_str.strip()) if seg] + if keys: + return {"type": "keypress", "keys": keys} + if a_type == "type": + content = inputs.get("content", "") + return {"type": "type", "content": str(content)} + if a_type == "scroll": + sb = _coerce_box(inputs.get("start_box")) + direction = str(inputs.get("direction") or "down").lower() + if sb: + return {"type": "scroll", "start_box": sb, "direction": direction} + if a_type == "wait": + return {"type": "wait"} + if a_type == "finished": + return {"type": "finished", "content": str(inputs.get("content") or "")} + return None + +class UITarsAgent(OpenRouterBaseAgent): + """UITARS computer-use agent (Doubao-style prompts + official parser).""" + + metadata: ClassVar[dict[str, Any]] = { + "display_width": computer_settings.OPENAI_COMPUTER_WIDTH, + "display_height": computer_settings.OPENAI_COMPUTER_HEIGHT, + } + required_tools: ClassVar[list[str]] = ["openai_computer"] + + def __init__(self, *, model_name: str = "ByteDance-Seed/UI-TARS-1.5-7B", completion_kwargs: dict[str, Any] | None = None, **agent_kwargs: Any) -> None: + super().__init__(**agent_kwargs) + self.model_name = model_name + self._base_completion_kwargs = dict(completion_kwargs or {}) + # Allow configuring a Hugging Face endpoint via environment + env_base = os.getenv("HF_ENDPOINT_BASE_URL") + env_token = os.getenv("HF_ENDPOINT_TOKEN") or os.getenv("HF_API_KEY") + env_provider = os.getenv("HF_ENDPOINT_PROVIDER") + if env_base and "api_base" not in self._base_completion_kwargs: + self._base_completion_kwargs["api_base"] = env_base + if env_provider: + self._base_completion_kwargs.setdefault("custom_llm_provider", str(env_provider)) + if env_token and "api_key" not in self._base_completion_kwargs: + self._base_completion_kwargs["api_key"] = env_token + + # If HF endpoint is configured and provider not set, default to huggingface + if os.getenv("HF_ENDPOINT_BASE_URL") and "custom_llm_provider" not in self._base_completion_kwargs: + self._base_completion_kwargs["custom_llm_provider"] = "huggingface" + + self._provider_model = _resolve_provider_model_name(self.model_name) + self.completion_kwargs = self._base_completion_kwargs + self.model_name = self._provider_model + + async def get_system_messages(self) -> list[Any]: + return [] + + def _tool_call(self, item: dict[str, Any]) -> MCPToolCall: + call_id = item.get("call_id") or _random_id() + action = item.get("action") or {} + return MCPToolCall(id=call_id, name="openai_computer", arguments=action) + + async def build_prompt(self, messages: list[dict[str, Any]], instruction: str, screenshot_b64: str) -> list[dict[str, Any]]: + try: + data = base64.b64decode(screenshot_b64.split(",", 1)[1] if 
screenshot_b64.startswith("data:image") else screenshot_b64) + img = Image.open(BytesIO(data)) + orig_w, orig_h = img.size + except Exception: + orig_w, orig_h = _decode_image_dimensions(screenshot_b64) + img = None + + proc_w, proc_h = orig_w, orig_h + proc_uri = f"data:image/png;base64,{screenshot_b64}" + if img is not None: + img, proc_w, proc_h = _resize_for_model(img) + proc_uri = _pil_to_data_uri(img) + + system_prompt = COMPUTER_USE_DOUBAO.format(language="English", instruction=instruction or "") + + litellm_messages: list[dict[str, Any]] = [] + + for msg in messages[:-1]: + if not isinstance(msg, dict): + continue + msg_type = msg.get("type") + + if msg_type == "computer_call_output": + output = msg.get("output") or {} + if isinstance(output, dict) and output.get("type") == "input_image": + image_url = output.get("image_url") + if image_url: + litellm_messages.append({ + "role": "user", + "content": [{"type": "image_url", "image_url": {"url": image_url}}] + }) + + elif msg_type == "computer_call": + action = msg.get("action") or {} + action_str = _format_action_to_doubao_string(action, proc_w, proc_h) + if action_str: + litellm_messages.append({ + "role": "assistant", + "content": f"Thought: Executing action.\nAction: {action_str}" + }) + + litellm_messages.append({ + "role": "user", + "content": [ + {"type": "text", "text": system_prompt}, + {"type": "image_url", "image_url": {"url": proc_uri}}, + ], + }) + + return litellm_messages + + async def parse_response(self, response: Any, messages: list[dict[str, Any]], screenshot_b64: str) -> AgentResponse: + content = (getattr(response.choices[0], "message", None) or {}).get("content", "") + logger.debug("UITARS model output: %s", content) + + orig_w, orig_h = _decode_image_dimensions(screenshot_b64) + + actions = parse_action_to_structure_output( + content or "", + factor=1000, + origin_resized_height=orig_h, + origin_resized_width=orig_w, + model_type="qwen25vl", + ) + json_action = _parse_to_json_action(actions) + + items: list[dict[str, Any]] = [] + if json_action: + call_id = _random_id() + items.extend( + _convert_json_action_to_items( + json_action, + call_id=call_id, + image_width=orig_w, + image_height=orig_h, + ) + ) + + if items and json_action.get("type") in {"click", "left_click"}: + converted_action = items[0].get("action", {}) if items else {} + x_coord = converted_action.get("x", 0) + if x_coord < 50: + logger.info("Detected launcher click at x=%d, adding auto-wait", x_coord) + items.append({ + "type": "computer_call", + "action": {"type": "screenshot"}, + "computer_call_id": _random_id(), + }) + + if not items: + items.append(_make_screenshot_item(call_id)) + else: + call_id = _random_id() + items.append(_make_screenshot_item(call_id)) + + tool_calls = [self._tool_call(i) for i in items if i.get("type") == "computer_call"] + return AgentResponse(content=None, tool_calls=tool_calls, done=not tool_calls, raw=response) + + async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse: + return await super().get_response(messages) + + +__all__ = ["UITarsAgent"] diff --git a/hud/agents/tests/test_openrouter.py b/hud/agents/tests/test_openrouter.py new file mode 100644 index 00000000..7328586e --- /dev/null +++ b/hud/agents/tests/test_openrouter.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import pytest + +from types import SimpleNamespace +from typing import Any + +def _import_agents(): + import mcp.types as types + from hud.agents.glm45v import Glm45vAgent + from hud.agents.openrouter import 
OpenRouterAgent + from hud.types import MCPToolResult + return Glm45vAgent, OpenRouterAgent, MCPToolResult, types + + +def test_openrouter_agent_defaults_to_glm45v() -> None: + Glm45vAgent, OpenRouterAgent, _, _ = _import_agents() + agent = OpenRouterAgent() + assert isinstance(agent._adapter, Glm45vAgent) + assert agent.model_name == "openrouter/z-ai/glm-4.5v" + + +def test_openrouter_agent_normalizes_alias() -> None: + _, OpenRouterAgent, _, _ = _import_agents() + agent = OpenRouterAgent(model_name="Z-AI/GLM-4.5V") + assert agent.model_name == "openrouter/z-ai/glm-4.5v" + + +def test_openrouter_agent_rejects_unknown_model() -> None: + _, OpenRouterAgent, _, _ = _import_agents() + with pytest.raises(ValueError): + OpenRouterAgent(model_name="unknown/model") + + +@pytest.mark.asyncio +async def test_openrouter_agent_parses_tool_calls(monkeypatch: pytest.MonkeyPatch) -> None: + Glm45vAgent, OpenRouterAgent, MCPToolResult, types = _import_agents() + png_base64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO61uFYAAAAASUVORK5CYII=" + + async def fake_completion(*_: Any, **__: Any) -> Any: + message = SimpleNamespace(content=( + "I will click the button.\n" + "<|begin_of_box|>{\"type\": \"click\", \"start_box\": [100, 200]}<|end_of_box|>\n" + "Memory:[]" + ), reasoning_content=None) + choice = SimpleNamespace(message=message) + return SimpleNamespace(choices=[choice]) + + monkeypatch.setattr("hud.agents.glm45v.litellm.acompletion", fake_completion) + + agent = OpenRouterAgent(model_name="z-ai/glm-4.5v") + + messages: list[dict[str, Any]] = [ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "click the highlighted cell"}], + }, + { + "type": "computer_call_output", + "call_id": "initial", + "output": { + "type": "input_image", + "image_url": f"data:image/png;base64,{png_base64}", + }, + }, + ] + + response = await agent.get_response(list(messages)) + + assert not response.done + assert response.tool_calls, "expected at least one tool call" + + tool_call = response.tool_calls[0] + assert tool_call.name == "openai_computer" + assert tool_call.arguments["type"] == "click" + # coordinates are normalized from the 1x1 PNG back to pixel space -> 0/0 + assert tool_call.arguments["x"] == 0 + assert tool_call.arguments["y"] == 0 + + tool_result = MCPToolResult( + content=[ + types.ImageContent(type="image", data=png_base64, mimeType="image/png"), + types.TextContent(type="text", text="button pressed"), + ] + ) + + rendered = await agent.format_tool_results([tool_call], [tool_result]) + + assert any(item.get("type") == "computer_call_output" for item in rendered) + assert any( + item.get("type") == "message" and item.get("role") == "user" + for item in rendered + ) diff --git a/hud/cli/__init__.py b/hud/cli/__init__.py index 3708cf0e..c1701f5c 100644 --- a/hud/cli/__init__.py +++ b/hud/cli/__init__.py @@ -777,7 +777,8 @@ def eval( agent: str | None = typer.Argument( None, help=( - "Agent backend to use (claude, openai, vllm, or litellm). If not provided, will prompt interactively." # noqa: E501 + "Agent backend to use (claude, openai computer use, openrouter responses, " + "vllm, or litellm). If not provided, will prompt interactively." 
         ),
     ),
     full: bool = typer.Option(
@@ -893,6 +894,7 @@ def eval(
             [
                 {"name": "Claude 4 Sonnet", "value": "claude"},
                 {"name": "OpenAI Computer Use", "value": "openai"},
+                {"name": "OpenRouter", "value": "openrouter"},
                 {"name": "vLLM (Local Server)", "value": "vllm"},
                 {"name": "LiteLLM (Multi-provider)", "value": "litellm"},
             ]
@@ -901,7 +903,7 @@ def eval(
         agent = hud_console.select("Select an agent to use:", choices=choices, default=0)

     # Handle HUD model selection
-    if agent and agent not in ["claude", "openai", "vllm", "litellm", "integration_test"]:
+    if agent and agent not in ["claude", "openai", "openrouter", "vllm", "litellm", "integration_test"]:
         # Find remote model name
         model = agent
         if not vllm_base_url:
@@ -922,7 +924,7 @@ def eval(
         hud_console.info(f"Using HUD model: {model} (trained on {base_model})")

     # Validate agent choice
-    valid_agents = ["claude", "openai", "vllm", "litellm", "integration_test"]
+    valid_agents = ["claude", "openai", "openrouter", "vllm", "litellm", "integration_test"]
     if agent not in valid_agents:
         hud_console.error(f"Invalid agent: {agent}. Must be one of: {', '.join(valid_agents)}")
         raise typer.Exit(1)
diff --git a/hud/cli/eval.py b/hud/cli/eval.py
index e8afceac..7719e84d 100644
--- a/hud/cli/eval.py
+++ b/hud/cli/eval.py
@@ -113,7 +113,7 @@ def _build_vllm_config(


 def build_agent(
-    agent_type: Literal["claude", "openai", "vllm", "litellm", "integration_test"],
+    agent_type: Literal["claude", "openai", "openrouter", "vllm", "litellm", "integration_test"],
     *,
     model: str | None = None,
     allowed_tools: list[str] | None = None,
@@ -180,6 +180,21 @@ def build_agent(
             allowed_tools=allowed_tools,
             verbose=verbose,
         )
+    elif agent_type == "openrouter":
+        try:
+            from hud.agents.openrouter import OpenRouterAgent
+        except ImportError as e:
+            hud_console.error(
+                "OpenRouter agent dependencies are not installed. "
+                "Please install with: pip install 'hud-python[agent]'"
+            )
+            raise typer.Exit(1) from e
+
+        return OpenRouterAgent(
+            model_name=model or "z-ai/glm-4.5v",
+            allowed_tools=allowed_tools,
+            verbose=verbose,
+        )

     # Fallback Claude agent (Anthropic)
     try:
@@ -209,7 +224,7 @@ def build_agent(
 async def run_single_task(
     source: str,
     *,
-    agent_type: Literal["claude", "openai", "vllm", "litellm", "integration_test"] = "claude",
+    agent_type: Literal["claude", "openai", "openrouter", "vllm", "litellm", "integration_test"] = "claude",
     model: str | None = None,
     allowed_tools: list[str] | None = None,
     max_steps: int = 10,
@@ -305,6 +320,16 @@ async def run_single_task(
         }
         if allowed_tools:
             agent_config["allowed_tools"] = allowed_tools
+    elif agent_type == "openrouter":
+        from hud.agents.openrouter import OpenRouterAgent
+
+        agent_class = OpenRouterAgent
+        agent_config = {
+            "model_name": model or "z-ai/glm-4.5v",
+            "verbose": verbose,
+        }
+        if allowed_tools:
+            agent_config["allowed_tools"] = allowed_tools
     elif agent_type == "claude":
         from hud.agents import ClaudeAgent

@@ -353,7 +378,7 @@ async def run_single_task(
 async def run_full_dataset(
     source: str,
     *,
-    agent_type: Literal["claude", "openai", "vllm", "litellm", "integration_test"] = "claude",
+    agent_type: Literal["claude", "openai", "openrouter", "vllm", "litellm", "integration_test"] = "claude",
     model: str | None = None,
     allowed_tools: list[str] | None = None,
     max_concurrent: int = 30,
@@ -454,6 +479,39 @@ async def run_full_dataset(

         if allowed_tools:
             agent_config["allowed_tools"] = allowed_tools
+    elif agent_type == "openrouter":
+        try:
+            # Use adapter class directly so it satisfies type[MCPAgent]
+            from hud.agents.openrouter import (
+                OpenRouterAgent,
+                _ADAPTER_REGISTRY,
+                _load_adapter,
+            )
+        except ImportError as e:
+            hud_console.error(
+                "OpenRouter agent dependencies are not installed. "
+                "Please install with: pip install 'hud-python[agent]'"
+            )
+            raise typer.Exit(1) from e
+
+        # Normalize model and resolve adapter
+        raw_model = model or "z-ai/glm-4.5v"
+        try:
+            normalized = OpenRouterAgent._normalize_model_name(raw_model)
+            adapter_path = _ADAPTER_REGISTRY[normalized]
+        except Exception as e:
+            hud_console.error(f"Unsupported OpenRouter model: {raw_model}")
+            raise typer.Exit(1) from e
+
+        adapter_cls = _load_adapter(adapter_path)
+        agent_class = adapter_cls
+        agent_config = {
+            "model_name": f"openrouter/{normalized}",
+            "verbose": verbose,
+        }
+        if allowed_tools:
+            agent_config["allowed_tools"] = allowed_tools
+
     else:
         try:
             from hud.agents import ClaudeAgent
@@ -539,10 +597,13 @@ def eval_command(
         "--full",
         help="Run the entire dataset (omit for single-task debug mode)",
     ),
-    agent: Literal["claude", "openai", "vllm", "litellm", "integration_test"] = typer.Option(
+    agent: Literal["claude", "openai", "openrouter", "vllm", "litellm", "integration_test"] = typer.Option(
         "claude",
         "--agent",
-        help="Agent backend to use (claude, openai, vllm for local server, or litellm)",
+        help=(
+            "Agent backend to use (claude, openai computer use, openrouter, "
+            "vllm for local server, or litellm)"
+        ),
     ),
     model: str | None = typer.Option(
         None,
diff --git a/hud/utils/agent_factories.py b/hud/utils/agent_factories.py
index e15cb240..f42248a4 100644
--- a/hud/utils/agent_factories.py
+++ b/hud/utils/agent_factories.py
@@ -8,6 +8,7 @@

 from hud.agents.grounded_openai import GroundedOpenAIChatAgent
 from hud.agents.openai_chat_generic import GenericOpenAIChatAgent
+from hud.agents.openrouter import OpenRouterAgent
 from hud.tools.grounding import GrounderConfig


@@ -82,3 +83,9 @@ def create_grounded_agent(**kwargs: Any) -> GroundedOpenAIChatAgent:
     return GroundedOpenAIChatAgent(
         openai_client=openai_client, grounder_config=grounder_config, **kwargs
     )
+
+
+def create_openrouter_agent(**kwargs: Any) -> OpenRouterAgent:
+    """Factory for OpenRouterAgent with run_dataset compatibility."""
+
+    return OpenRouterAgent(**kwargs)