From a0b3aff2d0a0aca9844312bd99f822a6808934a1 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Mon, 17 Nov 2025 17:33:16 -0800 Subject: [PATCH 1/3] refactor(fivetran): update handling of database and schema names to use quoted identifiers for Snowflake compatibility --- docs/how/updating-datahub.md | 1 + .../docs/sources/fivetran/fivetran_pre.md | 30 ++++ .../source/fivetran/fivetran_query.py | 16 ++- .../integration/fivetran/test_fivetran.py | 136 +++++++++++++++++- 4 files changed, 179 insertions(+), 4 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 25043e37f5db7..212436ec99635 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -31,6 +31,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes +- **Fivetran Source - Database and Schema Identifier Quoting**: The Fivetran ingestion source now uses quoted identifiers for database and schema names, following Snowflake's quoted identifier convention. Schema names were already quoted and continue to be quoted (since v1.2.0.7). This change ensures proper handling of database and schema names containing special characters (hyphens, spaces, dots, etc.) and case-sensitive names. Now that names are quoted, they preserve the case as specified in your configuration. **Action Required**: Ensure your Fivetran source configuration uses the exact case matching your Snowflake database and schema names. If your objects are stored as uppercase in Snowflake, use uppercase in your configuration. This affects all supported destination platforms (Snowflake, BigQuery, Databricks) as queries are transpiled from Snowflake SQL to the target dialect.
- #15005: `SqlParsingBuilder` is removed, use `SqlParsingAggregator` instead - #14710: LookML ingestion source migrated to SDKv2 resulting in: - `browsePaths` aspect replaced with `browsePathsV2` diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md index 2e04c9101939c..12b5c181f12a9 100644 --- a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md +++ b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md @@ -14,6 +14,35 @@ This source extracts the following: 3. Once initial sync up of your fivetran platform connector is done, you need to provide the fivetran platform connector's destination platform and its configuration in the recipe. 4. We expect our users to enable automatic schema updates (default) in fivetran platform connector configured for DataHub, this ensures latest schema changes are applied and avoids inconsistency data syncs. +### Database and Schema Name Handling + +The Fivetran source uses **quoted identifiers** for database and schema names to properly handle special characters and case-sensitive names. This follows Snowflake's quoted identifier convention, which is then transpiled to the target database dialect (Snowflake, BigQuery, or Databricks). 
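The quoting rule the docs above describe (wrap the name in double quotes, doubling any embedded double quote) can be sketched as a small standalone helper. This is an illustrative sketch only; `quote_identifier` is a hypothetical name, not part of DataHub's actual code:

```python
def quote_identifier(name: str) -> str:
    """Wrap an identifier in double quotes for Snowflake.

    Snowflake escapes a double quote inside a quoted identifier by
    doubling it, so an embedded quote survives the wrapping.
    """
    return '"' + name.replace('"', '""') + '"'

quote_identifier("my-database")   # '"my-database"'
quote_identifier("my database")   # '"my database"'
quote_identifier('my"database')   # '"my""database"'
```

With this quoting in place, the name's case and special characters are preserved exactly as written, which is why the configuration must now match the stored case.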
+ +**Important Notes:** + +- **Database names** are automatically wrapped in double quotes (e.g., `use database "my-database"`) +- **Schema names** are automatically wrapped in double quotes (e.g., `"my-schema".table_name`) +- This ensures proper handling of database and schema names containing: + - Hyphens (e.g., `my-database`) + - Spaces (e.g., `my database`) + - Special characters (e.g., `my.database`) + - Case-sensitive names (e.g., `MyDatabase`) + +**Migration Impact:** + +- If you have database or schema names with special characters, they will now be properly quoted in SQL queries +- This change ensures consistent behavior across all supported destination platforms +- No configuration changes are required - the quoting is handled automatically + +**Case Sensitivity Considerations:** + +- **Important**: In Snowflake, unquoted identifiers are automatically converted to uppercase when stored and resolved (e.g., `mydatabase` becomes `MYDATABASE`), while double-quoted identifiers preserve the exact case as entered (e.g., `"mydatabase"` stays as `mydatabase`). See [Snowflake's identifier documentation](https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers) for details. +- If your Fivetran log database or schema was previously referenced without quotes in your configuration, it would have been resolved as uppercase. Now that database and schema names are automatically quoted, they will preserve the case as specified in your configuration. +- **Action Required**: Ensure that the database and schema names in your Fivetran source configuration match the exact case of the objects in your Snowflake instance. 
For example: + - If your database is stored as `MYDATABASE` (uppercase) in Snowflake, use `MYDATABASE` in your configuration + - If your database is stored as `mydatabase` (lowercase) in Snowflake, use `mydatabase` in your configuration + - If you're unsure of the exact case, check your Snowflake instance or use the `SHOW DATABASES` and `SHOW SCHEMAS` commands to see the exact case + ## Concept mapping | Fivetran | Datahub | @@ -57,6 +86,7 @@ create or replace role fivetran_datahub; grant operate, usage on warehouse "" to role fivetran_datahub; // Grant access to view database and schema in which your log and metadata tables exist +// Note: Database and schema names are automatically quoted, so use quoted identifiers if your names contain special characters grant usage on DATABASE "" to role fivetran_datahub; grant usage on SCHEMA ""."" to role fivetran_datahub; diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 89137f7c854af..2a51732f34e76 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -30,16 +30,28 @@ def __init__(self) -> None: self.schema_clause: str = "" def use_database(self, db_name: str) -> str: - return f"use database {db_name}" + """ + Using Snowflake quoted identifiers convention + Ref: https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers + + Add double quotes around an identifier + """ + db_name = db_name.replace( + '"', '""' + ) # Replace double quotes with two double quotes to use the double quote character inside a quoted identifier + return f'use database "{db_name}"' def set_schema(self, schema_name: str) -> None: """ Using Snowflake quoted identifiers convention + Ref: https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers Add double 
quotes around an identifier Use two quotes to use the double quote character inside a quoted identifier """ - schema_name = schema_name.replace('"', '""') + schema_name = schema_name.replace( + '"', '""' + ) # Replace double quotes with two double quotes to use the double quote character inside a quoted identifier self.schema_clause = f'"{schema_name}".' def get_connectors_query(self) -> str: diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 4f5a8b7a98a56..867d1d2c9f15e 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -12,6 +12,7 @@ from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential from datahub.ingestion.source.fivetran.config import ( BigQueryDestinationConfig, + DatabricksDestinationConfig, FivetranLogConfig, FivetranSourceConfig, PlatformDetail, @@ -164,7 +165,7 @@ def default_query_results( def test_quoted_query_transpilation(): - """Test different schema strings and their transpilation to Bigquery""" + """Test different schema strings and their transpilation to Snowflake, BigQuery, and Databricks""" # Ref: https://github.com/datahub-project/datahub/issues/14210 fivetran_log_query = FivetranLogQuery() @@ -225,9 +226,21 @@ def test_quoted_query_transpilation(): ), ) + databricks_dest_config = FivetranLogConfig( + destination_platform="databricks", + databricks_destination_config=DatabricksDestinationConfig( + token="test-token", + workspace_url="https://test-workspace.cloud.databricks.com", + warehouse_id="test-warehouse-id", + catalog="test_catalog", + log_schema="test_schema", + ), + ) + # Create FivetranLogAPI instance snowflake_fivetran_log_api = FivetranLogAPI(snowflake_dest_config) bigquery_fivetran_log_api = FivetranLogAPI(bigquery_dest_config) + databricks_fivetran_log_api = FivetranLogAPI(databricks_dest_config) for schema in 
schema_name_test_cases: fivetran_log_query.set_schema(schema) @@ -254,9 +267,128 @@ def test_quoted_query_transpilation(): f"but got {num_quotes_in_clause}: {fivetran_log_query.schema_clause!r}" ) - # Make sure transpilation works for both snowflake and bigquery + # Make sure transpilation works for snowflake, bigquery, and databricks + snowflake_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + bigquery_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + databricks_fivetran_log_api._query( + fivetran_log_query.get_connectors_query() + ) + + +def test_quoted_database_identifiers(): + """Test different database names and their quoted identifier handling""" + # Ref: https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers + + fivetran_log_query = FivetranLogQuery() + + # Test cases with different database names that might cause issues + database_name_test_cases = [ + "test_database", # Normal case + "test-database", # Hyphen + "test_database_123", # Underscore and numbers + "test.database", # Dot + "test database", # Space + "test'database", # Single quote + 'test"database', # Double quote + "test`database", # Backtick + "test-database-123", # Multiple hyphens + "test_database-123", # Mixed underscore and hyphen + "test-database_123", # Mixed hyphen and underscore + "test.database-123", # Mixed dot and hyphen + "test database 123", # Multiple spaces + "test'database'123", # Multiple quotes + 'test"database"123', # Multiple double quotes + "test`database`123", # Multiple backticks + ] + + with mock.patch( + "datahub.ingestion.source.fivetran.fivetran_log_api.create_engine" + ) as mock_create_engine: + connection_magic_mock = MagicMock() + connection_magic_mock.execute.fetchone.side_effect = ["test-project-id"] + + mock_create_engine.return_value = connection_magic_mock + + snowflake_dest_config = FivetranLogConfig( + destination_platform="snowflake", + 
snowflake_destination_config=SnowflakeDestinationConfig( + account_id="TESTID", + warehouse="TEST_WH", + username="test", + password="test@123", + database="TEST_DATABASE", + role="TESTROLE", + log_schema="TEST_SCHEMA", + ), + ) + + bigquery_dest_config = FivetranLogConfig( + destination_platform="bigquery", + bigquery_destination_config=BigQueryDestinationConfig( + credential=GCPCredential( + private_key_id="testprivatekey", + project_id="test-project", + client_email="fivetran-connector@test-project.iam.gserviceaccount.com", + client_id="1234567", + private_key="private-key", + ), + dataset="test_dataset", + ), + ) + + databricks_dest_config = FivetranLogConfig( + destination_platform="databricks", + databricks_destination_config=DatabricksDestinationConfig( + token="test-token", + workspace_url="https://test-workspace.cloud.databricks.com", + warehouse_id="test-warehouse-id", + catalog="test_catalog", + log_schema="test_schema", + ), + ) + + # Create FivetranLogAPI instance + snowflake_fivetran_log_api = FivetranLogAPI(snowflake_dest_config) + bigquery_fivetran_log_api = FivetranLogAPI(bigquery_dest_config) + databricks_fivetran_log_api = FivetranLogAPI(databricks_dest_config) + + for db_name in database_name_test_cases: + use_db_query = fivetran_log_query.use_database(db_name) + + # Make sure the database name is wrapped in double quotes + # Example: use database "test_database" + assert use_db_query.startswith('use database "'), ( + f"Missing 'use database \"' at the beginning for database {db_name!r}" + ) + assert use_db_query.endswith('"'), ( + f"Missing double quote at the end for database {db_name!r}" + ) + + # Extract the database name from the query + # Format: use database "db_name" + db_name_in_query = use_db_query[14:-1] # Remove 'use database "' and '"' + + # If the database name has quotes in the string, then the query should have double the number of quotes + # Each quote in database name is escaped as two quotes in query, plus two for the wrapping 
quotes + num_quotes_in_db_name = db_name.count('"') + num_quotes_in_query = db_name_in_query.count('"') + if num_quotes_in_db_name > 0: + # Each quote in database name is escaped as two quotes in query + assert num_quotes_in_query == num_quotes_in_db_name * 2, ( + f"For database {db_name!r}, expected {num_quotes_in_db_name * 2} quotes in query (escaped), " + f"but got {num_quotes_in_query}: {use_db_query!r}" + ) + + # Set a test schema and verify queries work with the database name + fivetran_log_query.set_schema("test_schema") + # Make sure transpilation works for snowflake, bigquery, and databricks + # Note: use_database returns a query string, it doesn't modify the query object + # So we test that the schema queries still work correctly snowflake_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) bigquery_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + databricks_fivetran_log_api._query( + fivetran_log_query.get_connectors_query() + ) @freeze_time(FROZEN_TIME) From f3edbecaf9e4167140583fa0b086948c154b3dde Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 18 Nov 2025 18:14:22 -0800 Subject: [PATCH 2/3] refactor(fivetran): implement backward compatibility for unquoted identifiers in Snowflake - Added logic to handle unquoted database and schema names by converting them to uppercase quoted identifiers. - Introduced a validation method for unquoted identifiers to ensure compliance with Snowflake's requirements. - Updated tests to cover various cases for database and schema names, ensuring correct handling and transpilation for Snowflake, BigQuery, and Databricks. 
--- .../source/fivetran/fivetran_log_api.py | 34 +- .../source/fivetran/fivetran_query.py | 32 ++ .../integration/fivetran/test_fivetran.py | 334 ++++++++++++------ .../unit/fivetran/test_fivetran_query.py | 158 +++++++++ 4 files changed, 449 insertions(+), 109 deletions(-) create mode 100644 metadata-ingestion/tests/unit/fivetran/test_fivetran_query.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 6f5530cee2c3d..127eecb7f1162 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -50,13 +50,39 @@ def _initialize_fivetran_variables( snowflake_destination_config.get_sql_alchemy_url(), **snowflake_destination_config.get_options(), ) - engine.execute( - fivetran_log_query.use_database( - snowflake_destination_config.database, + + """ + Special Handling for Snowflake Backward Compatibility: + We have migrated to using quoted identifiers for database and schema names. + However, we need to support backward compatibility for existing databases and schemas that were created with unquoted identifiers. + When an unquoted identifier is used, we automatically convert it to uppercase + quoted identifier (this is Snowflake's behavior to resolve the identifier).
+ unquoted identifier -> uppercase + quoted identifier -> Snowflake resolves the identifier + """ + snowflake_database = ( + snowflake_destination_config.database.upper() + if FivetranLogQuery._is_valid_unquoted_identifier( + snowflake_destination_config.database ) + else snowflake_destination_config.database + ) + logger.info( + f"Using snowflake database: {snowflake_database} (original: {snowflake_destination_config.database})" + ) + engine.execute(fivetran_log_query.use_database(snowflake_database)) + + snowflake_schema = ( + snowflake_destination_config.log_schema.upper() + if FivetranLogQuery._is_valid_unquoted_identifier( + snowflake_destination_config.log_schema + ) + else snowflake_destination_config.log_schema + ) + + logger.info( + f"Using snowflake schema: {snowflake_schema} (original: {snowflake_destination_config.log_schema})" ) fivetran_log_query.set_schema( - snowflake_destination_config.log_schema, + snowflake_schema, ) fivetran_log_database = snowflake_destination_config.database elif destination_platform == "bigquery": diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 2a51732f34e76..3de7a4bae1a84 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -1,3 +1,4 @@ +import re from typing import List # Safeguards to prevent fetching massive amounts of data. @@ -29,6 +30,37 @@ def __init__(self) -> None: # Select query db clause self.schema_clause: str = "" + @staticmethod + def _is_valid_unquoted_identifier(identifier: str) -> bool: + """ + Check if an identifier can be used unquoted in Snowflake. 
+ + Snowflake unquoted identifiers must: + - Start with a letter (A-Z) or underscore (_) + - Contain only letters, numbers, and underscores + - Resolve case-insensitively (Snowflake auto-converts unquoted identifiers to uppercase) + + Ref: https://docs.snowflake.com/en/sql-reference/identifiers-syntax#unquoted-identifiers + """ + if not identifier: + return False + + # Check if it's already quoted (starts and ends with double quotes) + if identifier.startswith('"') and identifier.endswith('"'): + return False + + # Check if it starts with letter or underscore + if not (identifier[0].isalpha() or identifier[0] == "_"): + return False + + # Check if it contains only alphanumeric characters and underscores + if not re.match(r"^[A-Za-z0-9_]+$", identifier): + return False + + # For Snowflake, unquoted identifiers are case-insensitive and auto-converted to uppercase + # This means we have received an unquoted identifier, and we can convert it to an uppercase quoted identifier + return True + def use_database(self, db_name: str) -> str: """ Using Snowflake quoted identifiers convention diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 867d1d2c9f15e..1d62472783b5d 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -51,10 +51,12 @@ def default_query_results( query, connector_query_results=default_connector_query_results ): fivetran_log_query = FivetranLogQuery() - fivetran_log_query.set_schema("test") - if query == fivetran_log_query.use_database("test_database"): + # For Snowflake, valid unquoted identifiers are uppercased + # "test_database" -> "TEST_DATABASE", "test" -> "TEST" + fivetran_log_query.set_schema("TEST") + if query == fivetran_log_query.use_database("TEST_DATABASE"): return [] - elif query == fivetran_log_query.get_connectors_query(): + if query ==
fivetran_log_query.get_connectors_query(): return connector_query_results elif query == fivetran_log_query.get_table_lineage_query( connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"] @@ -164,33 +166,38 @@ def default_query_results( raise Exception(f"Unknown query {query}") -def test_quoted_query_transpilation(): +# Test cases with different schema names that might cause issues +SCHEMA_NAME_TEST_CASES = [ + "fivetran_logs", # Normal case + "fivetran-logs", # Hyphen + "fivetran_logs_123", # Underscore and numbers + "fivetran.logs", # Dot + "fivetran logs", # Space + "fivetran'logs", # Single quote + 'fivetran"logs', # Double quote + "fivetran`logs", # Backtick + "fivetran-logs-123", # Multiple hyphens + "fivetran_logs-123", # Mixed underscore and hyphen + "fivetran-logs_123", # Mixed hyphen and underscore + "fivetran.logs-123", # Mixed dot and hyphen + "fivetran logs 123", # Multiple spaces + "fivetran'logs'123", # Multiple quotes + 'fivetran"logs"123', # Multiple double quotes + "fivetran`logs`123", # Multiple backticks +] + + +@pytest.mark.parametrize("schema", SCHEMA_NAME_TEST_CASES) +def test_quoted_query_transpilation(schema): """Test different schema strings and their transpilation to Snowflake, BigQuery, and Databricks""" # Ref: https://github.com/datahub-project/datahub/issues/14210 + # Test with Snowflake destination platform to verify unquoted identifier support + fivetran_log_query_snowflake = FivetranLogQuery() + # Test without platform (default behavior - always quote) fivetran_log_query = FivetranLogQuery() fivetran_log_query.use_database("test_database") - # Test cases with different schema names that might cause issues - schema_name_test_cases = [ - "fivetran_logs", # Normal case - "fivetran-logs", # Hyphen - "fivetran_logs_123", # Underscore and numbers - "fivetran.logs", # Dot - "fivetran logs", # Space - "fivetran'logs", # Single quote - 'fivetran"logs', # Double quote - "fivetran`logs", # Backtick - "fivetran-logs-123", # 
Multiple hyphens - "fivetran_logs-123", # Mixed underscore and hyphen - "fivetran-logs_123", # Mixed hyphen and underscore - "fivetran.logs-123", # Mixed dot and hyphen - "fivetran logs 123", # Multiple spaces - "fivetran'logs'123", # Multiple quotes - 'fivetran"logs"123', # Multiple double quotes - "fivetran`logs`123", # Multiple backticks - ] - with mock.patch( "datahub.ingestion.source.fivetran.fivetran_log_api.create_engine" ) as mock_create_engine: @@ -242,65 +249,95 @@ def test_quoted_query_transpilation(): bigquery_fivetran_log_api = FivetranLogAPI(bigquery_dest_config) databricks_fivetran_log_api = FivetranLogAPI(databricks_dest_config) - for schema in schema_name_test_cases: - fivetran_log_query.set_schema(schema) + # Test with default (always quote) + fivetran_log_query.set_schema(schema) - # Make sure the schema_clause is wrapped in double quotes and ends with "." - # Example: "fivetran". - assert fivetran_log_query.schema_clause[0] == '"', ( - "Missing double quote at the beginning of schema_clause" - ) - assert fivetran_log_query.schema_clause[-2] == '"', ( - "Missing double quote at the end of schema_clause" - ) - assert fivetran_log_query.schema_clause[-1] == ".", ( - "Missing dot at the end of schema_clause" - ) + # Make sure the schema_clause is wrapped in double quotes and ends with "." + # Example: "fivetran". 
+ assert fivetran_log_query.schema_clause[0] == '"', ( + "Missing double quote at the beginning of schema_clause" + ) + assert fivetran_log_query.schema_clause[-2] == '"', ( + "Missing double quote at the end of schema_clause" + ) + assert fivetran_log_query.schema_clause[-1] == ".", ( + "Missing dot at the end of schema_clause" + ) - # If the schema has quotes in the string, then schema_clause should have double the number of quotes + 2 - num_quotes_in_schema = schema.count('"') - num_quotes_in_clause = fivetran_log_query.schema_clause.count('"') - if num_quotes_in_schema > 0: - # Each quote in schema is escaped as two quotes in clause, plus two for the wrapping quotes - assert num_quotes_in_clause == num_quotes_in_schema * 2 + 2, ( - f"For schema {schema!r}, expected {num_quotes_in_schema * 2 + 2} quotes in schema_clause, " - f"but got {num_quotes_in_clause}: {fivetran_log_query.schema_clause!r}" - ) - - # Make sure transpilation works for snowflake, bigquery, and databricks - snowflake_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) - bigquery_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) - databricks_fivetran_log_api._query( - fivetran_log_query.get_connectors_query() + # If the schema has quotes in the string, then schema_clause should have double the number of quotes + 2 + num_quotes_in_schema = schema.count('"') + num_quotes_in_clause = fivetran_log_query.schema_clause.count('"') + if num_quotes_in_schema > 0: + # Each quote in schema is escaped as two quotes in clause, plus two for the wrapping quotes + assert num_quotes_in_clause == num_quotes_in_schema * 2 + 2, ( + f"For schema {schema!r}, expected {num_quotes_in_schema * 2 + 2} quotes in schema_clause, " + f"but got {num_quotes_in_clause}: {fivetran_log_query.schema_clause!r}" ) + # Test with Snowflake platform - valid unquoted identifiers get uppercased before quoting + # Simulate the preprocessing that happens in fivetran_log_api.py + is_valid_unquoted = 
FivetranLogQuery._is_valid_unquoted_identifier(schema) + processed_schema = schema.upper() if is_valid_unquoted else schema + fivetran_log_query_snowflake.set_schema(processed_schema) -def test_quoted_database_identifiers(): + # All identifiers should be quoted (after preprocessing) + assert fivetran_log_query_snowflake.schema_clause[0] == '"', ( + f"Expected quoted identifier for Snowflake schema {schema!r}, but got: {fivetran_log_query_snowflake.schema_clause!r}" + ) + assert fivetran_log_query_snowflake.schema_clause[-2] == '"', ( + f"Missing double quote at the end for Snowflake schema {schema!r}" + ) + assert fivetran_log_query_snowflake.schema_clause[-1] == ".", ( + f"Missing dot at the end for Snowflake schema {schema!r}" + ) + + # Verify the processed schema is in the clause + # Extract the schema name from the clause (remove quotes and dot) + schema_in_clause = fivetran_log_query_snowflake.schema_clause[1:-2] + # The schema in the clause will have quotes escaped (doubled), so we need to escape the processed schema for comparison + expected_escaped_schema = processed_schema.replace('"', '""') + assert schema_in_clause == expected_escaped_schema, ( + f"For schema {schema!r}, expected processed schema {processed_schema!r} (escaped: {expected_escaped_schema!r}) in clause, " + f"but got {schema_in_clause!r}" + ) + + # Make sure transpilation works for snowflake, bigquery, and databricks + snowflake_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + bigquery_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + databricks_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + + +# Test cases with different database names that might cause issues +DATABASE_NAME_TEST_CASES = [ + "test_database", # Normal case + "test-database", # Hyphen + "test_database_123", # Underscore and numbers + "test.database", # Dot + "test database", # Space + "test'database", # Single quote + 'test"database', # Double quote + 
"test`database", # Backtick + "test-database-123", # Multiple hyphens + "test_database-123", # Mixed underscore and hyphen + "test-database_123", # Mixed hyphen and underscore + "test.database-123", # Mixed dot and hyphen + "test database 123", # Multiple spaces + "test'database'123", # Multiple quotes + 'test"database"123', # Multiple double quotes + "test`database`123", # Multiple backticks +] + + +@pytest.mark.parametrize("db_name", DATABASE_NAME_TEST_CASES) +def test_quoted_database_identifiers(db_name): """Test different database names and their quoted identifier handling""" # Ref: https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers + # Test with Snowflake destination platform to verify unquoted identifier support + fivetran_log_query_snowflake = FivetranLogQuery() + # Test without platform (default behavior - always quote) fivetran_log_query = FivetranLogQuery() - # Test cases with different database names that might cause issues - database_name_test_cases = [ - "test_database", # Normal case - "test-database", # Hyphen - "test_database_123", # Underscore and numbers - "test.database", # Dot - "test database", # Space - "test'database", # Single quote - 'test"database', # Double quote - "test`database", # Backtick - "test-database-123", # Multiple hyphens - "test_database-123", # Mixed underscore and hyphen - "test-database_123", # Mixed hyphen and underscore - "test.database-123", # Mixed dot and hyphen - "test database 123", # Multiple spaces - "test'database'123", # Multiple quotes - 'test"database"123', # Multiple double quotes - "test`database`123", # Multiple backticks - ] - with mock.patch( "datahub.ingestion.source.fivetran.fivetran_log_api.create_engine" ) as mock_create_engine: @@ -352,42 +389,129 @@ def test_quoted_database_identifiers(): bigquery_fivetran_log_api = FivetranLogAPI(bigquery_dest_config) databricks_fivetran_log_api = FivetranLogAPI(databricks_dest_config) - for db_name in database_name_test_cases: 
- use_db_query = fivetran_log_query.use_database(db_name) + # Test with default (always quote) + use_db_query = fivetran_log_query.use_database(db_name) + + # Make sure the database name is wrapped in double quotes + # Example: use database "test_database" + assert use_db_query.startswith('use database "'), ( + f"Missing 'use database \"' at the beginning for database {db_name!r}" + ) + assert use_db_query.endswith('"'), ( + f"Missing double quote at the end for database {db_name!r}" + ) - # Make sure the database name is wrapped in double quotes - # Example: use database "test_database" - assert use_db_query.startswith('use database "'), ( - f"Missing 'use database \"' at the beginning for database {db_name!r}" + # Extract the database name from the query + # Format: use database "db_name" + db_name_in_query = use_db_query[14:-1] # Remove 'use database "' and '"' + + # If the database name has quotes in the string, then the query should have double the number of quotes + # Each quote in database name is escaped as two quotes in query, plus two for the wrapping quotes + num_quotes_in_db_name = db_name.count('"') + num_quotes_in_query = db_name_in_query.count('"') + if num_quotes_in_db_name > 0: + # Each quote in database name is escaped as two quotes in query + assert num_quotes_in_query == num_quotes_in_db_name * 2, ( + f"For database {db_name!r}, expected {num_quotes_in_db_name * 2} quotes in query (escaped), " + f"but got {num_quotes_in_query}: {use_db_query!r}" ) - assert use_db_query.endswith('"'), ( - f"Missing double quote at the end for database {db_name!r}" + + # Test with Snowflake platform - valid unquoted identifiers get uppercased before quoting + # Simulate the preprocessing that happens in fivetran_log_api.py + is_valid_unquoted = FivetranLogQuery._is_valid_unquoted_identifier(db_name) + processed_db_name = db_name.upper() if is_valid_unquoted else db_name + use_db_query_snowflake = fivetran_log_query_snowflake.use_database( + processed_db_name + ) + 
+ # All identifiers should be quoted (after preprocessing) + assert use_db_query_snowflake.startswith('use database "'), ( + f"Expected quoted identifier for Snowflake database {db_name!r}, but got: {use_db_query_snowflake!r}" + ) + assert use_db_query_snowflake.endswith('"'), ( + f"Missing double quote at the end for Snowflake database {db_name!r}" + ) + + # Verify the processed database name is in the query + # Extract the database name from the query (remove 'use database "' and '"') + db_name_in_query = use_db_query_snowflake[14:-1] + # The database name in the query will have quotes escaped (doubled), so we need to escape the processed name for comparison + expected_escaped_db_name = processed_db_name.replace('"', '""') + assert db_name_in_query == expected_escaped_db_name, ( + f"For database {db_name!r}, expected processed name {processed_db_name!r} (escaped: {expected_escaped_db_name!r}) in query, " + f"but got {db_name_in_query!r}" + ) + + # Set a test schema and verify queries work with the database name + fivetran_log_query.set_schema("test_schema") + # Make sure transpilation works for snowflake, bigquery, and databricks + # Note: use_database returns a query string, it doesn't modify the query object + # So we test that the schema queries still work correctly + snowflake_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + bigquery_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + databricks_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) + + +def test_snowflake_unquoted_identifier_uppercase_conversion(): + """Test that valid unquoted identifiers are uppercased for Snowflake backward compatibility""" + from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery + + fivetran_log_query = FivetranLogQuery() + + # Test cases: valid unquoted identifiers (should be uppercased) + valid_unquoted_cases = [ + "test_database", # lowercase, valid + "my_schema", # lowercase, valid + "TEST_DB", # 
already uppercase, valid + "schema_123", # lowercase with numbers, valid + ] + + # Test cases: invalid unquoted identifiers (should stay as-is) + invalid_unquoted_cases = [ + "test-database", # has hyphen + "test.database", # has dot + "test database", # has space + '"quoted_db"', # already quoted + "test'db", # has single quote + ] + + for identifier in valid_unquoted_cases: + # Simulate preprocessing in fivetran_log_api.py + is_valid = FivetranLogQuery._is_valid_unquoted_identifier(identifier) + assert is_valid, f"Expected {identifier!r} to be a valid unquoted identifier" + + processed = identifier.upper() if is_valid else identifier + assert processed == identifier.upper(), ( + f"Expected {identifier!r} to be uppercased to {identifier.upper()!r}, " + f"but got {processed!r}" + ) + + # Verify it gets quoted + use_db_query = fivetran_log_query.use_database(processed) + assert use_db_query == f'use database "{processed}"', ( + f"Expected quoted uppercase identifier, got: {use_db_query!r}" + ) + + for identifier in invalid_unquoted_cases: + # Simulate preprocessing in fivetran_log_api.py + is_valid = FivetranLogQuery._is_valid_unquoted_identifier(identifier) + assert not is_valid, ( + f"Expected {identifier!r} to be an invalid unquoted identifier" + ) + + processed = identifier.upper() if is_valid else identifier + assert processed == identifier, ( + f"Expected {identifier!r} to stay as-is, but got {processed!r}" ) - # Extract the database name from the query - # Format: use database "db_name" - db_name_in_query = use_db_query[14:-1] # Remove 'use database "' and '"' - - # If the database name has quotes in the string, then the query should have double the number of quotes - # Each quote in database name is escaped as two quotes in query, plus two for the wrapping quotes - num_quotes_in_db_name = db_name.count('"') - num_quotes_in_query = db_name_in_query.count('"') - if num_quotes_in_db_name > 0: - # Each quote in database name is escaped as two quotes in query - 
assert num_quotes_in_query == num_quotes_in_db_name * 2, ( - f"For database {db_name!r}, expected {num_quotes_in_db_name * 2} quotes in query (escaped), " - f"but got {num_quotes_in_query}: {use_db_query!r}" - ) - - # Set a test schema and verify queries work with the database name - fivetran_log_query.set_schema("test_schema") - # Make sure transpilation works for snowflake, bigquery, and databricks - # Note: use_database returns a query string, it doesn't modify the query object - # So we test that the schema queries still work correctly - snowflake_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) - bigquery_fivetran_log_api._query(fivetran_log_query.get_connectors_query()) - databricks_fivetran_log_api._query( - fivetran_log_query.get_connectors_query() + # Verify it gets quoted as-is + # Note: If the identifier is already quoted (starts and ends with quotes), + # the quotes inside will be escaped when we quote it again + use_db_query = fivetran_log_query.use_database(processed) + # Escape quotes in the processed identifier for comparison + expected_escaped = processed.replace('"', '""') + assert use_db_query == f'use database "{expected_escaped}"', ( + f"Expected quoted identifier as-is (escaped: {expected_escaped!r}), got: {use_db_query!r}" ) diff --git a/metadata-ingestion/tests/unit/fivetran/test_fivetran_query.py b/metadata-ingestion/tests/unit/fivetran/test_fivetran_query.py new file mode 100644 index 0000000000000..e3d211b53f7f6 --- /dev/null +++ b/metadata-ingestion/tests/unit/fivetran/test_fivetran_query.py @@ -0,0 +1,158 @@ +from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery + + +class TestFivetranLogQuery: + """Unit tests for FivetranLogQuery class.""" + + def test_is_valid_unquoted_identifier_valid_cases(self): + """Test _is_valid_unquoted_identifier with valid identifiers.""" + valid_identifiers = [ + "test_database", # lowercase with underscore + "TEST_DATABASE", # uppercase with underscore + 
"my_schema", # lowercase + "schema_123", # lowercase with numbers + "SCHEMA_123", # uppercase with numbers + "a", # single letter + "_private", # starts with underscore + "_123", # starts with underscore, has numbers + "table_name_123", # mixed case with underscore and numbers + "A1B2C3", # alphanumeric + ] + + for identifier in valid_identifiers: + assert FivetranLogQuery._is_valid_unquoted_identifier(identifier), ( + f"Expected {identifier!r} to be a valid unquoted identifier" + ) + + def test_is_valid_unquoted_identifier_invalid_empty(self): + """Test _is_valid_unquoted_identifier with empty string.""" + assert not FivetranLogQuery._is_valid_unquoted_identifier("") + + def test_is_valid_unquoted_identifier_invalid_already_quoted(self): + """Test _is_valid_unquoted_identifier with already quoted identifiers.""" + invalid_quoted = [ + '"test_database"', # fully quoted + '"test"', # single word quoted + '""', # empty quoted + '"test_database"extra', # quoted with extra text + 'extra"test_database"', # extra text before quoted + ] + + for identifier in invalid_quoted: + assert not FivetranLogQuery._is_valid_unquoted_identifier(identifier), ( + f"Expected {identifier!r} to be invalid (already quoted)" + ) + + def test_is_valid_unquoted_identifier_invalid_special_characters(self): + """Test _is_valid_unquoted_identifier with special characters.""" + invalid_special = [ + "test-database", # hyphen + "test.database", # dot + "test database", # space + "test'database", # single quote + 'test"database', # double quote (not fully quoted) + "test`database", # backtick + "test@database", # at symbol + "test#database", # hash + "test$database", # dollar + "test%database", # percent + "test&database", # ampersand + "test*database", # asterisk + "test+database", # plus + "test=database", # equals + "test[database", # bracket + "test]database", # bracket + "test{database", # brace + "test}database", # brace + "test|database", # pipe + "test\\database", # backslash + 
"test:database",  # colon
+            "test;database",  # semicolon
+            "test<database",  # less than
+            "test>database",  # greater than
+            "test?database",  # question mark
+            "test/database",  # slash
+            "test,database",  # comma
+        ]
+
+        for identifier in invalid_special:
+            assert not FivetranLogQuery._is_valid_unquoted_identifier(identifier), (
+                f"Expected {identifier!r} to be invalid (contains special characters)"
+            )
+
+    def test_is_valid_unquoted_identifier_invalid_starts_with_number(self):
+        """Test _is_valid_unquoted_identifier with identifiers starting with numbers."""
+        invalid_starts_with_number = [
+            "123database",  # starts with number
+            "0test",  # starts with zero
+            "9schema",  # starts with nine
+        ]
+
+        for identifier in invalid_starts_with_number:
+            assert not FivetranLogQuery._is_valid_unquoted_identifier(identifier), (
+                f"Expected {identifier!r} to be invalid (starts with number)"
+            )
+
+    def test_is_valid_unquoted_identifier_valid_with_numbers(self):
+        """Test _is_valid_unquoted_identifier with valid identifiers containing numbers."""
+        valid_with_numbers = [
+            "test123",  # ends with numbers
+            "test_123",  # underscore and numbers
+            "a1b2c3",  # mixed letters and numbers
+            "schema1",  # number at end
+            "table_2024",  # underscore and year
+        ]
+
+        for identifier in valid_with_numbers:
+            assert FivetranLogQuery._is_valid_unquoted_identifier(identifier), (
+                f"Expected {identifier!r} to be valid (contains numbers but starts with letter/underscore)"
+            )
+
+    def test_is_valid_unquoted_identifier_valid_starts_with_underscore(self):
+        """Test _is_valid_unquoted_identifier with identifiers starting with underscore."""
+        valid_underscore_start = [
+            "_test",  # underscore then letters
+            "_123",  # underscore then numbers
+            "__double",  # double underscore
+            "_test_database",  # underscore, letters, underscore
+        ]
+
+        for identifier in valid_underscore_start:
+            assert FivetranLogQuery._is_valid_unquoted_identifier(identifier), (
+                f"Expected {identifier!r} to be valid (starts with underscore)"
+            )
+
+    def 
test_is_valid_unquoted_identifier_case_insensitive(self): + """Test _is_valid_unquoted_identifier is case-insensitive for valid identifiers.""" + # All these should be valid regardless of case + case_variants = [ + "test_database", + "TEST_DATABASE", + "Test_Database", + "tEsT_dAtAbAsE", + ] + + for identifier in case_variants: + assert FivetranLogQuery._is_valid_unquoted_identifier(identifier), ( + f"Expected {identifier!r} to be valid (case should not matter)" + ) + + def test_is_valid_unquoted_identifier_edge_cases(self): + """Test _is_valid_unquoted_identifier with edge cases.""" + # Single character valid cases + assert FivetranLogQuery._is_valid_unquoted_identifier("a") + assert FivetranLogQuery._is_valid_unquoted_identifier("A") + assert FivetranLogQuery._is_valid_unquoted_identifier("_") + + # Single character invalid cases + assert not FivetranLogQuery._is_valid_unquoted_identifier("1") + assert not FivetranLogQuery._is_valid_unquoted_identifier("-") + assert not FivetranLogQuery._is_valid_unquoted_identifier(".") + + # Very long valid identifier + long_valid = "a" + "_" * 100 + "b" + assert FivetranLogQuery._is_valid_unquoted_identifier(long_valid) + + # Unicode characters (should be invalid) + assert not FivetranLogQuery._is_valid_unquoted_identifier("test_数据库") + assert not FivetranLogQuery._is_valid_unquoted_identifier("test_ñ") From 5099ecf09066bb792fc6588a92befa0e3590889e Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 18 Nov 2025 18:27:35 -0800 Subject: [PATCH 3/3] docs(fivetran): update documentation for quoted identifiers and case sensitivity in Snowflake - Removed outdated information regarding unquoted identifiers and emphasized the importance of using quoted identifiers for database and schema names. - Added details on backward compatibility for valid unquoted identifiers and recommended best practices for configuration. - Clarified action required for users to ensure case sensitivity in their Fivetran source configurations. 
--- docs/how/updating-datahub.md | 1 - metadata-ingestion/docs/sources/fivetran/fivetran_pre.md | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 212436ec99635..25043e37f5db7 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -31,7 +31,6 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes -- **Fivetran Source - Database and Schema Identifier Quoting**: The Fivetran ingestion source now uses quoted identifiers for database and schema names following Snowflake's quoted identifier convention.Schema names were already quoted and continue to be quoted (Since v1.2.0.7). This change ensures proper handling of database and schema names containing special characters (hyphens, spaces, dots, etc.) and case-sensitive names. Now that names are quoted, they preserve the case as specified in your configuration. **Action Required**: Ensure your Fivetran source configuration uses the exact case matching your Snowflake database and schema names. If your objects are stored as uppercase in Snowflake, use uppercase in your configuration. This affects all supported destination platforms (Snowflake, BigQuery, Databricks) as queries are transpiled from Snowflake SQL to the target dialect. 
- #15005: `SqlParsingBuilder` is removed, use `SqlParsingAggregator` instead - #14710: LookML ingestion source migrated to SDKv2 resulting in: - `browsePaths` aspect replaced with `browsePathsV2` diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md index 12b5c181f12a9..2afdf40ad2923 100644 --- a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md +++ b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md @@ -37,11 +37,10 @@ The Fivetran source uses **quoted identifiers** for database and schema names to **Case Sensitivity Considerations:** - **Important**: In Snowflake, unquoted identifiers are automatically converted to uppercase when stored and resolved (e.g., `mydatabase` becomes `MYDATABASE`), while double-quoted identifiers preserve the exact case as entered (e.g., `"mydatabase"` stays as `mydatabase`). See [Snowflake's identifier documentation](https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers) for details. -- If your Fivetran log database or schema was previously referenced without quotes in your configuration, it would have been resolved as uppercase. Now that database and schema names are automatically quoted, they will preserve the case as specified in your configuration. -- **Action Required**: Ensure that the database and schema names in your Fivetran source configuration match the exact case of the objects in your Snowflake instance. 
For example:
-  - If your database is stored as `MYDATABASE` (uppercase) in Snowflake, use `MYDATABASE` in your configuration
-  - If your database is stored as `mydatabase` (lowercase) in Snowflake, use `mydatabase` in your configuration
-  - If you're unsure of the exact case, check your Snowflake instance or use the `SHOW DATABASES` and `SHOW SCHEMAS` commands to see the exact case
+- **Backward Compatibility**: Valid unquoted identifiers (names that start with a letter or underscore and contain only letters, digits, and underscores) are automatically uppercased before quoting, matching how Snowflake resolves unquoted identifiers. This means:
+  - If your database or schema name is a valid unquoted identifier (e.g., `fivetran_logs`, `MY_SCHEMA`), it is uppercased automatically to match existing Snowflake objects created without quotes
+  - No configuration changes are required for standard identifiers (letters, digits, and underscores only)
+- **Recommended**: As a best practice, keep the database and schema names in your configuration in the exact case in which they are stored in Snowflake

 ## Concept mapping
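The preprocessing described in the docs above (uppercase valid unquoted names, then wrap in double quotes with escaping) can be sketched in a few lines. This is an illustrative sketch, not the exact DataHub implementation; the function and regex names are assumptions.

```python
import re

# Snowflake-style unquoted identifier: a letter or underscore followed by
# letters, digits, or underscores. (Illustrative name, not DataHub's API.)
_UNQUOTED_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def quote_identifier(name: str) -> str:
    """Preprocess a database/schema name per the backward-compatibility rule."""
    if _UNQUOTED_IDENTIFIER_RE.match(name):
        # Valid unquoted identifiers are uppercased, mirroring how Snowflake
        # resolves unquoted names, so existing objects keep matching.
        name = name.upper()
    # Escape embedded double quotes by doubling them, then wrap the whole
    # name in double quotes.
    return '"' + name.replace('"', '""') + '"'


print(quote_identifier("fivetran_logs"))  # -> "FIVETRAN_LOGS"
print(quote_identifier("my-database"))    # -> "my-database" (kept as-is, quoted)
```

Names with special characters (hyphens, dots, spaces) skip the uppercasing and are quoted verbatim, which is why the docs recommend matching the stored case exactly for such names.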