Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions test/dbt_integration_tests/dbt-snowplow-web/incremental.sh
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ echo ""
# For non-incremental runs, data is already loaded by setup_embucket.sh
# For incremental runs, we only need to load events_today.csv (done later)
echo "Loading events"
$PYTHON_CMD load_events.py events_yesterday.csv "$DBT_TARGET"
$PYTHON_CMD load_events.py events_yesterday.csv "$DBT_TARGET" drop_schemas true

echo ""
echo "###############################"
Expand Down Expand Up @@ -232,7 +232,7 @@ if [ "$is_incremental" == true ]; then
echo "###############################"
echo ""
echo "Loading events_today.csv for incremental run..."
$PYTHON_CMD load_events.py events_today.csv "$DBT_TARGET"
$PYTHON_CMD load_events.py events_today.csv "$DBT_TARGET" drop_schemas false

echo ""
echo "###############################"
Expand Down
70 changes: 51 additions & 19 deletions test/dbt_integration_tests/dbt-snowplow-web/load_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@ def execute_sql_script(conn, script_path, filename=None):
cursor.close()


def drop_schemas_fn(conn):
    """Drop the derived, scratch, and snowplow_manifest schemas if they exist.

    Each schema is dropped with its own ``DROP SCHEMA IF EXISTS`` statement.
    A failure on one schema is printed as a warning and does not prevent the
    remaining schemas from being attempted (best-effort cleanup).

    Args:
        conn: Open database connection exposing ``cursor()``. The returned
            cursor must provide ``execute(sql)`` and ``close()``.
    """
    schemas_to_drop = (
        'public_snowplow_manifest_derived',
        'public_snowplow_manifest_scratch',
        'public_snowplow_manifest_snowplow_manifest',
    )

    cursor = conn.cursor()
    try:
        for schema in schemas_to_drop:
            try:
                # Schema names come from the fixed tuple above, never from
                # external input, so interpolating them into the SQL is safe.
                cursor.execute(f"DROP SCHEMA IF EXISTS {schema}")
                print(f"✓ Dropped schema: {schema}")
            except Exception as e:
                # Deliberately best-effort: report and move on to the next.
                print(f"⚠ Warning dropping schema {schema}: {e}")
    finally:
        # Release the cursor even if something other than Exception
        # (e.g. an interrupt) escapes the loop.
        cursor.close()


def verify_data_load(conn):
"""Verify that data was loaded successfully."""
cursor = conn.cursor()
Expand Down Expand Up @@ -104,42 +123,48 @@ def main():
"""Main function to load events data."""
# Parse command line arguments
target = 'embucket' # default
is_incremental = False
run_number = 1
drop_schemas = True # default to dropping schemas
input_file = None

# Simple argument parsing
args = sys.argv[1:]
for i, arg in enumerate(args):
i = 0
while i < len(args):
arg = args[i]
if arg in ['--target', '-t']:
if i + 1 < len(args):
target = args[i + 1]
i += 2
continue
elif arg in ['--drop-schemas', '-d']:
if i + 1 < len(args):
drop_schemas = (args[i + 1].lower() == 'true')
i += 2
continue
elif arg == 'drop_schemas':
if i + 1 < len(args):
drop_schemas = (args[i + 1].lower() == 'true')
i += 2
continue
elif arg in ['snowflake', 'embucket']:
target = arg
elif arg in ['true', 'false']:
is_incremental = (arg == 'true')
elif arg in ['1', '2']:
run_number = int(arg)
elif not arg.startswith('-') and not arg in ['snowflake', 'embucket', 'true', 'false', '1', '2']:
elif arg in ['true', 'false'] and i > 0 and args[i-1] in ['drop_schemas', '--drop-schemas', '-d']:
# This case is handled above, skip
pass
elif not arg.startswith('-') and arg not in ['snowflake', 'embucket', 'true', 'false']:
if input_file is None:
input_file = arg
elif target == 'embucket': # If target is still default, treat second arg as target
target = arg
i += 1

# Determine input file based on incremental flag and run number
# Default input file if not specified
if not input_file:
if is_incremental:
if run_number == 1:
input_file = 'events_yesterday.csv'
print("Incremental run - First run - using events_yesterday.csv")
else: # run_number == 2
input_file = 'events_today.csv'
print("Incremental run - Second run - using events_today.csv")
else:
input_file = 'events_yesterday.csv'
print("First run - using events_yesterday.csv")
input_file = 'events_yesterday.csv'
print(f"No input file specified, using default: {input_file}")

print(f"=== Loading Snowplow Events Data into {target.upper()} Database ===")
print(f"Configuration: target={target}, drop_schemas={drop_schemas}, file={input_file}")

# Configuration
script_dir = Path(__file__).parent
Expand Down Expand Up @@ -188,6 +213,13 @@ def main():

print(f"✓ Connected to {target.upper()} successfully")

# Drop schemas if requested
if drop_schemas:
print(f"Dropping schemas...")
drop_schemas_fn(conn)
else:
print(f"Skipping schema drop")

# Execute SQL script
print("Executing SQL script...")
execute_sql_script(conn, sql_script, events_file.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ USE SCHEMA public_snowplow_manifest;

-- Drop existing table if it exists
DROP TABLE IF EXISTS events;
DROP SCHEMA IF EXISTS public_snowplow_manifest_derived;
DROP SCHEMA IF EXISTS public_snowplow_manifest_scratch;
DROP SCHEMA IF EXISTS public_snowplow_manifest_snowplow_manifest;
-- DROP SCHEMA IF EXISTS public_snowplow_manifest_derived;
-- DROP SCHEMA IF EXISTS public_snowplow_manifest_scratch;
-- DROP SCHEMA IF EXISTS public_snowplow_manifest_snowplow_manifest;

-- Step 1: Create the events table with appropriate data types
CREATE TABLE IF NOT EXISTS events (
Expand Down