Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/integrations/data-integrations/snowflake.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ WITH
};
```

If the private key cannot be accessed from disk (for example, when running MindsDB on Cloud), provide the PEM content directly:

```sql
CREATE DATABASE snowflake_datasource
WITH
ENGINE = 'snowflake',
PARAMETERS = {
"account": "tvuibdy-vm85921",
"user": "your_username",
"private_key": "-----BEGIN PRIVATE KEY-----\\n...\\n-----END PRIVATE KEY-----",
"database": "test_db",
"auth_type": "key_pair"
};
```


With encrypted private key (passphrase protected):

```sql
Expand Down Expand Up @@ -116,6 +132,7 @@ Authentication parameters (one method required):

* `password`: The password for the Snowflake account (password authentication).
* `private_key_path`: Path to the private key file for key pair authentication.
* `private_key`: PEM-formatted private key content for key pair authentication.
* `private_key_passphrase`: Optional passphrase for encrypted private key (key pair authentication).

Optional parameters:
Expand Down
2 changes: 1 addition & 1 deletion mindsdb/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = "MindsDB"
__package_name__ = "mindsdb"
__version__ = "25.13.0"
__version__ = "25.13.1"
__description__ = "MindsDB's AI SQL Server enables developers to build AI tools that need access to real-time data to perform their tasks"
__email__ = "jorge@mindsdb.com"
__author__ = "MindsDB Inc"
Expand Down
107 changes: 84 additions & 23 deletions mindsdb/integrations/handlers/bigquery_handler/bigquery_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from mindsdb_sql_parser.ast.base import ASTNode
from mindsdb.integrations.libs.base import MetaDatabaseHandler
from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
from mindsdb.integrations.utilities.handlers.auth_utilities.google import GoogleServiceAccountOAuth2Manager
from mindsdb.integrations.utilities.handlers.auth_utilities.google import (
GoogleServiceAccountOAuth2Manager,
)
from mindsdb.integrations.libs.response import (
HandlerStatusResponse as StatusResponse,
HandlerResponse as Response,
Expand Down Expand Up @@ -270,37 +272,92 @@ def meta_get_column_statistics_for_table(self, table_name: str, columns: list) -
Returns:
Response: A response object containing the column statistics.
"""
# Check column data types
column_types_query = f"""
SELECT column_name, data_type
FROM `{self.connection_data["project_id"]}.{self.connection_data["dataset"]}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = '{table_name}'
"""
column_types_result = self.native_query(column_types_query)

if column_types_result.resp_type != RESPONSE_TYPE.TABLE:
logger.error(f"Error retrieving column types for table {table_name}")
return Response(
RESPONSE_TYPE.ERROR,
error_message=f"Could not retrieve column types for table {table_name}",
)

column_type_map = dict(
zip(
column_types_result.data_frame["column_name"],
column_types_result.data_frame["data_type"],
)
)

# Types that don't support MIN/MAX aggregations
UNSUPPORTED_MINMAX_PREFIXES = ("ARRAY", "STRUCT", "RECORD")
UNSUPPORTED_MINMAX_TYPES = ("GEOGRAPHY", "JSON", "BYTES")

def supports_minmax(data_type: str) -> bool:
"""Check if a BigQuery data type supports MIN/MAX operations."""
if data_type is None:
return False
data_type_upper = data_type.upper()
if any(data_type_upper.startswith(prefix) for prefix in UNSUPPORTED_MINMAX_PREFIXES):
return False
if data_type_upper in UNSUPPORTED_MINMAX_TYPES:
return False
return True

# To avoid hitting BigQuery's query size limits, we will chunk the columns into batches.
# This is because the queries are combined using UNION ALL, which can lead to very large queries if there are many columns.
BATCH_SIZE = 20

def chunked(lst, n):
"""
Yields successive n-sized chunks from lst.
"""
"""Yields successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]

queries = []
for column_batch in chunked(columns, BATCH_SIZE):
batch_queries = []
for column in column_batch:
batch_queries.append(
f"""
SELECT
'{table_name}' AS table_name,
'{column}' AS column_name,
SAFE_DIVIDE(COUNTIF({column} IS NULL), COUNT(*)) * 100 AS null_percentage,
CAST(MIN(`{column}`) AS STRING) AS minimum_value,
CAST(MAX(`{column}`) AS STRING) AS maximum_value,
COUNT(DISTINCT {column}) AS distinct_values_count
FROM
`{self.connection_data["project_id"]}.{self.connection_data["dataset"]}.{table_name}`
"""
)

query = " UNION ALL ".join(batch_queries)
queries.append(query)
data_type = column_type_map.get(column)

if supports_minmax(data_type):
# Full statistics for supported types
batch_queries.append(
f"""
SELECT
'{table_name}' AS table_name,
'{column}' AS column_name,
SAFE_DIVIDE(COUNTIF(`{column}` IS NULL), COUNT(*)) * 100 AS null_percentage,
CAST(MIN(`{column}`) AS STRING) AS minimum_value,
CAST(MAX(`{column}`) AS STRING) AS maximum_value,
COUNT(DISTINCT `{column}`) AS distinct_values_count
FROM
`{self.connection_data["project_id"]}.{self.connection_data["dataset"]}.{table_name}`
"""
)
else:
# Limited statistics for complex types (no MIN/MAX/COUNT DISTINCT)
logger.info(f"Skipping MIN/MAX for column {column} with unsupported type: {data_type}")
batch_queries.append(
f"""
SELECT
'{table_name}' AS table_name,
'{column}' AS column_name,
SAFE_DIVIDE(COUNTIF(`{column}` IS NULL), COUNT(*)) * 100 AS null_percentage,
CAST(NULL AS STRING) AS minimum_value,
CAST(NULL AS STRING) AS maximum_value,
CAST(NULL AS INT64) AS distinct_values_count
FROM
`{self.connection_data["project_id"]}.{self.connection_data["dataset"]}.{table_name}`
"""
)

if batch_queries:
query = " UNION ALL ".join(batch_queries)
queries.append(query)

results = []
for query in queries:
Expand All @@ -316,9 +373,13 @@ def chunked(lst, n):
if not results:
logger.warning(f"No column statistics could be retrieved for table {table_name}.")
return Response(
RESPONSE_TYPE.ERROR, error_message=f"No column statistics could be retrieved for table {table_name}."
RESPONSE_TYPE.ERROR,
error_message=f"No column statistics could be retrieved for table {table_name}.",
)
return Response(RESPONSE_TYPE.TABLE, pd.concat(results, ignore_index=True) if results else pd.DataFrame())
return Response(
RESPONSE_TYPE.TABLE,
pd.concat(results, ignore_index=True) if results else pd.DataFrame(),
)

def meta_get_primary_keys(self, table_names: Optional[list] = None) -> Response:
"""
Expand Down
Loading
Loading