Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,39 @@ An MCP server for ClickHouse.
* Input: `sql` (string): The SQL query to execute.
* Query data directly from various sources (files, URLs, databases) without ETL processes.

### Memory Tools (Experimental)

> [!WARNING]
> Memory tools are an experimental feature and may change in future versions.

When enabled via the `CLICKHOUSE_MEMORY=true` environment variable, the following memory management tools become available:

* `save_memory`
* Store user-provided information as key-value pairs for later retrieval and reference. Saving under a key that already exists adds another entry rather than overwriting the earlier one.
* Input: `key` (string): A concise, descriptive key that summarizes the content.
* Input: `value` (string): The information to store.

* `get_memories_titles`
* Retrieve all memory keys/titles to see what information has been stored.
* Returns a list of all stored memory keys with timestamps.

* `get_memory`
* Retrieve all memory entries matching a specific key.
* Input: `key` (string): The key to search for.
* Returns all memories associated with that key, ordered by most recent first.

* `get_all_memories`
* Retrieve all saved memories from the memory table.
* Input: None
* **Warning**: Use only when explicitly requested by the user, as it may return large amounts of data.

* `delete_memory`
* Delete all memory entries matching a specific key.
* Input: `key` (string): The key of memories to delete.
* **Warning**: Should only be used when explicitly requested by the user.

These tools use ClickHouse to store memories in a `user_memory` table, allowing information to persist across sessions.

### Health Check Endpoint

When running with HTTP or SSE transport, a health check endpoint is available at `/health`. This endpoint:
Expand Down Expand Up @@ -317,6 +350,9 @@ The following environment variables are used to configure the ClickHouse and chD
* `CLICKHOUSE_ENABLED`: Enable/disable ClickHouse functionality
* Default: `"true"`
* Set to `"false"` to disable ClickHouse tools when using chDB only
* `CLICKHOUSE_MEMORY`: Enable/disable memory tools (experimental)
* Default: `"false"`
* Set to `"true"` to enable memory management tools for storing key-value data

#### chDB Variables

Expand Down
8 changes: 8 additions & 0 deletions mcp_clickhouse/mcp_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ def mcp_bind_port(self) -> int:
"""
return int(os.getenv("CLICKHOUSE_MCP_BIND_PORT", "8000"))

@property
def memory_enabled(self) -> bool:
    """Report whether the experimental memory tools are switched on.

    Reads the ``CLICKHOUSE_MEMORY`` environment variable. Only the value
    ``"true"``, compared case-insensitively, enables the tools; any other
    value, or an unset variable, leaves them disabled.

    Default: False
    """
    raw_flag = os.getenv("CLICKHOUSE_MEMORY", "false")
    return raw_flag.lower() == "true"

def get_client_config(self) -> dict:
"""Get the configuration dictionary for clickhouse_connect client.

Expand Down
289 changes: 289 additions & 0 deletions mcp_clickhouse/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,283 @@ def get_readonly_setting(client) -> str:
return "1" # Default to basic read-only mode if setting isn't present


def execute_write_query(query: str):
    """Run a write statement through a freshly created ClickHouse client.

    This is the path the memory tools use for statements that modify data;
    it is intended to bypass the read-only mode applied to regular queries.

    TODO: Find a sustainable way to execute write queries.

    Args:
        query: The write query to execute

    Returns:
        Whatever ``client.command`` returns on success, or a dict of the
        form ``{"error": "<message>"}`` when the command raises.
    """
    write_client = create_clickhouse_client()
    try:
        outcome = write_client.command(query)
    except Exception as err:
        # Failures are reported as a value rather than raised, so callers
        # must check for the "error" key.
        logger.error(f"Error executing write query: {err}")
        return {"error": str(err)}
    else:
        logger.info("Write query executed successfully")
        return outcome


