From a1babda6d17cb861a10a89742f1ffdf1d7cb4d40 Mon Sep 17 00:00:00 2001 From: ramanE6x Date: Wed, 5 Nov 2025 01:54:07 +0530 Subject: [PATCH 1/7] Add validate_query method for query validation (plan-only, no execution) --- e6data_python_connector/e6data_grpc.py | 60 ++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index db667c4..a61be6c 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -853,6 +853,66 @@ def dry_run(self, query): ) return dry_run_response.dryrunValue + def validate_query(self, query, catalog=None, schema=None): + """ + Validates a query (plan-only, no execution). + This method performs syntax, parsing, and plan compilation validation. + + Args: + query (str): The SQL query to be validated. + catalog (str, optional): Catalog name. Uses connection's catalog if not provided. + schema (str, optional): Schema/database name. Uses connection's database if not provided. + + Returns: + dict: A dictionary with: + - 'success' (bool): True if query is valid, False otherwise + - 'query_id' (str, optional): The query ID if validation succeeded + - 'error' (str, optional): Error message if validation failed + + Example: + >>> result = conn.validate_query("SELECT * FROM table") + >>> if result['success']: + ... print(f"Query is valid! Query ID: {result['query_id']}") + >>> else: + ... print(f"Error: {result['error']}") + """ + + catalog_to_use = catalog if catalog is not None else self.catalog_name + schema_to_use = schema if schema is not None else self.database + + try: + + prepare_statement_request = e6x_engine_pb2.PrepareStatementV2Request( + sessionId=self.get_session_id, + schema=schema_to_use if schema_to_use else "", + catalog=catalog_to_use if catalog_to_use else "", + queryString=query, + quoting="DOUBLE_QUOTE" + ) + + prepare_statement_response = self._client.prepareStatementV2( + prepare_statement_request, + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()), + timeout=self.grpc_prepare_timeout + ) + + if hasattr(prepare_statement_response, 'new_strategy') and prepare_statement_response.new_strategy: + new_strategy = prepare_statement_response.new_strategy.lower() + current_strategy = _get_active_strategy() + if new_strategy != current_strategy: + _set_pending_strategy(new_strategy) + + + return { + 'success': True + } + + except Exception as e: + return { + 'success': False, + 'error': str(e) + } + def get_tables(self, catalog, database): """ Retrieves the list of tables from the specified catalog and database. From 47cef4541c63a7e1d4933940571007728d854fa1 Mon Sep 17 00:00:00 2001 From: Raman Dwivedi Date: Wed, 5 Nov 2025 01:57:35 +0530 Subject: [PATCH 2/7] remove quoting parameter --- e6data_python_connector/e6data_grpc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index a61be6c..dfb43c2 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -886,8 +886,7 @@ def validate_query(self, query, catalog=None, schema=None): sessionId=self.get_session_id, schema=schema_to_use if schema_to_use else "", catalog=catalog_to_use if catalog_to_use else "", - queryString=query, - quoting="DOUBLE_QUOTE" + queryString=query ) prepare_statement_response = self._client.prepareStatementV2( From 2cb07587a975f1cf207800b89fc129d716d5b2a9 Mon Sep 17 00:00:00 2001 From: Raman Dwivedi Date: Wed, 5 Nov 2025 12:50:02 +0530 Subject: [PATCH 3/7] Update doc string Removed optional query_id from validation return. --- e6data_python_connector/e6data_grpc.py | 1 - 1 file changed, 1 deletion(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index dfb43c2..4c4846a 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -866,7 +866,6 @@ def validate_query(self, query, catalog=None, schema=None): Returns: dict: A dictionary with: - 'success' (bool): True if query is valid, False otherwise - - 'query_id' (str, optional): The query ID if validation succeeded - 'error' (str, optional): Error message if validation failed Example: From a850baa95c2b74fb988dbcb18518f351ab2a3791 Mon Sep 17 00:00:00 2001 From: Raman Dwivedi Date: Wed, 5 Nov 2025 13:29:13 +0530 Subject: [PATCH 4/7] added explicit query cancellation --- e6data_python_connector/e6data_grpc.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 4c4846a..54417b6 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -893,16 +893,24 @@ def validate_query(self, query, catalog=None, schema=None): metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()), timeout=self.grpc_prepare_timeout ) - + + query_id = prepare_statement_response.queryId + engine_ip = prepare_statement_response.engineIP + if hasattr(prepare_statement_response, 'new_strategy') and prepare_statement_response.new_strategy: new_strategy = prepare_statement_response.new_strategy.lower() current_strategy = _get_active_strategy() if new_strategy != current_strategy: _set_pending_strategy(new_strategy) + try: + self.query_cancel(engine_ip, query_id) + except Exception: + # If cleanup fails, planner will auto-cleanup after timeout (15 min default) + pass return { - 'success': True + 'success': True, } except Exception as e: From 378d8e4bd244b15deba112cb01ebc93c2440a822 Mon Sep 17 00:00:00 2001 From: Raman Dwivedi Date: Wed, 5 Nov 2025 13:58:06 +0530 Subject: [PATCH 5/7] Update e6data_grpc.py --- e6data_python_connector/e6data_grpc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 54417b6..cd9e040 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -883,8 +883,8 @@ def validate_query(self, query, catalog=None, schema=None): prepare_statement_request = e6x_engine_pb2.PrepareStatementV2Request( sessionId=self.get_session_id, - schema=schema_to_use if schema_to_use else "", - catalog=catalog_to_use if catalog_to_use else "", + schema=schema_to_use, + catalog=catalog_to_use, queryString=query ) From 19f6341d642ff6e92bf43b22fe9033387de7e9b6 Mon Sep 17 00:00:00 2001 From: ramanE6x Date: Wed, 5 Nov 2025 17:17:38 +0530 Subject: [PATCH 6/7] using dryRunV2 instead of prepareStatementV2 --- e6data_python_connector/e6data_grpc.py | 45 +++++++++----------------- 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index cd9e040..1a0fa24 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -853,64 +853,49 @@ def dry_run(self, query): ) return dry_run_response.dryrunValue - def validate_query(self, query, catalog=None, schema=None): + def get_logical_plan(self, query, catalog=None, schema=None): """ - Validates a query (plan-only, no execution). - This method performs syntax, parsing, and plan compilation validation. + Gets the logical execution plan for a query without executing it. + This method performs syntax, parsing, and plan compilation, then returns the logical plan. Args: - query (str): The SQL query to be validated. + query (str): The SQL query to get the plan for. catalog (str, optional): Catalog name. Uses connection's catalog if not provided. schema (str, optional): Schema/database name. Uses connection's database if not provided. Returns: dict: A dictionary with: - - 'success' (bool): True if query is valid, False otherwise - - 'error' (str, optional): Error message if validation failed + - 'success' (bool): True if plan generation succeeded, False otherwise + - 'plan' (str, optional): The logical plan string if successful + - 'error' (str, optional): Error message if plan generation failed Example: - >>> result = conn.validate_query("SELECT * FROM table") + >>> result = conn.get_logical_plan("SELECT * FROM table") >>> if result['success']: - ... print(f"Query is valid! Query ID: {result['query_id']}") + ... print(f"Plan: {result['plan']}") >>> else: ... print(f"Error: {result['error']}") """ - catalog_to_use = catalog if catalog is not None else self.catalog_name schema_to_use = schema if schema is not None else self.database try: - - prepare_statement_request = e6x_engine_pb2.PrepareStatementV2Request( + dry_run_request = e6x_engine_pb2.DryRunRequestV2( + engineIP=self._engine_ip, sessionId=self.get_session_id, schema=schema_to_use, catalog=catalog_to_use, queryString=query ) - prepare_statement_response = self._client.prepareStatementV2( - prepare_statement_request, - metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()), - timeout=self.grpc_prepare_timeout + dry_run_response = self._client.dryRunV2( + dry_run_request, + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()) ) - query_id = prepare_statement_response.queryId - engine_ip = prepare_statement_response.engineIP - - if hasattr(prepare_statement_response, 'new_strategy') and prepare_statement_response.new_strategy: - new_strategy = prepare_statement_response.new_strategy.lower() - current_strategy = _get_active_strategy() - if new_strategy != current_strategy: - _set_pending_strategy(new_strategy) - - try: - self.query_cancel(engine_ip, query_id) - except Exception: - # If cleanup fails, planner will auto-cleanup after timeout (15 min default) - pass - return { 'success': True, + 'plan': dry_run_response.dryrunValue } except Exception as e: From d615ea5f04dade8cd8d6e597f372512fd320b208 Mon Sep 17 00:00:00 2001 From: Raman Dwivedi Date: Wed, 5 Nov 2025 22:53:35 +0530 Subject: [PATCH 7/7] engineIP not needed Removed engineIP from DryRunRequestV2 instantiation. --- e6data_python_connector/e6data_grpc.py | 1 - 1 file changed, 1 deletion(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 1a0fa24..50ca918 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -881,7 +881,6 @@ def get_logical_plan(self, query, catalog=None, schema=None): try: dry_run_request = e6x_engine_pb2.DryRunRequestV2( - engineIP=self._engine_ip, sessionId=self.get_session_id, schema=schema_to_use, catalog=catalog_to_use,