14
14
from awswrangler import _data_types , _utils , catalog , exceptions , s3
15
15
from awswrangler ._config import apply_configs
16
16
from awswrangler .athena ._executions import wait_query
17
- from awswrangler .athena ._utils import (
18
- _get_workgroup_config ,
19
- _start_query_execution ,
20
- _WorkGroupConfig ,
21
- _MergeClause
22
- )
17
+ from awswrangler .athena ._utils import _get_workgroup_config , _start_query_execution , _WorkGroupConfig , _MergeClause
23
18
from awswrangler .typing import GlueTableSettings
24
19
25
20
_logger : logging .Logger = logging .getLogger (__name__ )
@@ -232,59 +227,53 @@ def _validate_args(
232
227
raise exceptions .InvalidArgumentCombination (
233
228
"Either path or workgroup path must be specified to store the temporary results."
234
229
)
235
-
230
+
236
231
if merge_cols and merge_on_clause :
237
- raise exceptions .InvalidArgumentCombination (
238
- "Cannot specify both merge_cols and merge_on_clause. Use either merge_cols for simple equality matching or merge_on_clause for custom logic."
239
- )
240
-
232
+ raise exceptions .InvalidArgumentCombination (
233
+ "Cannot specify both merge_cols and merge_on_clause. Use either merge_cols for simple equality matching or merge_on_clause for custom logic."
234
+ )
235
+
241
236
if merge_on_clause and merge_match_nulls :
242
- raise exceptions .InvalidArgumentCombination (
243
- "merge_match_nulls can only be used together with merge_cols."
244
- )
245
-
237
+ raise exceptions .InvalidArgumentCombination ("merge_match_nulls can only be used together with merge_cols." )
238
+
246
239
if merge_conditional_clauses and merge_condition != "conditional_merge" :
247
- raise exceptions .InvalidArgumentCombination (
248
- "merge_conditional_clauses can only be used when merge_condition is 'conditional_merge'."
249
- )
250
-
240
+ raise exceptions .InvalidArgumentCombination (
241
+ "merge_conditional_clauses can only be used when merge_condition is 'conditional_merge'."
242
+ )
243
+
251
244
if (merge_cols or merge_on_clause ) and merge_condition not in ["update" , "ignore" , "conditional_merge" ]:
252
245
raise exceptions .InvalidArgumentValue (
253
246
f"Invalid merge_condition: { merge_condition } . Valid values: ['update', 'ignore', 'conditional_merge']"
254
247
)
255
-
248
+
256
249
if merge_condition == "conditional_merge" :
257
- if not merge_conditional_clauses :
258
- raise exceptions .InvalidArgumentCombination (
259
- "merge_conditional_clauses must be provided when merge_condition is 'conditional_merge'."
260
- )
261
-
262
- seen_not_matched = False
263
- for i , clause in enumerate (merge_conditional_clauses ):
264
- if "when" not in clause :
265
- raise exceptions .InvalidArgumentValue (
266
- f"merge_conditional_clauses[{ i } ] must contain 'when' field."
267
- )
268
- if "action" not in clause :
269
- raise exceptions .InvalidArgumentValue (
270
- f"merge_conditional_clauses[{ i } ] must contain 'action' field."
271
- )
272
- if clause ["when" ] not in ['MATCHED' , 'NOT MATCHED' , 'NOT MATCHED BY SOURCE' ]:
273
- raise exceptions .InvalidArgumentValue (
274
- f"merge_conditional_clauses[{ i } ]['when'] must be one of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']."
275
- )
276
- if clause ["action" ] not in ["UPDATE" , "DELETE" , "INSERT" ]:
277
- raise exceptions .InvalidArgumentValue (
278
- f"merge_conditional_clauses[{ i } ]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT']."
279
- )
280
-
281
- if clause ["when" ] in ["NOT MATCHED" , "NOT MATCHED BY SOURCE" ]:
282
- seen_not_matched = True
283
- elif clause ["when" ] == "MATCHED" and seen_not_matched :
284
- raise exceptions .InvalidArgumentValue (
285
- f"merge_conditional_clauses[{ i } ]['when'] is MATCHED but appears after a NOT MATCHED clause. "
286
- "WHEN MATCHED must come before WHEN NOT MATCHED or WHEN NOT MATCHED BY SOURCE."
287
- )
250
+ if not merge_conditional_clauses :
251
+ raise exceptions .InvalidArgumentCombination (
252
+ "merge_conditional_clauses must be provided when merge_condition is 'conditional_merge'."
253
+ )
254
+
255
+ seen_not_matched = False
256
+ for i , clause in enumerate (merge_conditional_clauses ):
257
+ if "when" not in clause :
258
+ raise exceptions .InvalidArgumentValue (f"merge_conditional_clauses[{ i } ] must contain 'when' field." )
259
+ if "action" not in clause :
260
+ raise exceptions .InvalidArgumentValue (f"merge_conditional_clauses[{ i } ] must contain 'action' field." )
261
+ if clause ["when" ] not in ["MATCHED" , "NOT MATCHED" , "NOT MATCHED BY SOURCE" ]:
262
+ raise exceptions .InvalidArgumentValue (
263
+ f"merge_conditional_clauses[{ i } ]['when'] must be one of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']."
264
+ )
265
+ if clause ["action" ] not in ["UPDATE" , "DELETE" , "INSERT" ]:
266
+ raise exceptions .InvalidArgumentValue (
267
+ f"merge_conditional_clauses[{ i } ]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT']."
268
+ )
269
+
270
+ if clause ["when" ] in ["NOT MATCHED" , "NOT MATCHED BY SOURCE" ]:
271
+ seen_not_matched = True
272
+ elif clause ["when" ] == "MATCHED" and seen_not_matched :
273
+ raise exceptions .InvalidArgumentValue (
274
+ f"merge_conditional_clauses[{ i } ]['when'] is MATCHED but appears after a NOT MATCHED clause. "
275
+ "WHEN MATCHED must come before WHEN NOT MATCHED or WHEN NOT MATCHED BY SOURCE."
276
+ )
288
277
289
278
if mode == "overwrite_partitions" :
290
279
if not partition_cols :
@@ -296,6 +285,7 @@ def _validate_args(
296
285
"When mode is 'overwrite_partitions' merge_cols must not be specified."
297
286
)
298
287
288
+
299
289
def _merge_iceberg (
300
290
df : pd .DataFrame ,
301
291
database : str ,
@@ -387,55 +377,55 @@ def _merge_iceberg(
387
377
388
378
# Build WHEN clauses based on merge_condition
389
379
when_clauses = []
390
-
380
+
391
381
if merge_condition == "update" :
392
382
when_clauses .append (f"""WHEN MATCHED THEN
393
383
UPDATE SET { ", " .join ([f'"{ x } " = source."{ x } "' for x in df .columns ])} """ )
394
384
when_clauses .append (f"""WHEN NOT MATCHED THEN
395
385
INSERT ({ ", " .join ([f'"{ x } "' for x in df .columns ])} )
396
386
VALUES ({ ", " .join ([f'source."{ x } "' for x in df .columns ])} )""" )
397
-
387
+
398
388
elif merge_condition == "ignore" :
399
389
when_clauses .append (f"""WHEN NOT MATCHED THEN
400
390
INSERT ({ ", " .join ([f'"{ x } "' for x in df .columns ])} )
401
391
VALUES ({ ", " .join ([f'source."{ x } "' for x in df .columns ])} )""" )
402
-
392
+
403
393
elif merge_condition == "conditional_merge" :
404
394
for clause in merge_conditional_clauses :
405
395
when_type = clause ["when" ]
406
396
action = clause ["action" ]
407
397
condition = clause .get ("condition" )
408
398
columns = clause .get ("columns" )
409
-
399
+
410
400
# Build WHEN clause
411
401
when_part = f"WHEN { when_type } "
412
402
if condition :
413
403
when_part += f" AND { condition } "
414
-
404
+
415
405
# Build action
416
406
if action == "UPDATE" :
417
407
update_columns = columns or df .columns .tolist ()
418
408
update_sets = [f'"{ col } " = source."{ col } "' for col in update_columns ]
419
409
when_part += f" THEN UPDATE SET { ', ' .join (update_sets )} "
420
-
410
+
421
411
elif action == "DELETE" :
422
412
when_part += " THEN DELETE"
423
-
413
+
424
414
elif action == "INSERT" :
425
415
insert_columns = columns or df .columns .tolist ()
426
416
column_list = ", " .join ([f'"{ col } "' for col in insert_columns ])
427
417
values_list = ", " .join ([f'source."{ col } "' for col in insert_columns ])
428
418
when_part += f" THEN INSERT ({ column_list } ) VALUES ({ values_list } )"
429
-
419
+
430
420
when_clauses .append (when_part )
431
-
421
+
432
422
joined_clauses = "\n " .join (when_clauses )
433
423
sql_statement = f"""
434
424
MERGE INTO "{ database } "."{ table } " target
435
425
USING "{ database } "."{ source_table } " source
436
426
ON { on_condition }
437
427
{ joined_clauses }
438
- """
428
+ """
439
429
else :
440
430
sql_statement = f"""
441
431
INSERT INTO "{ database } "."{ table } " ({ ", " .join ([f'"{ x } "' for x in df .columns ])} )
0 commit comments