def save_memory(key: str, value: str):
    """Store user-provided information as key-value pairs for later retrieval and reference. Generate a concise, descriptive key that summarizes the value content provided.

    Each call appends a new row. An existing row with the same key is not
    replaced, so a single key can accumulate several entries.

    Args:
        key: Short, descriptive label for the stored content.
        value: The information to store.

    Returns:
        A dict with ``status`` ("success" or "error") and ``message``; on
        success it also carries the ``key`` that was written.
    """
    logger.info(f"Saving memory with key: {key}")

    def _failure(prefix: str, write_result: dict) -> dict:
        # Build the error payload shared by the two write steps below.
        return {"status": "error", "message": f"{prefix}: {write_result['error']}"}

    try:
        # Create table if it doesn't exist, so the first save works on a fresh database.
        logger.info("Ensuring user_memory table exists")
        create_table_query = """
        CREATE TABLE IF NOT EXISTS user_memory (
            key String,
            value String,
            created_at DateTime DEFAULT now(),
            updated_at DateTime DEFAULT now()
        ) ENGINE = MergeTree()
        ORDER BY key
        """

        create_result = execute_write_query(create_table_query)
        if isinstance(create_result, dict) and "error" in create_result:
            return _failure("Failed to create user_memory table", create_result)

        # Plain INSERT, not an upsert: the MergeTree table does not enforce
        # uniqueness on its ORDER BY key, so saving the same key again adds
        # another row alongside the earlier ones.
        insert_query = f"""
        INSERT INTO user_memory (key, value, updated_at)
        VALUES ({format_query_value(key)}, {format_query_value(value)}, now())
        """

        insert_result = execute_write_query(insert_query)
        if isinstance(insert_result, dict) and "error" in insert_result:
            return _failure("Failed to save memory", insert_result)

        logger.info(f"Successfully saved memory with key: {key}")
        return {
            "status": "success",
            "message": f"Memory '{key}' saved successfully",
            "key": key,
        }

    except Exception as e:
        logger.error(f"Unexpected error in save_memory: {str(e)}")
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}


def get_memories_titles():
    """Retrieve all memory keys/titles from the user memory table to see what information has been stored.

    Returns:
        On success, a dict with ``status`` "success", ``titles`` (the key of
        every row, most recently updated first), ``count`` and ``details``
        (one dict per row, keyed by column name). On failure, a dict with
        ``status`` "error" and a ``message``.
    """
    logger.info("Retrieving all memory titles")

    try:
        # Keys plus both timestamps, newest update first.
        titles_query = """
        SELECT key, created_at, updated_at
        FROM user_memory
        ORDER BY updated_at DESC
        """

        result = execute_query(titles_query)

        # execute_query signals failure with an {"error": ...} dict.
        if isinstance(result, dict) and "error" in result:
            return {
                "status": "error",
                "message": f"Failed to retrieve memory titles: {result['error']}"
            }

        rows = result.get("rows", [])
        columns = result.get("columns", [])

        # The key is the first selected column.
        titles = [row[0] for row in rows] if rows else []

        # Pair every row with the column names to expose the timestamps too.
        details = [
            {col_name: row[idx] for idx, col_name in enumerate(columns)}
            for row in rows
        ]

        logger.info(f"Retrieved {len(titles)} memory titles")
        return {
            "status": "success",
            "titles": titles,
            "count": len(titles),
            "details": details,
        }

    except Exception as e:
        logger.error(f"Unexpected error in get_memories_titles: {str(e)}")
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}


def get_memory(key: str):
    """Retrieve all memory entries matching the specified key from the user memory table.

    Args:
        key: The key to search for.

    Returns:
        A dict whose ``status`` is "success" (with ``key``, ``count`` and
        ``memories``, newest update first), "not_found" (with ``key`` and a
        ``message``) or "error" (with a ``message``).
    """
    logger.info(f"Retrieving memory for key: {key}")

    try:
        # Every row stored under this key, newest update first.
        lookup_query = f"""
        SELECT key, value, created_at, updated_at
        FROM user_memory
        WHERE key = {format_query_value(key)}
        ORDER BY updated_at DESC
        """

        result = execute_query(lookup_query)

        # execute_query signals failure with an {"error": ...} dict.
        if isinstance(result, dict) and "error" in result:
            return {
                "status": "error",
                "message": f"Failed to retrieve memory: {result['error']}"
            }

        # Turn positional rows into dicts keyed by column name.
        columns = result.get("columns", [])
        memories = [
            {col_name: row[idx] for idx, col_name in enumerate(columns)}
            for row in result.get("rows", [])
        ]

        if memories:
            logger.info(f"Successfully retrieved {len(memories)} memories for key: {key}")
            return {
                "status": "success",
                "key": key,
                "count": len(memories),
                "memories": memories,
            }

        # Nothing stored under this key.
        logger.info(f"No memory found for key: {key}")
        return {
            "status": "not_found",
            "message": f"No memory found with key '{key}'",
            "key": key,
        }

    except Exception as e:
        logger.error(f"Unexpected error in get_memory: {str(e)}")
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}


