1
1
import redis
2
+ from functools import wraps
2
3
3
4
from includes import *
4
5
7
8
'''
8
9
9
10
10
- def test_run_mobilenet (env ):
11
- if not TEST_TF :
12
- env .debugPrint ("skipping {} since TEST_TF=0" .format (sys ._getframe ().f_code .co_name ), force = True )
13
- return
11
+ def skip_if_no_TF (f ):
12
+ @wraps (f )
13
+ def wrapper (env , * args , ** kwargs ):
14
+ if not TEST_TF :
15
+ env .debugPrint ("skipping {} since TEST_TF=0" .format (
16
+ sys ._getframe ().f_code .co_name ), force = True )
17
+ return
18
+ return f (env , * args , ** kwargs )
19
+ return wrapper
20
+
14
21
22
+ @skip_if_no_TF
23
+ def test_run_mobilenet (env ):
15
24
con = env .getConnection ()
16
25
17
26
input_var = 'input'
@@ -24,26 +33,31 @@ def test_run_mobilenet(env):
24
33
25
34
ensureSlaveSynced (con , env )
26
35
27
- mobilenet_model_serialized = con .execute_command ('AI.MODELGET' , 'mobilenet' )
36
+ mobilenet_model_serialized = con .execute_command (
37
+ 'AI.MODELGET' , 'mobilenet' )
28
38
29
39
ensureSlaveSynced (con , env )
30
40
if env .useSlaves :
31
41
con2 = env .getSlaveConnection ()
32
- slave_mobilenet_model_serialized = con2 .execute_command ('AI.MODELGET' , 'mobilenet' )
33
- env .assertEqual (len (mobilenet_model_serialized ), len (slave_mobilenet_model_serialized ))
42
+ slave_mobilenet_model_serialized = con2 .execute_command (
43
+ 'AI.MODELGET' , 'mobilenet' )
44
+ env .assertEqual (len (mobilenet_model_serialized ),
45
+ len (slave_mobilenet_model_serialized ))
34
46
35
47
con .execute_command ('AI.TENSORSET' , 'input' ,
36
48
'FLOAT' , 1 , img .shape [1 ], img .shape [0 ], img .shape [2 ],
37
49
'BLOB' , img .tobytes ())
38
50
39
51
ensureSlaveSynced (con , env )
40
52
input_tensor_meta = con .execute_command ('AI.TENSORGET' , 'input' , 'META' )
41
- env .assertEqual ([b'FLOAT' , [1 , img .shape [1 ], img .shape [0 ], img .shape [2 ]]], input_tensor_meta )
53
+ env .assertEqual (
54
+ [b'FLOAT' , [1 , img .shape [1 ], img .shape [0 ], img .shape [2 ]]], input_tensor_meta )
42
55
43
56
ensureSlaveSynced (con , env )
44
57
if env .useSlaves :
45
58
con2 = env .getSlaveConnection ()
46
- slave_tensor_meta = con2 .execute_command ('AI.TENSORGET' , 'input' , 'META' )
59
+ slave_tensor_meta = con2 .execute_command (
60
+ 'AI.TENSORGET' , 'input' , 'META' )
47
61
env .assertEqual (input_tensor_meta , slave_tensor_meta )
48
62
49
63
con .execute_command ('AI.MODELRUN' , 'mobilenet' ,
@@ -63,19 +77,18 @@ def test_run_mobilenet(env):
63
77
64
78
if env .useSlaves :
65
79
con2 = env .getSlaveConnection ()
66
- slave_dtype , slave_shape , slave_data = con2 .execute_command ('AI.TENSORGET' , 'output' , 'BLOB' )
80
+ slave_dtype , slave_shape , slave_data = con2 .execute_command (
81
+ 'AI.TENSORGET' , 'output' , 'BLOB' )
67
82
env .assertEqual (dtype , slave_dtype )
68
83
env .assertEqual (shape , slave_shape )
69
84
env .assertEqual (data , slave_data )
70
85
71
86
87
+ @skip_if_no_TF
72
88
def test_run_mobilenet_multiproc (env ):
73
- if not TEST_TF :
74
- env .debugPrint ("skipping {} since TEST_TF=0" .format (sys ._getframe ().f_code .co_name ), force = True )
75
- return
76
-
77
89
if VALGRIND :
78
- env .debugPrint ("skipping {} since VALGRIND=1" .format (sys ._getframe ().f_code .co_name ), force = True )
90
+ env .debugPrint ("skipping {} since VALGRIND=1" .format (
91
+ sys ._getframe ().f_code .co_name ), force = True )
79
92
return
80
93
81
94
con = env .getConnection ()
@@ -106,17 +119,15 @@ def test_run_mobilenet_multiproc(env):
106
119
107
120
if env .useSlaves :
108
121
con2 = env .getSlaveConnection ()
109
- slave_dtype , slave_shape , slave_data = con2 .execute_command ('AI.TENSORGET' , 'output' , 'BLOB' )
122
+ slave_dtype , slave_shape , slave_data = con2 .execute_command (
123
+ 'AI.TENSORGET' , 'output' , 'BLOB' )
110
124
env .assertEqual (dtype , slave_dtype )
111
125
env .assertEqual (shape , slave_shape )
112
126
env .assertEqual (data , slave_data )
113
127
114
128
129
+ @skip_if_no_TF
115
130
def test_del_tf_model (env ):
116
- if not TEST_TF :
117
- env .debugPrint ("skipping {} since TEST_TF=0" .format (sys ._getframe ().f_code .co_name ), force = True )
118
- return
119
-
120
131
con = env .getConnection ()
121
132
122
133
test_data_path = os .path .join (os .path .dirname (__file__ ), 'test_data' )
@@ -154,14 +165,12 @@ def test_del_tf_model(env):
154
165
except Exception as e :
155
166
exception = e
156
167
env .assertEqual (type (exception ), redis .exceptions .ResponseError )
157
- env .assertEqual ("WRONGTYPE Operation against a key holding the wrong kind of value" , exception .__str__ ())
168
+ env .assertEqual (
169
+ "WRONGTYPE Operation against a key holding the wrong kind of value" , exception .__str__ ())
158
170
159
171
172
+ @skip_if_no_TF
160
173
def test_run_tf_model (env ):
161
- if not TEST_TF :
162
- env .debugPrint ("skipping {} since TEST_TF=0" .format (sys ._getframe ().f_code .co_name ), force = True )
163
- return
164
-
165
174
con = env .getConnection ()
166
175
167
176
test_data_path = os .path .join (os .path .dirname (__file__ ), 'test_data' )
@@ -182,8 +191,10 @@ def test_run_tf_model(env):
182
191
# env.assertEqual(ret[0], b'TF')
183
192
# env.assertEqual(ret[1], b'CPU')
184
193
185
- con .execute_command ('AI.TENSORSET' , 'a' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
186
- con .execute_command ('AI.TENSORSET' , 'b' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
194
+ con .execute_command ('AI.TENSORSET' , 'a' , 'FLOAT' ,
195
+ 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
196
+ con .execute_command ('AI.TENSORSET' , 'b' , 'FLOAT' ,
197
+ 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
187
198
188
199
ensureSlaveSynced (con , env )
189
200
@@ -217,11 +228,8 @@ def test_run_tf_model(env):
217
228
env .assertFalse (con2 .execute_command ('EXISTS' , 'm' ))
218
229
219
230
231
+ @skip_if_no_TF
220
232
def test_run_tf_model_errors (env ):
221
- if not TEST_TF :
222
- env .debugPrint ("skipping {} since TEST_TF=0" .format (sys ._getframe ().f_code .co_name ), force = True )
223
- return
224
-
225
233
con = env .getConnection ()
226
234
227
235
test_data_path = os .path .join (os .path .dirname (__file__ ), 'test_data' )
@@ -245,7 +253,8 @@ def test_run_tf_model_errors(env):
245
253
except Exception as e :
246
254
exception = e
247
255
env .assertEqual (type (exception ), redis .exceptions .ResponseError )
248
- env .assertEqual ("wrong number of arguments for 'AI.MODELGET' command" , exception .__str__ ())
256
+ env .assertEqual (
257
+ "wrong number of arguments for 'AI.MODELGET' command" , exception .__str__ ())
249
258
250
259
# ERR WRONGTYPE
251
260
con .execute_command ('SET' , 'NOT_MODEL' , 'BAR' )
@@ -254,7 +263,8 @@ def test_run_tf_model_errors(env):
254
263
except Exception as e :
255
264
exception = e
256
265
env .assertEqual (type (exception ), redis .exceptions .ResponseError )
257
- env .assertEqual ("WRONGTYPE Operation against a key holding the wrong kind of value" , exception .__str__ ())
266
+ env .assertEqual (
267
+ "WRONGTYPE Operation against a key holding the wrong kind of value" , exception .__str__ ())
258
268
# cleanup
259
269
con .execute_command ('DEL' , 'NOT_MODEL' )
260
270
@@ -365,6 +375,7 @@ def test_run_tf_model_errors(env):
365
375
env .assertEqual (type (exception ), redis .exceptions .ResponseError )
366
376
367
377
378
+ @skip_if_no_TF
368
379
def test_run_tf_model_autobatch (env ):
369
380
if not TEST_PT :
370
381
return
@@ -382,17 +393,22 @@ def test_run_tf_model_autobatch(env):
382
393
'INPUTS' , 'a' , 'b' , 'OUTPUTS' , 'mul' , model_pb )
383
394
env .assertEqual (ret , b'OK' )
384
395
385
- con .execute_command ('AI.TENSORSET' , 'a' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
386
- con .execute_command ('AI.TENSORSET' , 'b' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
396
+ con .execute_command ('AI.TENSORSET' , 'a' , 'FLOAT' ,
397
+ 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
398
+ con .execute_command ('AI.TENSORSET' , 'b' , 'FLOAT' ,
399
+ 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
387
400
388
- con .execute_command ('AI.TENSORSET' , 'd' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
389
- con .execute_command ('AI.TENSORSET' , 'e' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
401
+ con .execute_command ('AI.TENSORSET' , 'd' , 'FLOAT' ,
402
+ 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
403
+ con .execute_command ('AI.TENSORSET' , 'e' , 'FLOAT' ,
404
+ 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
390
405
391
406
ensureSlaveSynced (con , env )
392
407
393
408
def run ():
394
409
con = env .getConnection ()
395
- con .execute_command ('AI.MODELRUN' , 'm' , 'INPUTS' , 'd' , 'e' , 'OUTPUTS' , 'f' )
410
+ con .execute_command ('AI.MODELRUN' , 'm' , 'INPUTS' ,
411
+ 'd' , 'e' , 'OUTPUTS' , 'f' )
396
412
ensureSlaveSynced (con , env )
397
413
398
414
t = threading .Thread (target = run )
@@ -411,11 +427,8 @@ def run():
411
427
env .assertEqual (values , [b'4' , b'9' , b'4' , b'9' ])
412
428
413
429
430
+ @skip_if_no_TF
414
431
def test_tensorflow_modelinfo (env ):
415
- if not TEST_TF :
416
- env .debugPrint ("skipping {} since TEST_TF=0" .format (sys ._getframe ().f_code .co_name ), force = True )
417
- return
418
-
419
432
con = env .getConnection ()
420
433
421
434
test_data_path = os .path .join (os .path .dirname (__file__ ), 'test_data' )
@@ -428,17 +441,20 @@ def test_tensorflow_modelinfo(env):
428
441
'INPUTS' , 'a' , 'b' , 'OUTPUTS' , 'mul' , model_pb )
429
442
env .assertEqual (ret , b'OK' )
430
443
431
- ret = con .execute_command ('AI.TENSORSET' , 'a' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
444
+ ret = con .execute_command (
445
+ 'AI.TENSORSET' , 'a' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
432
446
env .assertEqual (ret , b'OK' )
433
447
434
- ret = con .execute_command ('AI.TENSORSET' , 'b' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
448
+ ret = con .execute_command (
449
+ 'AI.TENSORSET' , 'b' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
435
450
env .assertEqual (ret , b'OK' )
436
451
437
452
ensureSlaveSynced (con , env )
438
453
439
454
previous_duration = 0
440
455
for call in range (1 , 10 ):
441
- ret = con .execute_command ('AI.MODELRUN' , 'm' , 'INPUTS' , 'a' , 'b' , 'OUTPUTS' , 'c' )
456
+ ret = con .execute_command (
457
+ 'AI.MODELRUN' , 'm' , 'INPUTS' , 'a' , 'b' , 'OUTPUTS' , 'c' )
442
458
env .assertEqual (ret , b'OK' )
443
459
ensureSlaveSynced (con , env )
444
460
@@ -466,11 +482,8 @@ def test_tensorflow_modelinfo(env):
466
482
env .assertEqual (info_dict_0 ['ERRORS' ], 0 )
467
483
468
484
485
+ @skip_if_no_TF
469
486
def test_tensorflow_modelrun_disconnect (env ):
470
- if not TEST_TF :
471
- env .debugPrint ("skipping {} since TEST_TF=0" .format (sys ._getframe ().f_code .co_name ), force = True )
472
- return
473
-
474
487
red = env .getConnection ()
475
488
476
489
test_data_path = os .path .join (os .path .dirname (__file__ ), 'test_data' )
@@ -483,13 +496,73 @@ def test_tensorflow_modelrun_disconnect(env):
483
496
'INPUTS' , 'a' , 'b' , 'OUTPUTS' , 'mul' , model_pb )
484
497
env .assertEqual (ret , b'OK' )
485
498
486
- ret = red .execute_command ('AI.TENSORSET' , 'a' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
499
+ ret = red .execute_command (
500
+ 'AI.TENSORSET' , 'a' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
487
501
env .assertEqual (ret , b'OK' )
488
502
489
- ret = red .execute_command ('AI.TENSORSET' , 'b' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
503
+ ret = red .execute_command (
504
+ 'AI.TENSORSET' , 'b' , 'FLOAT' , 2 , 2 , 'VALUES' , 2 , 3 , 2 , 3 )
490
505
env .assertEqual (ret , b'OK' )
491
506
492
507
ensureSlaveSynced (red , env )
493
508
494
- ret = send_and_disconnect (('AI.MODELRUN' , 'm' , 'INPUTS' , 'a' , 'b' , 'OUTPUTS' , 'c' ), red )
509
+ ret = send_and_disconnect (
510
+ ('AI.MODELRUN' , 'm' , 'INPUTS' , 'a' , 'b' , 'OUTPUTS' , 'c' ), red )
495
511
env .assertEqual (ret , None )
512
+
513
+
514
+ @skip_if_no_TF
515
+ def test_with_batch_and_minbatch (env ):
516
+ con = env .getConnection ()
517
+ batch_size = 2
518
+ minbatch_size = 2
519
+ model_name = 'model'
520
+ another_model_name = 'another_model'
521
+ inputvar = 'input'
522
+ outputvar = 'MobilenetV2/Predictions/Reshape_1'
523
+
524
+ model_pb , labels , img = load_mobilenet_test_data ()
525
+
526
+ con .execute_command ('AI.MODELSET' , model_name , 'TF' , DEVICE ,
527
+ 'BATCHSIZE' , batch_size , 'MINBATCHSIZE' , minbatch_size ,
528
+ 'INPUTS' , inputvar ,
529
+ 'OUTPUTS' , outputvar ,
530
+ model_pb )
531
+ con .execute_command ('AI.TENSORSET' , 'input' ,
532
+ 'FLOAT' , 1 , img .shape [1 ], img .shape [0 ], img .shape [2 ],
533
+ 'BLOB' , img .tobytes ())
534
+
535
+ def run (name = model_name , output_name = 'output' ):
536
+ con .execute_command ('AI.MODELRUN' , name ,
537
+ 'INPUTS' , 'input' , 'OUTPUTS' , output_name )
538
+
539
+ # Running thrice since minbatchsize = 2
540
+ t1 = threading .Thread (target = run )
541
+ t1 .start ()
542
+ t2 = threading .Thread (target = run )
543
+ t2 .start ()
544
+ t3 = threading .Thread (target = run )
545
+ t3 .start ()
546
+ t2 .join ()
547
+ # This is where the problem. If we set the model again (Note that the new
548
+ # MODELSET can set the model on a new key or on the same key), then the
549
+ # subsequent requests fails.
550
+ con .execute_command ('AI.MODELSET' , another_model_name , 'TF' , DEVICE ,
551
+ 'BATCHSIZE' , batch_size , 'MINBATCHSIZE' , minbatch_size ,
552
+ 'INPUTS' , inputvar ,
553
+ 'OUTPUTS' , outputvar ,
554
+ model_pb )
555
+
556
+ t1 = threading .Thread (target = run , args = (another_model_name , 'final1' ))
557
+ t1 .start ()
558
+ t2 = threading .Thread (target = run , args = (another_model_name , 'final2' ))
559
+ t2 .start ()
560
+ time .sleep (3 )
561
+ dtype , shape , data = con .execute_command ('AI.TENSORGET' , 'final1' , 'BLOB' )
562
+ dtype_map = {b'FLOAT' : np .float32 }
563
+ tensor = np .frombuffer (data , dtype = dtype_map [dtype ]).reshape (shape )
564
+ label_id = np .argmax (tensor ) - 1
565
+
566
+ _ , label = labels [str (label_id )]
567
+
568
+ env .assertEqual (label , 'giant_panda' )
0 commit comments