Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ MANIFEST.in
venv/
.DS_Store
gen-py/
.mise.toml
47 changes: 40 additions & 7 deletions e6data_python_connector/e6data_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ def get_session_id(self):
raise e
return self._session_id

def _set_session_id(self, refreshed_session_id):
if refreshed_session_id:
self._session_id = refreshed_session_id

def set_session_id_from_response(self, response):
self._set_session_id(response.sessionId)

def __enter__(self):
"""
Enters the runtime context related to this object.
Expand Down Expand Up @@ -385,10 +392,11 @@ def clear(self, query_id, engine_ip=None):
queryId=query_id,
engineIP=engine_ip
)
self._client.clear(
clear_response = self._client.clear(
clear_request,
metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_uuid)
)
self.set_session_id_from_response(clear_response)

def reopen(self):
"""
Expand All @@ -412,10 +420,11 @@ def query_cancel(self, engine_ip, query_id):
sessionId=self.get_session_id,
queryId=query_id
)
self._client.cancelQuery(
cancel_query_response = self._client.cancelQuery(
cancel_query_request,
metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_uuid)
)
self.set_session_id_from_response(cancel_query_response)

def dry_run(self, query):
"""
Expand Down Expand Up @@ -458,6 +467,7 @@ def get_tables(self, catalog, database):
get_table_request,
metadata=_get_grpc_header(cluster=self.cluster_uuid)
)
self.set_session_id_from_response(get_table_response)
return list(get_table_response.tables)

def get_columns(self, catalog, database, table):
Expand All @@ -482,6 +492,7 @@ def get_columns(self, catalog, database, table):
get_columns_request,
metadata=_get_grpc_header(cluster=self.cluster_uuid)
)
self.set_session_id_from_response(get_columns_response)
return [{'fieldName': row.fieldName, 'fieldType': row.fieldType} for row in get_columns_response.fieldInfo]

def get_schema_names(self, catalog):
Expand All @@ -502,6 +513,7 @@ def get_schema_names(self, catalog):
get_schema_request,
metadata=_get_grpc_header(cluster=self.cluster_uuid)
)
self.set_session_id_from_response(get_schema_response)
return list(get_schema_response.schemas)

def commit(self):
Expand Down Expand Up @@ -729,7 +741,9 @@ def clear(self, query_id=None):
queryId=query_id,
engineIP=self._engine_ip
)
return self.connection.client.clearOrCancelQuery(clear_request, metadata=self.metadata)
clear_response = self.connection.client.clearOrCancelQuery(clear_request, metadata=self.metadata)
self.connection.set_session_id_from_response(clear_response)
return clear_response

def cancel(self, query_id):
"""
Expand All @@ -755,7 +769,9 @@ def status(self, query_id):
queryId=query_id,
engineIP=self._engine_ip
)
return self.connection.client.status(status_request, metadata=self.metadata)
status_response = self.connection.client.status(status_request, metadata=self.metadata)
self.connection.set_session_id_from_response(status_response)
return status_response

@re_auth
def execute(self, operation, parameters=None, **kwargs):
Expand Down Expand Up @@ -792,17 +808,21 @@ def execute(self, operation, parameters=None, **kwargs):
metadata=self.metadata
)

self.connection.set_session_id_from_response(prepare_statement_response)
self._query_id = prepare_statement_response.queryId
self._engine_ip = prepare_statement_response.engineIP

execute_statement_request = e6x_engine_pb2.ExecuteStatementRequest(
engineIP=self._engine_ip,
sessionId=self.connection.get_session_id,
queryId=self._query_id,
)
client.executeStatement(
execute_statement_response = client.executeStatement(
execute_statement_request,
metadata=self.metadata
)

self.connection.set_session_id_from_response(execute_statement_response)
else:
prepare_statement_request = e6x_engine_pb2.PrepareStatementV2Request(
sessionId=self.connection.get_session_id,
Expand All @@ -816,6 +836,7 @@ def execute(self, operation, parameters=None, **kwargs):
timeout=self.connection.grpc_prepare_timeout
)

self.connection.set_session_id_from_response(prepare_statement_response)
self._query_id = prepare_statement_response.queryId
self._engine_ip = prepare_statement_response.engineIP

Expand All @@ -824,10 +845,12 @@ def execute(self, operation, parameters=None, **kwargs):
sessionId=self.connection.get_session_id,
queryId=self._query_id
)
client.executeStatementV2(
execute_statement_response = client.executeStatementV2(
execute_statement_request,
metadata=self.metadata
)

self.connection.set_session_id_from_response(execute_statement_response)
self.update_mete_data()
return self._query_id

Expand Down Expand Up @@ -856,6 +879,8 @@ def update_mete_data(self):
metadata=self.metadata
)
buffer = BytesIO(get_result_metadata_response.resultMetaData)

self.connection.set_session_id_from_response(get_result_metadata_response)
self._rowcount, self._query_columns_description = get_query_columns_info(buffer)
self._is_metadata_updated = True

Expand Down Expand Up @@ -927,6 +952,9 @@ def fetch_batch(self):
get_next_result_batch_request,
metadata=self.metadata
)

self.connection.set_session_id_from_response(get_next_result_batch_response)

buffer = get_next_result_batch_response.resultBatch
if not self._is_metadata_updated:
self.update_mete_data()
Expand Down Expand Up @@ -999,6 +1027,8 @@ def explain(self):
explain_request,
metadata=self.metadata
)

self.connection.set_session_id_from_response(explain_response)
return explain_response.explain

def explain_analyse(self):
Expand All @@ -1017,6 +1047,9 @@ def explain_analyse(self):
explain_analyze_request,
metadata=self.metadata
)

self.connection.set_session_id_from_response(explain_analyze_response)

return dict(
is_cached=explain_analyze_response.isCached,
parsing_time=explain_analyze_response.parsingTime,
Expand Down Expand Up @@ -1056,4 +1089,4 @@ class Error(Exception):

for type_id in PRIMITIVE_TYPES:
name = TypeId._VALUES_TO_NAMES[type_id]
setattr(sys.modules[__name__], name, DBAPITypeObject([name]))
setattr(sys.modules[__name__], name, DBAPITypeObject([name]))
259 changes: 139 additions & 120 deletions e6data_python_connector/server/e6x_engine_pb2.py

Large diffs are not rendered by default.

Loading