diff --git a/mcp_timeplus/__init__.py b/mcp_timeplus/__init__.py index 8fa3165..690f64e 100644 --- a/mcp_timeplus/__init__.py +++ b/mcp_timeplus/__init__.py @@ -7,6 +7,7 @@ explore_kafka_topic, create_kafka_stream, generate_sql, + connect_to_apache_iceberg, ) __all__ = [ @@ -18,4 +19,5 @@ "explore_kafka_topic", "create_kafka_stream", "generate_sql", + "connect_to_apache_iceberg", ] diff --git a/mcp_timeplus/mcp_server.py b/mcp_timeplus/mcp_server.py index c470db6..09badeb 100644 --- a/mcp_timeplus/mcp_server.py +++ b/mcp_timeplus/mcp_server.py @@ -238,3 +238,26 @@ def create_timeplus_client(): except Exception as e: logger.error(f"Failed to connect to Timeplus: {str(e)}") raise + +@mcp.tool() +def connect_to_apache_iceberg(iceberg_db: str,aws_account_id: int, s3_bucket: str, aws_region: str="us-west-2",is_s3_table_bucket: bool=False): + """Create a Timeplus database in iceberg type to connect to Iceberg""" + if iceberg_db is None or aws_account_id is None or s3_bucket is None: + return {"error": "iceberg_db, aws_account_id, and s3_bucket are required"} + logger.info("Creating Iceberg database") + warehouse=aws_account_id + storage_endpoint=f"https://{s3_bucket}.s3.{aws_region}.amazonaws.com" + if is_s3_table_bucket: + warehouse=f"{aws_account_id}:s3tablescatalog/{s3_bucket}" + storage_endpoint=f"https://s3tables.{aws_region}.amazonaws.com/{s3_bucket}" + + sql=f"""CREATE DATABASE {iceberg_db} + SETTINGS type='iceberg',catalog_uri='https://glue.{aws_region}.amazonaws.com/iceberg',catalog_type='rest', + warehouse='{warehouse}',storage_endpoint='{storage_endpoint}',use_environment_credentials=true, + rest_catalog_sigv4_enabled=true,rest_catalog_signing_region='{aws_region}',rest_catalog_signing_name='glue' + """ + run_sql(sql) + logger.info("Iceberg database created") + + sql=f"SHOW STREAMS FROM {iceberg_db}" + return run_sql(sql)