
Commit 55e056a

dbt snowplow incremental fix (#1893)
1 parent 784b177 commit 55e056a

3 files changed: +56 −24 lines changed

test/dbt_integration_tests/dbt-snowplow-web/incremental.sh

Lines changed: 2 additions & 2 deletions
@@ -176,7 +176,7 @@ echo ""
 # For non-incremental runs, data is already loaded by setup_embucket.sh
 # For incremental runs, we only need to load events_today.csv (done later)
 echo "Loading events"
-$PYTHON_CMD load_events.py events_yesterday.csv "$DBT_TARGET"
+$PYTHON_CMD load_events.py events_yesterday.csv "$DBT_TARGET" drop_schemas true

 echo ""
 echo "###############################"

@@ -232,7 +232,7 @@ if [ "$is_incremental" == true ]; then
 echo "###############################"
 echo ""
 echo "Loading events_today.csv for incremental run..."
-$PYTHON_CMD load_events.py events_today.csv "$DBT_TARGET"
+$PYTHON_CMD load_events.py events_today.csv "$DBT_TARGET" drop_schemas false

 echo ""
 echo "###############################"

test/dbt_integration_tests/dbt-snowplow-web/load_events.py

Lines changed: 51 additions & 19 deletions
@@ -51,6 +51,25 @@ def execute_sql_script(conn, script_path, filename=None):
     cursor.close()


+def drop_schemas_fn(conn):
+    """Drop derived, scratch, and snowplow_manifest schemas if they exist."""
+    cursor = conn.cursor()
+    schemas_to_drop = [
+        'public_snowplow_manifest_derived',
+        'public_snowplow_manifest_scratch',
+        'public_snowplow_manifest_snowplow_manifest'
+    ]
+
+    for schema in schemas_to_drop:
+        try:
+            cursor.execute(f"DROP SCHEMA IF EXISTS {schema}")
+            print(f"✓ Dropped schema: {schema}")
+        except Exception as e:
+            print(f"⚠ Warning dropping schema {schema}: {e}")
+
+    cursor.close()
+
+
 def verify_data_load(conn):
     """Verify that data was loaded successfully."""
     cursor = conn.cursor()
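A minimal sketch of exercising the new drop_schemas_fn on its own. The function relies only on the DB-API cursor interface (conn.cursor(), cursor.execute()); the use of snowflake.connector and every credential below are assumptions for illustration, not values from the commit:

import snowflake.connector  # assumed client; Embucket exposes a Snowflake-compatible endpoint

# Placeholder credentials -- substitute real connection details.
conn = snowflake.connector.connect(
    account="my_account",
    user="my_user",
    password="my_password",
    database="my_database",
)
try:
    drop_schemas_fn(conn)  # drops the three manifest schemas if present
finally:
    conn.close()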
@@ -104,42 +123,48 @@ def main():
     """Main function to load events data."""
     # Parse command line arguments
     target = 'embucket'  # default
-    is_incremental = False
-    run_number = 1
+    drop_schemas = True  # default to dropping schemas
     input_file = None

     # Simple argument parsing
     args = sys.argv[1:]
-    for i, arg in enumerate(args):
+    i = 0
+    while i < len(args):
+        arg = args[i]
         if arg in ['--target', '-t']:
             if i + 1 < len(args):
                 target = args[i + 1]
+                i += 2
+                continue
+        elif arg in ['--drop-schemas', '-d']:
+            if i + 1 < len(args):
+                drop_schemas = (args[i + 1].lower() == 'true')
+                i += 2
+                continue
+        elif arg == 'drop_schemas':
+            if i + 1 < len(args):
+                drop_schemas = (args[i + 1].lower() == 'true')
+                i += 2
+                continue
         elif arg in ['snowflake', 'embucket']:
             target = arg
-        elif arg in ['true', 'false']:
-            is_incremental = (arg == 'true')
-        elif arg in ['1', '2']:
-            run_number = int(arg)
-        elif not arg.startswith('-') and not arg in ['snowflake', 'embucket', 'true', 'false', '1', '2']:
+        elif arg in ['true', 'false'] and i > 0 and args[i-1] in ['drop_schemas', '--drop-schemas', '-d']:
+            # This case is handled above, skip
+            pass
+        elif not arg.startswith('-') and arg not in ['snowflake', 'embucket', 'true', 'false']:
             if input_file is None:
                 input_file = arg
             elif target == 'embucket':  # If target is still default, treat second arg as target
                 target = arg
+        i += 1

-    # Determine input file based on incremental flag and run number
+    # Default input file if not specified
     if not input_file:
-        if is_incremental:
-            if run_number == 1:
-                input_file = 'events_yesterday.csv'
-                print("Incremental run - First run - using events_yesterday.csv")
-            else:  # run_number == 2
-                input_file = 'events_today.csv'
-                print("Incremental run - Second run - using events_today.csv")
-        else:
-            input_file = 'events_yesterday.csv'
-            print("First run - using events_yesterday.csv")
+        input_file = 'events_yesterday.csv'
+        print(f"No input file specified, using default: {input_file}")

     print(f"=== Loading Snowplow Events Data into {target.upper()} Database ===")
+    print(f"Configuration: target={target}, drop_schemas={drop_schemas}, file={input_file}")

     # Configuration
     script_dir = Path(__file__).parent
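To make the new argument handling concrete, a hand-traced sketch (not executed against the repo) of how the loop above resolves the invocations used by incremental.sh, plus the equivalent flag form:

# Expected parses, traced from the loop above:
#
#   load_events.py events_yesterday.csv embucket drop_schemas true
#     -> input_file='events_yesterday.csv', target='embucket', drop_schemas=True
#
#   load_events.py events_today.csv snowflake drop_schemas false
#     -> input_file='events_today.csv', target='snowflake', drop_schemas=False
#
#   load_events.py events_today.csv --drop-schemas false
#     -> input_file='events_today.csv', target='embucket' (default), drop_schemas=False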
@@ -188,6 +213,13 @@ def main():

     print(f"✓ Connected to {target.upper()} successfully")

+    # Drop schemas if requested
+    if drop_schemas:
+        print(f"Dropping schemas...")
+        drop_schemas_fn(conn)
+    else:
+        print(f"Skipping schema drop")
+
     # Execute SQL script
     print("Executing SQL script...")
     execute_sql_script(conn, sql_script, events_file.name)

test/dbt_integration_tests/dbt-snowplow-web/load_events_data_snowflake.sql

Lines changed: 3 additions & 3 deletions
@@ -11,9 +11,9 @@ USE SCHEMA public_snowplow_manifest;

 -- Drop existing table if it exists
 DROP TABLE IF EXISTS events;
-DROP SCHEMA IF EXISTS public_snowplow_manifest_derived;
-DROP SCHEMA IF EXISTS public_snowplow_manifest_scratch;
-DROP SCHEMA IF EXISTS public_snowplow_manifest_snowplow_manifest;
+-- DROP SCHEMA IF EXISTS public_snowplow_manifest_derived;
+-- DROP SCHEMA IF EXISTS public_snowplow_manifest_scratch;
+-- DROP SCHEMA IF EXISTS public_snowplow_manifest_snowplow_manifest;

 -- Step 1: Create the events table with appropriate data types
 CREATE TABLE IF NOT EXISTS events (
