File tree Expand file tree Collapse file tree 5 files changed +25
-9
lines changed
integration_tests/tests/shared_tests Expand file tree Collapse file tree 5 files changed +25
-9
lines changed Original file line number Diff line number Diff line change @@ -129,10 +129,26 @@ impl<'a> Transaction<'a> {
129129 /// Creates a fast append action.
130130 pub fn fast_append (
131131 self ,
132+ snapshot_id : Option < i64 > ,
132133 commit_uuid : Option < Uuid > ,
133134 key_metadata : Vec < u8 > ,
134135 ) -> Result < FastAppendAction < ' a > > {
135- let snapshot_id = self . generate_unique_snapshot_id ( ) ;
136+ let snapshot_id = if let Some ( snapshot_id) = snapshot_id {
137+ if self
138+ . table
139+ . metadata ( )
140+ . snapshots ( )
141+ . any ( |s| s. snapshot_id ( ) == snapshot_id)
142+ {
143+ return Err ( Error :: new (
144+ ErrorKind :: DataInvalid ,
145+ format ! ( "Snapshot id {} already exists" , snapshot_id) ,
146+ ) ) ;
147+ }
148+ snapshot_id
149+ } else {
150+ self . generate_unique_snapshot_id ( )
151+ } ;
136152 FastAppendAction :: new (
137153 self ,
138154 snapshot_id,
Original file line number Diff line number Diff line change @@ -112,7 +112,7 @@ async fn test_append_data_file() {
112112
113113 // commit result
114114 let tx = Transaction :: new ( & table) ;
115- let mut append_action = tx. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
115+ let mut append_action = tx. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
116116 append_action. add_data_files ( data_file. clone ( ) ) . unwrap ( ) ;
117117 let tx = append_action. apply ( ) . await . unwrap ( ) ;
118118 let table = tx. commit ( & rest_catalog) . await . unwrap ( ) ;
@@ -132,7 +132,7 @@ async fn test_append_data_file() {
132132
133133 // commit result again
134134 let tx = Transaction :: new ( & table) ;
135- let mut append_action = tx. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
135+ let mut append_action = tx. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
136136 append_action. add_data_files ( data_file. clone ( ) ) . unwrap ( ) ;
137137 let tx = append_action. apply ( ) . await . unwrap ( ) ;
138138 let table = tx. commit ( & rest_catalog) . await . unwrap ( ) ;
Original file line number Diff line number Diff line change @@ -120,7 +120,7 @@ async fn test_append_partition_data_file() {
120120
121121 // commit result
122122 let tx = Transaction :: new ( & table) ;
123- let mut append_action = tx. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
123+ let mut append_action = tx. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
124124 append_action
125125 . add_data_files ( data_file_valid. clone ( ) )
126126 . unwrap ( ) ;
@@ -180,7 +180,7 @@ async fn test_schema_incompatible_partition_type(
180180 let data_file_invalid = data_file_writer_invalid. close ( ) . await . unwrap ( ) ;
181181
182182 let tx = Transaction :: new ( & table) ;
183- let mut append_action = tx. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
183+ let mut append_action = tx. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
184184 if append_action
185185 . add_data_files ( data_file_invalid. clone ( ) )
186186 . is_ok ( )
@@ -220,7 +220,7 @@ async fn test_schema_incompatible_partition_fields(
220220 let data_file_invalid = data_file_writer_invalid. close ( ) . await . unwrap ( ) ;
221221
222222 let tx = Transaction :: new ( & table) ;
223- let mut append_action = tx. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
223+ let mut append_action = tx. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
224224 if append_action
225225 . add_data_files ( data_file_invalid. clone ( ) )
226226 . is_ok ( )
Original file line number Diff line number Diff line change @@ -90,12 +90,12 @@ async fn test_append_data_file_conflict() {
9090
9191 // start two transaction and commit one of them
9292 let tx1 = Transaction :: new ( & table) ;
93- let mut append_action = tx1. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
93+ let mut append_action = tx1. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
9494 append_action. add_data_files ( data_file. clone ( ) ) . unwrap ( ) ;
9595 let tx1 = append_action. apply ( ) . await . unwrap ( ) ;
9696
9797 let tx2 = Transaction :: new ( & table) ;
98- let mut append_action = tx2. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
98+ let mut append_action = tx2. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
9999 append_action. add_data_files ( data_file. clone ( ) ) . unwrap ( ) ;
100100 let tx2 = append_action. apply ( ) . await . unwrap ( ) ;
101101 let table = tx2
Original file line number Diff line number Diff line change @@ -309,7 +309,7 @@ async fn test_scan_all_type() {
309309
310310 // commit result
311311 let tx = Transaction :: new ( & table) ;
312- let mut append_action = tx. fast_append ( None , vec ! [ ] ) . unwrap ( ) ;
312+ let mut append_action = tx. fast_append ( None , None , vec ! [ ] ) . unwrap ( ) ;
313313 append_action. add_data_files ( data_file. clone ( ) ) . unwrap ( ) ;
314314 let tx = append_action. apply ( ) . await . unwrap ( ) ;
315315 let table = tx. commit ( & rest_catalog) . await . unwrap ( ) ;
You can’t perform that action at this time.
0 commit comments