15
15
16
16
package software .amazon .awssdk .enhanced .dynamodb ;
17
17
18
- import static org .assertj .core .api .Assertions .as ;
19
18
import static org .assertj .core .api .Assertions .assertThat ;
20
19
import static org .assertj .core .api .Assertions .assertThatThrownBy ;
21
20
24
23
import org .junit .AfterClass ;
25
24
import org .junit .BeforeClass ;
26
25
import org .junit .Test ;
26
+ import software .amazon .awssdk .enhanced .dynamodb .extensions .VersionedRecordExtension ;
27
27
import software .amazon .awssdk .enhanced .dynamodb .model .DeleteItemEnhancedRequest ;
28
28
import software .amazon .awssdk .enhanced .dynamodb .model .DeleteItemEnhancedResponse ;
29
29
import software .amazon .awssdk .enhanced .dynamodb .model .EnhancedLocalSecondaryIndex ;
30
30
import software .amazon .awssdk .enhanced .dynamodb .model .GetItemEnhancedResponse ;
31
31
import software .amazon .awssdk .enhanced .dynamodb .model .PutItemEnhancedRequest ;
32
32
import software .amazon .awssdk .enhanced .dynamodb .model .PutItemEnhancedResponse ;
33
33
import software .amazon .awssdk .enhanced .dynamodb .model .Record ;
34
+ import software .amazon .awssdk .enhanced .dynamodb .model .TransactWriteItemsEnhancedRequest ;
34
35
import software .amazon .awssdk .enhanced .dynamodb .model .UpdateItemEnhancedRequest ;
35
36
import software .amazon .awssdk .enhanced .dynamodb .model .UpdateItemEnhancedResponse ;
37
+ import software .amazon .awssdk .enhanced .dynamodb .model .VersionedRecord ;
36
38
import software .amazon .awssdk .services .dynamodb .DynamoDbAsyncClient ;
37
39
import software .amazon .awssdk .services .dynamodb .model .ConditionalCheckFailedException ;
38
40
import software .amazon .awssdk .services .dynamodb .model .Projection ;
41
43
import software .amazon .awssdk .services .dynamodb .model .ReturnItemCollectionMetrics ;
42
44
import software .amazon .awssdk .services .dynamodb .model .ReturnValue ;
43
45
import software .amazon .awssdk .services .dynamodb .model .ReturnValuesOnConditionCheckFailure ;
46
+ import software .amazon .awssdk .services .dynamodb .model .TransactionCanceledException ;
44
47
45
48
public class AsyncCrudWithResponseIntegrationTest extends DynamoDbEnhancedIntegrationTestBase {
46
49
@@ -56,13 +59,18 @@ public class AsyncCrudWithResponseIntegrationTest extends DynamoDbEnhancedIntegr
56
59
private static DynamoDbAsyncClient dynamoDbClient ;
57
60
private static DynamoDbEnhancedAsyncClient enhancedClient ;
58
61
private static DynamoDbAsyncTable <Record > mappedTable ;
62
+ private static DynamoDbAsyncTable <VersionedRecord > versionedRecordTable ;
59
63
60
64
@ BeforeClass
61
65
public static void beforeClass () {
62
66
dynamoDbClient = createAsyncDynamoDbClient ();
63
- enhancedClient = DynamoDbEnhancedAsyncClient .builder ().dynamoDbClient (dynamoDbClient ).build ();
67
+ enhancedClient = DynamoDbEnhancedAsyncClient .builder ()
68
+ .dynamoDbClient (dynamoDbClient )
69
+ .extensions (VersionedRecordExtension .builder ().build ())
70
+ .build ();
64
71
mappedTable = enhancedClient .table (TABLE_NAME , TABLE_SCHEMA );
65
72
mappedTable .createTable (r -> r .localSecondaryIndices (LOCAL_SECONDARY_INDEX )).join ();
73
+ versionedRecordTable = enhancedClient .table (TABLE_NAME , VERSIONED_RECORD_TABLE_SCHEMA );
66
74
dynamoDbClient .waiter ().waitUntilTableExists (r -> r .tableName (TABLE_NAME )).join ();
67
75
}
68
76
@@ -72,6 +80,11 @@ public void tearDown() {
72
80
.items ()
73
81
.subscribe (record -> mappedTable .deleteItem (record ).join ())
74
82
.join ();
83
+
84
+ versionedRecordTable .scan ()
85
+ .items ()
86
+ .subscribe (versionedRecord -> versionedRecordTable .deleteItem (versionedRecord ).join ())
87
+ .join ();
75
88
}
76
89
77
90
@ AfterClass
@@ -341,4 +354,149 @@ public void getItem_withoutReturnConsumedCapacity() {
341
354
GetItemEnhancedResponse <Record > response = mappedTable .getItemWithResponse (req -> req .key (key )).join ();
342
355
assertThat (response .consumedCapacity ()).isNull ();
343
356
}
357
+
358
+ @ Test
359
+ public void transactWriteItems_recordWithoutVersion_andOptimisticLockingOnDeleteOnDelete_shouldSucceed () {
360
+ Record originalItem = new Record ().setId ("123" ).setSort (10 ).setStringAttribute ("Original Item" );
361
+ Key recordKey = Key .builder ().partitionValue (originalItem .getId ()).sortValue (originalItem .getSort ()).build ();
362
+
363
+ // Put the item
364
+ mappedTable .putItem (originalItem ).join ();
365
+
366
+ // Retrieve the item, modify it separately and update it, which will increment the version
367
+ Record savedItem = mappedTable .getItem (r -> r .key (recordKey )).join ();
368
+ savedItem .setStringAttribute ("Updated Item" );
369
+ mappedTable .updateItem (savedItem ).join ();
370
+
371
+ // Get the updated item and try to delete it
372
+ Record updatedItem = mappedTable .getItem (r -> r .key (recordKey )).join ();
373
+ enhancedClient .transactWriteItems (TransactWriteItemsEnhancedRequest .builder ()
374
+ .addDeleteItem (mappedTable , updatedItem )
375
+ .build ()).join ();
376
+
377
+ Record deletedItem = mappedTable .getItem (r -> r .key (recordKey )).join ();
378
+ assertThat (deletedItem ).isNull ();
379
+ }
380
+
381
+ @ Test
382
+ public void transactWriteItems_recordWithVersion_andOptimisticLockingOnDelete_ifVersionMatch_shouldSucceed () {
383
+ VersionedRecord originalItem = new VersionedRecord ().setId ("123" ).setSort (10 ).setStringAttribute ("Original Item" );
384
+ Key recordKey = Key .builder ().partitionValue (originalItem .getId ()).sortValue (originalItem .getSort ()).build ();
385
+
386
+ // Put the item
387
+ versionedRecordTable .putItem (originalItem ).join ();
388
+
389
+ // Retrieve the item, modify it separately and update it, which will increment the version
390
+ VersionedRecord savedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
391
+ savedItem .setStringAttribute ("Updated Item" );
392
+ versionedRecordTable .updateItem (savedItem ).join ();
393
+
394
+ // Get the updated item and try to delete it
395
+ VersionedRecord updatedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
396
+ enhancedClient .transactWriteItems (TransactWriteItemsEnhancedRequest .builder ()
397
+ .addDeleteItem (versionedRecordTable , updatedItem )
398
+ .build ()).join ();
399
+
400
+ VersionedRecord deletedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
401
+ assertThat (deletedItem ).isNull ();
402
+ }
403
+
404
+ @ Test
405
+ public void transactWriteItems_recordWithVersion_andOptimisticLockingOnDelete_ifVersionMismatch_shouldFail () {
406
+ VersionedRecord originalItem = new VersionedRecord ().setId ("123" ).setSort (10 ).setStringAttribute ("Original Item" );
407
+ Key recordKey = Key .builder ().partitionValue (originalItem .getId ()).sortValue (originalItem .getSort ()).build ();
408
+
409
+ // Put the item
410
+ versionedRecordTable .putItem (originalItem ).join ();
411
+
412
+ // Retrieve the item, modify it separately and update it, which will increment the version
413
+ VersionedRecord savedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
414
+ savedItem .setStringAttribute ("Updated Item" );
415
+ versionedRecordTable .updateItem (savedItem ).join ();
416
+
417
+ // Get the updated item and try to delete it
418
+ VersionedRecord updatedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
419
+ updatedItem .setVersion (3 ); // Intentionally set a version that does not match the current version
420
+
421
+ TransactWriteItemsEnhancedRequest request =
422
+ TransactWriteItemsEnhancedRequest .builder ()
423
+ .addDeleteItem (versionedRecordTable , updatedItem )
424
+ .build ();
425
+
426
+ assertThatThrownBy (() -> enhancedClient .transactWriteItems (request ).join ())
427
+ .isInstanceOf (CompletionException .class )
428
+ .satisfies (e ->
429
+ assertThat (((TransactionCanceledException ) e .getCause ())
430
+ .cancellationReasons ()
431
+ .stream ()
432
+ .anyMatch (reason ->
433
+ "ConditionalCheckFailed" .equals (reason .code ())
434
+ && "The conditional request failed" .equals (reason .message ())))
435
+ .isTrue ());
436
+ }
437
+
438
+ @ Test
439
+ public void delete_recordWithoutVersion_andOptimisticLockingOnDelete_shouldSucceed () {
440
+ Record originalItem = new Record ().setId ("123" ).setSort (10 ).setStringAttribute ("Original Item" );
441
+ Key recordKey = Key .builder ().partitionValue (originalItem .getId ()).sortValue (originalItem .getSort ()).build ();
442
+
443
+ // Put the item
444
+ mappedTable .putItem (originalItem ).join ();
445
+
446
+ // Retrieve the item, modify it separately and update it, which will increment the version
447
+ Record savedItem = mappedTable .getItem (r -> r .key (recordKey )).join ();
448
+ savedItem .setStringAttribute ("Updated Item" );
449
+ mappedTable .updateItem (savedItem ).join ();
450
+
451
+ // Get the updated item and try to delete it
452
+ Record updatedItem = mappedTable .getItem (r -> r .key (recordKey )).join ();
453
+ mappedTable .deleteItem (updatedItem ).join ();
454
+
455
+ Record deletedItem = mappedTable .getItem (r -> r .key (recordKey )).join ();
456
+ assertThat (deletedItem ).isNull ();
457
+ }
458
+
459
+ @ Test
460
+ public void delete_recordWithVersion_andOptimisticLockingOnDelete_ifVersionMatch_shouldSucceed () {
461
+ VersionedRecord originalItem = new VersionedRecord ().setId ("123" ).setSort (10 ).setStringAttribute ("Original Item" );
462
+ Key recordKey = Key .builder ().partitionValue (originalItem .getId ()).sortValue (originalItem .getSort ()).build ();
463
+
464
+ // Put the item
465
+ versionedRecordTable .putItem (originalItem ).join ();
466
+
467
+ // Retrieve the item, modify it separately and update it, which will increment the version
468
+ VersionedRecord savedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
469
+ savedItem .setStringAttribute ("Updated Item" );
470
+ versionedRecordTable .updateItem (savedItem ).join ();
471
+
472
+ // Get the updated item and try to delete it
473
+ VersionedRecord updatedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
474
+ versionedRecordTable .deleteItem (updatedItem ).join ();
475
+
476
+ VersionedRecord deletedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
477
+ assertThat (deletedItem ).isNull ();
478
+ }
479
+
480
+ @ Test
481
+ public void delete_recordWithoutVersion_andOptimisticLockingOnDelete_ifVersionMismatch_shouldFail () {
482
+ VersionedRecord originalItem = new VersionedRecord ().setId ("123" ).setSort (10 ).setStringAttribute ("Original Item" );
483
+ Key recordKey = Key .builder ().partitionValue (originalItem .getId ()).sortValue (originalItem .getSort ()).build ();
484
+
485
+ // Put the item
486
+ versionedRecordTable .putItem (originalItem ).join ();
487
+
488
+ // Retrieve the item, modify it separately and update it, which will increment the version
489
+ VersionedRecord savedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
490
+ savedItem .setStringAttribute ("Updated Item" );
491
+ versionedRecordTable .updateItem (savedItem ).join ();
492
+
493
+ // Get the updated item and try to delete it
494
+ VersionedRecord updatedItem = versionedRecordTable .getItem (r -> r .key (recordKey )).join ();
495
+ updatedItem .setVersion (3 ); // Intentionally set a version that does not match the current version
496
+
497
+ assertThatThrownBy (() -> versionedRecordTable .deleteItem (updatedItem ).join ())
498
+ .isInstanceOf (CompletionException .class )
499
+ .satisfies (e ->
500
+ assertThat (e .getMessage ()).contains ("The conditional request failed" ));
501
+ }
344
502
}
0 commit comments