Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# e6data Python Connector

![version](https://img.shields.io/badge/version-2.2.2-blue.svg)
![version](https://img.shields.io/badge/version-2.2.3.rc1-blue.svg)

## Introduction

Expand Down
15 changes: 9 additions & 6 deletions e6data_python_connector/e6data_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,8 @@ def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except _InactiveRpcError as e:
print(f'RE_AUTH: Function Name: {func}')
print(f'RE_AUTH: Error Found {e}')
if e.code() == grpc.StatusCode.INTERNAL and 'Access denied' in e.details():
print('RE_AUTH: Initialising re-authentication.')
self.connection.get_re_authenticate_session_id()
print(f'RE_AUTH: Re-auth successful.')
return func(self, *args, **kwargs)
else:
raise e
Expand Down Expand Up @@ -192,6 +188,7 @@ def __init__(
self._max_receive_message_length = -1
self._max_send_message_length = 300 * 1024 * 1024 # mb
self.grpc_prepare_timeout = 10 * 60 # 10 minutes
self._planner_ip = None

if isinstance(grpc_options, dict):
self._keepalive_timeout_ms = grpc_options.get('keepalive_timeout_ms') or self._keepalive_timeout_ms
Expand Down Expand Up @@ -224,7 +221,8 @@ def _create_client(self):
self._client = e6x_engine_pb2_grpc.QueryEngineServiceStub(self._channel)

def get_re_authenticate_session_id(self):
self._session_id = None
self.close()
self._create_client()
return self.get_session_id

@property
Expand All @@ -243,6 +241,7 @@ def get_session_id(self):
metadata=_get_grpc_header(cluster=self.cluster_uuid)
)
self._session_id = authenticate_response.sessionId
self._planner_ip = authenticate_response.plannerIp
if not self._session_id:
raise ValueError("Invalid credentials.")
except _InactiveRpcError as e:
Expand All @@ -266,6 +265,7 @@ def get_session_id(self):
metadata=_get_grpc_header(cluster=self.cluster_uuid)
)
self._session_id = authenticate_response.sessionId
self._planner_ip = authenticate_response.plannerIp
else:
raise e
else:
Expand Down Expand Up @@ -423,6 +423,8 @@ def _reset_state(self):

@property
def metadata(self):
if not self._engine_ip:
self._engine_ip = self.connection._planner_ip
return _get_grpc_header(engine_ip=self._engine_ip, cluster=self.connection.cluster_uuid)

@property
Expand Down Expand Up @@ -568,7 +570,8 @@ def execute(self, operation, parameters=None, **kwargs):
sessionId=self.connection.get_session_id,
schema=self._database,
catalog=self._catalog_name,
queryString=sql
queryString=sql,
plannerIp=self._engine_ip
)
prepare_statement_response = client.prepareStatementV2(
prepare_statement_request,
Expand Down
173 changes: 87 additions & 86 deletions e6data_python_connector/server/e6x_engine_pb2.py

Large diffs are not rendered by default.

Loading