def get_all_memories():
    """Retrieve all saved memories from the user memory table. Don't list them back, just give the number of memories retrieved. WARNING: This tool should only be used when explicitly requested by the user, as it may return large amounts of data.

    Returns:
        On success, a dict with ``status`` "success", ``count`` and
        ``memories`` (one dict per row keyed by column name, newest update
        first). On failure, a dict with ``status`` "error" and a ``message``.
    """
    logger.info("Retrieving all memories")

    try:
        # Query to get all memories ordered by most recent first
        query = """
        SELECT key, value, created_at, updated_at
        FROM user_memory
        ORDER BY updated_at DESC
        """

        result = execute_query(query)

        # execute_query signals failure with an {"error": ...} dict.
        if isinstance(result, dict) and "error" in result:
            return {
                "status": "error",
                "message": f"Failed to retrieve all memories: {result['error']}"
            }

        # Turn positional rows into dicts keyed by column name.
        columns = result.get("columns", [])
        rows = result.get("rows", [])
        memories = [
            {col_name: row[i] for i, col_name in enumerate(columns)}
            for row in rows
        ]

        logger.info(f"Successfully retrieved {len(memories)} total memories")

        return {
            "status": "success",
            "count": len(memories),
            "memories": memories,
        }

    except Exception as e:
        logger.error(f"Unexpected error in get_all_memories: {str(e)}")
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}


def delete_memory(key: str):
    """Delete all memory entries matching the specified key from the user memory table. Warning: this tool should only be used when explicitly requested by the user.

    Args:
        key: The key of the memories to delete.

    Returns:
        A dict whose ``status`` is "success" (with ``key``, ``deleted_count``
        and a ``message``), "not_found" (with ``key`` and a ``message``) or
        "error" (with a ``message``).
    """
    logger.info(f"Deleting memory for key: {key}")

    try:
        # Count first, so the caller learns how many rows are being removed
        # and a missing key is reported as "not_found" rather than success.
        check_query = f"""
        SELECT count()
        FROM user_memory
        WHERE key = {format_query_value(key)}
        """

        check_result = execute_query(check_query)

        # execute_query signals failure with an {"error": ...} dict.
        if isinstance(check_result, dict) and "error" in check_result:
            return {
                "status": "error",
                "message": f"Failed to check memory existence: {check_result['error']}"
            }

        # The count is the single cell of the single result row.
        rows = check_result.get("rows", [])
        if not rows or rows[0][0] == 0:
            logger.info(f"No memories found for key: {key}")
            return {
                "status": "not_found",
                "message": f"No memories found with key '{key}'",
                "key": key,
            }

        memories_count = rows[0][0]

        # ALTER TABLE ... DELETE is a mutation, which ClickHouse runs
        # asynchronously by default. mutations_sync = 1 makes the statement
        # wait for completion, so "success" is only reported once the rows
        # are actually gone.
        delete_query = f"""
        ALTER TABLE user_memory
        DELETE WHERE key = {format_query_value(key)}
        SETTINGS mutations_sync = 1
        """

        delete_result = execute_write_query(delete_query)
        if isinstance(delete_result, dict) and "error" in delete_result:
            return {
                "status": "error",
                "message": f"Failed to delete memories: {delete_result['error']}"
            }

        logger.info(f"Successfully deleted {memories_count} memories for key: {key}")
        return {
            "status": "success",
            "message": f"Deleted {memories_count} memory entries with key '{key}'",
            "key": key,
            "deleted_count": memories_count,
        }

    except Exception as e:
        logger.error(f"Unexpected error in delete_memory: {str(e)}")
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}


def create_chdb_client():
"""Create a chDB client connection."""
if not get_chdb_config().enabled:
Expand Down Expand Up @@ -370,3 +647,15 @@ def _init_chdb_client():
)
mcp.add_prompt(chdb_prompt)
logger.info("chDB tools and prompts registered")

# Memory tools are opt-in: they are only exposed when the CLICKHOUSE_MEMORY
# flag is enabled in the environment-backed config.
config = get_config()
if not config.memory_enabled:
    logger.info("Memory tools disabled - set CLICKHOUSE_MEMORY=true to enable")
else:
    logger.info("Memory tools enabled - registering memory management tools")
    # Registration order matches the order the tools are documented in.
    for _memory_tool_fn in (
        save_memory,
        get_memories_titles,
        get_memory,
        get_all_memories,
        delete_memory,
    ):
        mcp.add_tool(Tool.from_function(_memory_tool_fn))
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ line-length = 100
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[dependency-groups]
dev = [
"pytest>=8.4.1",
]
Loading
Loading