@@ -24,6 +24,7 @@ mod sort_order;
2424use std:: cmp:: Ordering ;
2525use std:: collections:: HashMap ;
2626use std:: mem:: discriminant;
27+ use std:: sync:: Arc ;
2728
2829use uuid:: Uuid ;
2930
@@ -37,7 +38,8 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
3738
3839/// Table transaction.
3940pub struct Transaction < ' a > {
40- table : & ' a Table ,
41+ base_table : & ' a Table ,
42+ current_table : Table ,
4143 updates : Vec < TableUpdate > ,
4244 requirements : Vec < TableRequirement > ,
4345}
@@ -46,38 +48,60 @@ impl<'a> Transaction<'a> {
4648 /// Creates a new transaction.
4749 pub fn new ( table : & ' a Table ) -> Self {
4850 Self {
49- table,
51+ base_table : table,
52+ current_table : table. clone ( ) ,
5053 updates : vec ! [ ] ,
5154 requirements : vec ! [ ] ,
5255 }
5356 }
5457
55- fn append_updates ( & mut self , updates : Vec < TableUpdate > ) -> Result < ( ) > {
56- for update in & updates {
57- for up in & self . updates {
58- if discriminant ( up) == discriminant ( update) {
59- return Err ( Error :: new (
60- ErrorKind :: DataInvalid ,
61- format ! (
62- "Cannot apply update with same type at same time: {:?}" ,
63- update
64- ) ,
65- ) ) ;
66- }
67- }
58+ fn update_table_metadata ( & mut self , updates : & [ TableUpdate ] ) -> Result < ( ) > {
59+ let mut metadata_builder = self . current_table . metadata ( ) . clone ( ) . into_builder ( None ) ;
60+ for update in updates {
61+ metadata_builder = update. clone ( ) . apply ( metadata_builder) ?;
6862 }
69- self . updates . extend ( updates) ;
63+
64+ self . current_table
65+ . with_metadata ( Arc :: new ( metadata_builder. build ( ) ?. metadata ) ) ;
66+
7067 Ok ( ( ) )
7168 }
7269
73- fn append_requirements ( & mut self , requirements : Vec < TableRequirement > ) -> Result < ( ) > {
74- self . requirements . extend ( requirements) ;
70+ fn apply (
71+ & mut self ,
72+ updates : Vec < TableUpdate > ,
73+ requirements : Vec < TableRequirement > ,
74+ ) -> Result < ( ) > {
75+ for requirement in & requirements {
76+ requirement. check ( Some ( self . current_table . metadata ( ) ) ) ?;
77+ }
78+
79+ self . update_table_metadata ( & updates) ?;
80+
81+ self . updates . extend ( updates) ;
82+
83+ // For the requirements, it does not make sense to add a requirement more than once
84+ // For example, you cannot assert that the current schema has two different IDs
85+ for new_requirement in requirements {
86+ if self
87+ . requirements
88+ . iter ( )
89+ . map ( discriminant)
90+ . all ( |d| d != discriminant ( & new_requirement) )
91+ {
92+ self . requirements . push ( new_requirement) ;
93+ }
94+ }
95+
96+ // # TODO
97+ // Support auto commit later.
98+
7599 Ok ( ( ) )
76100 }
77101
78102 /// Sets table to a new version.
79103 pub fn upgrade_table_version ( mut self , format_version : FormatVersion ) -> Result < Self > {
80- let current_version = self . table . metadata ( ) . format_version ( ) ;
104+ let current_version = self . current_table . metadata ( ) . format_version ( ) ;
81105 match current_version. cmp ( & format_version) {
82106 Ordering :: Greater => {
83107 return Err ( Error :: new (
@@ -89,7 +113,7 @@ impl<'a> Transaction<'a> {
89113 ) ) ;
90114 }
91115 Ordering :: Less => {
92- self . append_updates ( vec ! [ UpgradeFormatVersion { format_version } ] ) ?;
116+ self . apply ( vec ! [ UpgradeFormatVersion { format_version } ] , vec ! [ ] ) ?;
93117 }
94118 Ordering :: Equal => {
95119 // Do nothing.
@@ -100,7 +124,7 @@ impl<'a> Transaction<'a> {
100124
101125 /// Update table's property.
102126 pub fn set_properties ( mut self , props : HashMap < String , String > ) -> Result < Self > {
103- self . append_updates ( vec ! [ TableUpdate :: SetProperties { updates: props } ] ) ?;
127+ self . apply ( vec ! [ TableUpdate :: SetProperties { updates: props } ] , vec ! [ ] ) ?;
104128 Ok ( self )
105129 }
106130
@@ -116,7 +140,7 @@ impl<'a> Transaction<'a> {
116140 } ;
117141 let mut snapshot_id = generate_random_id ( ) ;
118142 while self
119- . table
143+ . current_table
120144 . metadata ( )
121145 . snapshots ( )
122146 . any ( |s| s. snapshot_id ( ) == snapshot_id)
@@ -135,7 +159,7 @@ impl<'a> Transaction<'a> {
135159 ) -> Result < FastAppendAction < ' a > > {
136160 let snapshot_id = if let Some ( snapshot_id) = snapshot_id {
137161 if self
138- . table
162+ . current_table
139163 . metadata ( )
140164 . snapshots ( )
141165 . any ( |s| s. snapshot_id ( ) == snapshot_id)
@@ -168,14 +192,17 @@ impl<'a> Transaction<'a> {
168192
169193 /// Remove properties in table.
170194 pub fn remove_properties ( mut self , keys : Vec < String > ) -> Result < Self > {
171- self . append_updates ( vec ! [ TableUpdate :: RemoveProperties { removals: keys } ] ) ?;
195+ self . apply (
196+ vec ! [ TableUpdate :: RemoveProperties { removals: keys } ] ,
197+ vec ! [ ] ,
198+ ) ?;
172199 Ok ( self )
173200 }
174201
175202 /// Commit transaction.
176203 pub async fn commit ( self , catalog : & dyn Catalog ) -> Result < Table > {
177204 let table_commit = TableCommit :: builder ( )
178- . ident ( self . table . identifier ( ) . clone ( ) )
205+ . ident ( self . base_table . identifier ( ) . clone ( ) )
179206 . updates ( self . updates )
180207 . requirements ( self . requirements )
181208 . build ( ) ;
@@ -197,7 +224,7 @@ mod tests {
197224 use crate :: { TableIdent , TableUpdate } ;
198225
199226 fn make_v1_table ( ) -> Table {
200- let file = File :: open ( format ! (
227+ let file: File = File :: open ( format ! (
201228 "{}/testdata/table_metadata/{}" ,
202229 env!( "CARGO_MANIFEST_DIR" ) ,
203230 "TableMetadataV1Valid.json"
@@ -324,19 +351,21 @@ mod tests {
324351 ) ;
325352 }
326353
327- #[ test]
328- fn test_do_same_update_in_same_transaction ( ) {
329- let table = make_v2_table ( ) ;
354+ #[ tokio :: test]
355+ async fn test_transaction_apply_upgrade ( ) {
356+ let table = make_v1_table ( ) ;
330357 let tx = Transaction :: new ( & table) ;
331- let tx = tx
332- . remove_properties ( vec ! [ "a" . to_string ( ) , "b" . to_string ( ) ] )
333- . unwrap ( ) ;
334-
335- let tx = tx . remove_properties ( vec ! [ "c" . to_string ( ) , "d" . to_string ( ) ] ) ;
336-
337- assert ! (
338- tx . is_err ( ) ,
339- "Should not allow to do same kinds update in same transaction"
358+ // Upgrade v1 to v1, do nothing.
359+ let tx = tx . upgrade_table_version ( FormatVersion :: V1 ) . unwrap ( ) ;
360+ // Upgrade v1 to v2, success.
361+ let tx = tx . upgrade_table_version ( FormatVersion :: V2 ) . unwrap ( ) ;
362+ assert_eq ! (
363+ vec! [ TableUpdate :: UpgradeFormatVersion {
364+ format_version : FormatVersion :: V2
365+ } ] ,
366+ tx . updates
340367 ) ;
368+ // Upgrade v2 to v1, return error.
369+ assert ! ( tx. upgrade_table_version( FormatVersion :: V1 ) . is_err( ) ) ;
341370 }
342371}
0 commit comments