1
+ use super :: flow_identifier;
1
2
use crate :: flow_store:: connection:: FlowStore ;
2
3
use async_trait:: async_trait;
3
4
use log:: error;
@@ -15,6 +16,7 @@ pub struct FlowStoreError {
15
16
pub enum FlowStoreErrorKind {
16
17
Serialization ,
17
18
RedisOperation ,
19
+ NoIdentifier ,
18
20
}
19
21
20
22
/// Trait representing a service for managing flows in a Redis.
@@ -46,9 +48,18 @@ impl FlowStoreServiceBase for FlowStoreService {
46
48
async fn insert_flow ( & mut self , flow : Flow ) -> Result < i64 , FlowStoreError > {
47
49
let mut connection = self . redis_client_arc . lock ( ) . await ;
48
50
49
- let insert_result: RedisResult < ( ) > = connection
50
- . json_set ( flow. flow_id . to_string ( ) , "$" , & flow)
51
- . await ;
51
+ let identifier = match flow_identifier:: get_flow_identifier ( & flow) {
52
+ Some ( id) => id,
53
+ None => {
54
+ return Err ( FlowStoreError {
55
+ kind : FlowStoreErrorKind :: NoIdentifier ,
56
+ flow_id : flow. flow_id ,
57
+ reason : String :: from ( "Identifier can't be determent!" ) ,
58
+ } ) ;
59
+ }
60
+ } ;
61
+
62
+ let insert_result: RedisResult < ( ) > = connection. json_set ( identifier, "$" , & flow) . await ;
52
63
53
64
match insert_result {
54
65
Err ( redis_error) => {
@@ -78,7 +89,10 @@ impl FlowStoreServiceBase for FlowStoreService {
78
89
/// Deletes a flow
79
90
async fn delete_flow ( & mut self , flow_id : i64 ) -> Result < i64 , RedisError > {
80
91
let mut connection = self . redis_client_arc . lock ( ) . await ;
81
- let deleted_flow: RedisResult < i64 > = connection. json_del ( flow_id, "." ) . await ;
92
+
93
+ let identifier = format ! ( "{}::*" , flow_id) ;
94
+ let keys: Vec < String > = connection. keys ( & identifier) . await ?;
95
+ let deleted_flow: RedisResult < i64 > = connection. json_del ( keys, "." ) . await ;
82
96
83
97
match deleted_flow {
84
98
Ok ( int) => Ok ( int) ,
@@ -116,7 +130,18 @@ impl FlowStoreServiceBase for FlowStoreService {
116
130
}
117
131
} ;
118
132
119
- let int_keys: Vec < i64 > = string_keys
133
+ let mut real_keys: Vec < String > = vec ! [ ] ;
134
+
135
+ for key in string_keys {
136
+ if key. contains ( "::" ) {
137
+ let number = key. splitn ( 2 , "::" ) . next ( ) ;
138
+ if let Some ( real_number) = number {
139
+ real_keys. push ( String :: from ( real_number) ) ;
140
+ }
141
+ }
142
+ }
143
+
144
+ let int_keys: Vec < i64 > = real_keys
120
145
. into_iter ( )
121
146
. filter_map ( |key| key. parse :: < i64 > ( ) . ok ( ) )
122
147
. collect ( ) ;
@@ -180,6 +205,8 @@ impl FlowStoreServiceBase for FlowStoreService {
180
205
181
206
#[ cfg( test) ]
182
207
mod tests {
208
+ use std:: collections:: HashMap ;
209
+
183
210
use crate :: flow_store:: connection:: create_flow_store_connection;
184
211
use crate :: flow_store:: connection:: FlowStore ;
185
212
use crate :: flow_store:: service:: FlowStoreService ;
@@ -190,8 +217,50 @@ mod tests {
190
217
use testcontainers:: core:: WaitFor ;
191
218
use testcontainers:: runners:: AsyncRunner ;
192
219
use testcontainers:: GenericImage ;
220
+ use tucana:: shared:: FlowSetting ;
221
+ use tucana:: shared:: FlowSettingDefinition ;
222
+ use tucana:: shared:: Struct ;
193
223
use tucana:: shared:: { Flow , Flows } ;
194
224
225
+ fn get_string_value ( value : & str ) -> tucana:: shared:: Value {
226
+ tucana:: shared:: Value {
227
+ kind : Some ( tucana:: shared:: value:: Kind :: StringValue ( String :: from (
228
+ value,
229
+ ) ) ) ,
230
+ }
231
+ }
232
+
233
+ fn get_settings ( ) -> Vec < FlowSetting > {
234
+ vec ! [
235
+ FlowSetting {
236
+ definition: Some ( FlowSettingDefinition {
237
+ id: String :: from( "1424525" ) ,
238
+ key: String :: from( "HTTP_HOST" ) ,
239
+ } ) ,
240
+ object: Some ( Struct {
241
+ fields: {
242
+ let mut map = HashMap :: new( ) ;
243
+ map. insert( String :: from( "host" ) , get_string_value( "abc.code0.tech" ) ) ;
244
+ map
245
+ } ,
246
+ } ) ,
247
+ } ,
248
+ FlowSetting {
249
+ definition: Some ( FlowSettingDefinition {
250
+ id: String :: from( "14245252352" ) ,
251
+ key: String :: from( "HTTP_METHOD" ) ,
252
+ } ) ,
253
+ object: Some ( Struct {
254
+ fields: {
255
+ let mut map = HashMap :: new( ) ;
256
+ map. insert( String :: from( "method" ) , get_string_value( "GET" ) ) ;
257
+ map
258
+ } ,
259
+ } ) ,
260
+ } ,
261
+ ]
262
+ }
263
+
195
264
macro_rules! redis_integration_test {
196
265
( $test_name: ident, $consumer: expr) => {
197
266
#[ tokio:: test]
@@ -237,8 +306,8 @@ mod tests {
237
306
( |connection: FlowStore , mut service: FlowStoreService | async move {
238
307
let flow = Flow {
239
308
flow_id: 1 ,
240
- r#type: "" . to_string( ) ,
241
- settings: vec! [ ] ,
309
+ r#type: "REST " . to_string( ) ,
310
+ settings: get_settings ( ) ,
242
311
starting_node: None ,
243
312
data_types: vec![ ] ,
244
313
input_type_identifier: None ,
@@ -253,7 +322,10 @@ mod tests {
253
322
254
323
let redis_result: Option <String > = {
255
324
let mut redis_cmd = connection. lock( ) . await ;
256
- redis_cmd. json_get( "1" , "$" ) . await . unwrap( )
325
+ redis_cmd
326
+ . json_get( "1::1::abc.code0.tech::GET" , "$" )
327
+ . await
328
+ . unwrap( )
257
329
} ;
258
330
259
331
println!( "{}" , redis_result. clone( ) . unwrap( ) ) ;
@@ -264,13 +336,31 @@ mod tests {
264
336
} )
265
337
) ;
266
338
339
+ redis_integration_test ! (
340
+ insert_one_flow_fails_no_identifier,
341
+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
342
+ let flow = Flow {
343
+ flow_id: 1 ,
344
+ r#type: "" . to_string( ) ,
345
+ settings: get_settings( ) ,
346
+ starting_node: None ,
347
+ data_types: vec![ ] ,
348
+ input_type_identifier: None ,
349
+ return_type_identifier: None ,
350
+ project_id: 1 ,
351
+ } ;
352
+
353
+ assert!( !service. insert_flow( flow. clone( ) ) . await . is_ok( ) ) ;
354
+ } )
355
+ ) ;
356
+
267
357
redis_integration_test ! (
268
358
insert_will_overwrite_existing_flow,
269
359
( |connection: FlowStore , mut service: FlowStoreService | async move {
270
360
let flow = Flow {
271
361
flow_id: 1 ,
272
- r#type: "" . to_string( ) ,
273
- settings: vec! [ ] ,
362
+ r#type: "REST " . to_string( ) ,
363
+ settings: get_settings ( ) ,
274
364
data_types: vec![ ] ,
275
365
input_type_identifier: None ,
276
366
return_type_identifier: None ,
@@ -286,9 +376,9 @@ mod tests {
286
376
let flow_overwrite = Flow {
287
377
flow_id: 1 ,
288
378
r#type: "REST" . to_string( ) ,
289
- settings: vec! [ ] ,
379
+ settings: get_settings ( ) ,
290
380
data_types: vec![ ] ,
291
- input_type_identifier: None ,
381
+ input_type_identifier: Some ( String :: from ( "ABC" ) ) ,
292
382
return_type_identifier: None ,
293
383
project_id: 1 ,
294
384
starting_node: None ,
@@ -300,13 +390,16 @@ mod tests {
300
390
301
391
let redis_result: Vec <String > = {
302
392
let mut redis_cmd = connection. lock( ) . await ;
303
- redis_cmd. json_get( "1" , "$" ) . await . unwrap( )
393
+ redis_cmd
394
+ . json_get( "1::1::abc.code0.tech::GET" , "$" )
395
+ . await
396
+ . unwrap( )
304
397
} ;
305
398
306
399
assert_eq!( redis_result. len( ) , 1 ) ;
307
400
let string: & str = & * redis_result[ 0 ] ;
308
401
let redis_flow: Vec <Flow > = serde_json:: from_str( string) . unwrap( ) ;
309
- assert_eq !( redis_flow[ 0 ] . r#type , "REST" . to_string ( ) ) ;
402
+ assert !( redis_flow[ 0 ] . r#input_type_identifier . is_some ( ) ) ;
310
403
} )
311
404
) ;
312
405
@@ -315,8 +408,8 @@ mod tests {
315
408
( |_connection: FlowStore , mut service: FlowStoreService | async move {
316
409
let flow_one = Flow {
317
410
flow_id: 1 ,
318
- r#type: "" . to_string( ) ,
319
- settings: vec! [ ] ,
411
+ r#type: "REST " . to_string( ) ,
412
+ settings: get_settings ( ) ,
320
413
data_types: vec![ ] ,
321
414
input_type_identifier: None ,
322
415
return_type_identifier: None ,
@@ -326,8 +419,8 @@ mod tests {
326
419
327
420
let flow_two = Flow {
328
421
flow_id: 2 ,
329
- r#type: "" . to_string( ) ,
330
- settings: vec! [ ] ,
422
+ r#type: "REST " . to_string( ) ,
423
+ settings: get_settings ( ) ,
331
424
data_types: vec![ ] ,
332
425
input_type_identifier: None ,
333
426
return_type_identifier: None ,
@@ -337,8 +430,8 @@ mod tests {
337
430
338
431
let flow_three = Flow {
339
432
flow_id: 3 ,
340
- r#type: "" . to_string( ) ,
341
- settings: vec! [ ] ,
433
+ r#type: "REST " . to_string( ) ,
434
+ settings: get_settings ( ) ,
342
435
starting_node: None ,
343
436
data_types: vec![ ] ,
344
437
input_type_identifier: None ,
@@ -359,8 +452,8 @@ mod tests {
359
452
( |connection: FlowStore , mut service: FlowStoreService | async move {
360
453
let flow = Flow {
361
454
flow_id: 1 ,
362
- r#type: "" . to_string( ) ,
363
- settings: vec! [ ] ,
455
+ r#type: "REST " . to_string( ) ,
456
+ settings: get_settings ( ) ,
364
457
starting_node: None ,
365
458
data_types: vec![ ] ,
366
459
input_type_identifier: None ,
@@ -399,8 +492,8 @@ mod tests {
399
492
( |_connection: FlowStore , mut service: FlowStoreService | async move {
400
493
let flow_one = Flow {
401
494
flow_id: 1 ,
402
- r#type: "" . to_string( ) ,
403
- settings: vec! [ ] ,
495
+ r#type: "REST " . to_string( ) ,
496
+ settings: get_settings ( ) ,
404
497
starting_node: None ,
405
498
data_types: vec![ ] ,
406
499
input_type_identifier: None ,
@@ -410,8 +503,8 @@ mod tests {
410
503
411
504
let flow_two = Flow {
412
505
flow_id: 2 ,
413
- r#type: "" . to_string( ) ,
414
- settings: vec! [ ] ,
506
+ r#type: "REST " . to_string( ) ,
507
+ settings: get_settings ( ) ,
415
508
starting_node: None ,
416
509
data_types: vec![ ] ,
417
510
input_type_identifier: None ,
@@ -421,8 +514,8 @@ mod tests {
421
514
422
515
let flow_three = Flow {
423
516
flow_id: 3 ,
424
- r#type: "" . to_string( ) ,
425
- settings: vec! [ ] ,
517
+ r#type: "REST " . to_string( ) ,
518
+ settings: get_settings ( ) ,
426
519
starting_node: None ,
427
520
data_types: vec![ ] ,
428
521
input_type_identifier: None ,
@@ -454,8 +547,8 @@ mod tests {
454
547
( |_connection: FlowStore , mut service: FlowStoreService | async move {
455
548
let flow_one = Flow {
456
549
flow_id: 1 ,
457
- r#type: "" . to_string( ) ,
458
- settings: vec! [ ] ,
550
+ r#type: "REST " . to_string( ) ,
551
+ settings: get_settings ( ) ,
459
552
starting_node: None ,
460
553
data_types: vec![ ] ,
461
554
input_type_identifier: None ,
@@ -465,8 +558,8 @@ mod tests {
465
558
466
559
let flow_two = Flow {
467
560
flow_id: 2 ,
468
- r#type: "" . to_string( ) ,
469
- settings: vec! [ ] ,
561
+ r#type: "REST " . to_string( ) ,
562
+ settings: get_settings ( ) ,
470
563
starting_node: None ,
471
564
data_types: vec![ ] ,
472
565
input_type_identifier: None ,
@@ -476,8 +569,8 @@ mod tests {
476
569
477
570
let flow_three = Flow {
478
571
flow_id: 3 ,
479
- r#type: "" . to_string( ) ,
480
- settings: vec! [ ] ,
572
+ r#type: "REST " . to_string( ) ,
573
+ settings: get_settings ( ) ,
481
574
starting_node: None ,
482
575
data_types: vec![ ] ,
483
576
input_type_identifier: None ,
@@ -520,8 +613,8 @@ mod tests {
520
613
( |_connection: FlowStore , mut service: FlowStoreService | async move {
521
614
let flow_one = Flow {
522
615
flow_id: 1 ,
523
- r#type: "" . to_string( ) ,
524
- settings: vec! [ ] ,
616
+ r#type: "REST " . to_string( ) ,
617
+ settings: get_settings ( ) ,
525
618
starting_node: None ,
526
619
data_types: vec![ ] ,
527
620
input_type_identifier: None ,
@@ -531,8 +624,8 @@ mod tests {
531
624
532
625
let flow_two = Flow {
533
626
flow_id: 2 ,
534
- r#type: "" . to_string( ) ,
535
- settings: vec! [ ] ,
627
+ r#type: "REST " . to_string( ) ,
628
+ settings: get_settings ( ) ,
536
629
starting_node: None ,
537
630
data_types: vec![ ] ,
538
631
input_type_identifier: None ,
@@ -542,8 +635,8 @@ mod tests {
542
635
543
636
let flow_three = Flow {
544
637
flow_id: 3 ,
545
- r#type: "" . to_string( ) ,
546
- settings: vec! [ ] ,
638
+ r#type: "REST " . to_string( ) ,
639
+ settings: get_settings ( ) ,
547
640
starting_node: None ,
548
641
data_types: vec![ ] ,
549
642
input_type_identifier: None ,
@@ -576,8 +669,8 @@ mod tests {
576
669
( |_connection: FlowStore , mut service: FlowStoreService | async move {
577
670
let flow_one = Flow {
578
671
flow_id: 1 ,
579
- r#type: "" . to_string( ) ,
580
- settings: vec! [ ] ,
672
+ r#type: "REST " . to_string( ) ,
673
+ settings: get_settings ( ) ,
581
674
starting_node: None ,
582
675
data_types: vec![ ] ,
583
676
input_type_identifier: None ,
@@ -587,8 +680,8 @@ mod tests {
587
680
588
681
let flow_two = Flow {
589
682
flow_id: 2 ,
590
- r#type: "" . to_string( ) ,
591
- settings: vec! [ ] ,
683
+ r#type: "REST " . to_string( ) ,
684
+ settings: get_settings ( ) ,
592
685
starting_node: None ,
593
686
data_types: vec![ ] ,
594
687
input_type_identifier: None ,
@@ -598,8 +691,8 @@ mod tests {
598
691
599
692
let flow_three = Flow {
600
693
flow_id: 3 ,
601
- r#type: "" . to_string( ) ,
602
- settings: vec! [ ] ,
694
+ r#type: "REST " . to_string( ) ,
695
+ settings: get_settings ( ) ,
603
696
starting_node: None ,
604
697
data_types: vec![ ] ,
605
698
input_type_identifier: None ,
@@ -617,7 +710,7 @@ mod tests {
617
710
let amount = service. insert_flows( flows. clone( ) ) . await . unwrap( ) ;
618
711
assert_eq!( amount, 3 ) ;
619
712
620
- let query_flows = service. query_flows( String :: from( "1" ) ) . await ;
713
+ let query_flows = service. query_flows( String :: from( "1::* " ) ) . await ;
621
714
622
715
assert!( query_flows. is_ok( ) ) ;
623
716
assert_eq!( query_flows. unwrap( ) . flows, vec![ flow_one] )
0 commit comments