1- import time
21import unittest
32
43from parameterized import parameterized
54from crate .client import connect
65import random
76from random import sample
87
9- from crate .qa .tests import NodeProvider , insert_data , UpgradePath
8+ from crate .qa .tests import NodeProvider , insert_data , UpgradePath , assert_busy
109
# Cluster size shared by every upgrade path below. It is drawn once, at import
# time, from the inclusive range 3..5, so each test run uses one random size.
NUMBER_OF_NODES = random.randint(3, 5)

# Each tuple is (test name, upgrade path, number of nodes); the tuples are
# passed to ``@parameterized.expand`` and unpacked as ``(name, path, nodes)``.
UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), NUMBER_OF_NODES,)
UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), NUMBER_OF_NODES,)
1313
1414
1515class RecoveryTest (NodeProvider , unittest .TestCase ):
16-
1716 """
1817 In depth testing of the recovery mechanism during a rolling restart.
1918 Based on org.elasticsearch.upgrades.RecoveryIT.java
2019 """
20+
def _assert_num_docs_by_node_id(self, conn, schema, table_name, node_id, expected_count):
    """Assert the document count reported by ``sys.shards`` for a table on one node.

    Queries ``sys.shards`` for ``num_docs`` filtered by schema, table name and
    node id, then checks the first returned row against ``expected_count``.
    Only the first matching row is inspected (``fetchone``).

    :param conn: open database connection providing ``cursor()``
    :param schema: schema name of the table
    :param table_name: name of the table
    :param node_id: id of the node whose shard is inspected
    :param expected_count: number of documents the shard must report
    :raises AssertionError: if no shard row is found or the count differs
    """
    c = conn.cursor()
    c.execute(
        '''select num_docs from sys.shards where schema_name = ? and table_name = ? and node['id'] = ?''',
        (schema, table_name, node_id))
    number_of_docs = c.fetchone()
    # Fail with an assertion (not a TypeError on ``None[0]``) when the node has
    # no shard row yet. NOTE(review): callers wrap this helper in assert_busy,
    # which presumably retries on AssertionError -- confirm against its definition.
    self.assertTrue(
        number_of_docs,
        'no shard found for {}.{} on node {}'.format(schema, table_name, node_id))
    self.assertEqual(
        expected_count,
        number_of_docs[0],
        'unexpected num_docs for {}.{} on node {}'.format(schema, table_name, node_id))
2728
2829 def _assert_is_green (self , conn , schema , table_name ):
2930 c = conn .cursor ()
@@ -79,8 +80,7 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
7980 # insert data into the initial homogeneous cluster
8081 insert_data (conn , 'doc' , 'test' , 10 )
8182
82- time .sleep (3 )
83- self ._assert_is_green (conn , 'doc' , 'test' )
83+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
8484 # make sure that we can index while the replicas are recovering
8585 c .execute ('''alter table doc.test set ("routing.allocation.enable"='primaries')''' )
8686
@@ -98,9 +98,9 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
9898 node_ids = c .fetchall ()
9999 self .assertEqual (len (node_ids ), nodes )
100100
101- time . sleep ( 3 )
101+ assert_busy ( lambda : self . _assert_is_green ( conn , 'doc' , 'test' ) )
102102 for node_id in node_ids :
103- self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 60 )
103+ assert_busy ( lambda : self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 60 ) )
104104
105105 c .execute ('''alter table doc.test set ("routing.allocation.enable"='primaries')''' )
106106 # upgrade the full cluster
@@ -117,9 +117,8 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
117117 node_ids = c .fetchall ()
118118 self .assertEqual (len (node_ids ), nodes )
119119
120- time .sleep (3 )
121120 for node_id in node_ids :
122- self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 105 )
121+ assert_busy ( lambda : self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 105 ) )
123122
124123 @parameterized .expand ([UPGRADE_42_TO_43 , UPGRADE_43_TO_LATEST ])
125124 def test_relocation_with_concurrent_indexing (self , name , path , nodes ):
@@ -135,8 +134,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
135134
136135 insert_data (conn , 'doc' , 'test' , 10 )
137136
138- time .sleep (3 )
139- self ._assert_is_green (conn , 'doc' , 'test' )
137+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
140138 # make sure that no shards are allocated, so we can make sure the primary stays
141139 # on the old node (when one node stops, we lose the master too, so a replica
142140 # will not be promoted)
@@ -157,18 +155,16 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
157155 "routing.allocation.include._id"=?
158156 )''' , (old_node_id , ))
159157
160- self ._assert_is_green (conn , 'doc' , 'test' )
158+ assert_busy ( lambda : self ._assert_is_green (conn , 'doc' , 'test' ) )
161159
162160 c .execute ('''alter table doc.test set ("routing.allocation.include._id"=?)''' , (new_node_id , ))
163161 insert_data (conn , 'doc' , 'test' , 50 )
164162
165163 # ensure the relocation from old node to new node has occurred; otherwise the table is green
166164 # even though shards haven't moved to the new node yet (allocation was throttled).
167- time .sleep (3 )
168- c .execute ('select current_state from sys.allocations where node_id =?' , (new_node_id ,))
169- current_state = c .fetchone ()[0 ]
170- self .assertEqual (current_state , 'STARTED' )
171- self ._assert_is_green (conn , 'doc' , 'test' )
165+ assert_busy (lambda : self ._assert_shard_state (conn , 'doc' , 'test' , new_node_id , 'STARTED' ))
166+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
167+
172168 c .execute ('refresh table doc.test' )
173169 self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , new_node_id , 60 )
174170
@@ -180,17 +176,23 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
180176
181177 insert_data (conn , 'doc' , 'test' , 45 )
182178
183- time .sleep (3 )
184- self ._assert_is_green (conn , 'doc' , 'test' )
179+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
185180 c .execute ('refresh table doc.test' )
186- time .sleep (5 )
187181 c .execute ('select id from sys.nodes' )
188182 node_ids = c .fetchall ()
189183 self .assertEqual (len (node_ids ), nodes )
190184
191185 for node_id in node_ids :
192186 self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 105 )
193187
def _assert_shard_state(self, conn, schema, table_name, node_id, state):
    """Assert the allocation state reported by ``sys.allocations`` for a table on one node.

    Queries ``sys.allocations`` for ``current_state`` filtered by node id,
    table name and table schema, then checks the first returned row against
    ``state``. Only the first matching row is inspected (``fetchone``).

    :param conn: open database connection providing ``cursor()``
    :param schema: schema name of the table
    :param table_name: name of the table
    :param node_id: id of the node the allocation must be on
    :param state: expected value of ``current_state`` (e.g. ``'STARTED'``)
    :raises AssertionError: if no allocation row is found or the state differs
    """
    c = conn.cursor()
    c.execute(
        'select current_state from sys.allocations where node_id =? and table_name = ? and table_schema = ?',
        (node_id, table_name, schema))
    current_state = c.fetchone()
    # Fail with an assertion (not a TypeError on ``None[0]``) when the node has
    # no allocation row for the table yet.
    self.assertTrue(
        current_state,
        'no allocation found for {}.{} on node {}'.format(schema, table_name, node_id))
    # Expected value first, matching _assert_num_docs_by_node_id.
    self.assertEqual(
        state,
        current_state[0],
        'unexpected allocation state for {}.{} on node {}'.format(schema, table_name, node_id))
195+
194196 @parameterized .expand ([UPGRADE_42_TO_43 , UPGRADE_43_TO_LATEST ])
195197 def test_recovery (self , name , path , nodes ):
196198 """
@@ -218,16 +220,15 @@ def test_recovery(self, name, path, nodes):
218220 # upgrade to mixed cluster
219221 self ._upgrade_cluster (cluster , path .to_version , random .randint (1 , nodes - 1 ))
220222
221- time .sleep (5 )
222- self ._assert_is_green (conn , 'doc' , 'test' )
223+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
223224
224225 # upgrade fully to the new version
225226 self ._upgrade_cluster (cluster , path .to_version , nodes )
226227
227228 if random .choice ([True , False ]):
228229 c .execute ("refresh table doc.test" )
229230
230- self ._assert_is_green (conn , 'doc' , 'test' )
231+ assert_busy ( lambda : self ._assert_is_green (conn , 'doc' , 'test' ) )
231232
232233 @parameterized .expand ([UPGRADE_42_TO_43 , UPGRADE_43_TO_LATEST ])
233234 def test_recovery_closed_index (self , name , path , nodes ):
@@ -246,8 +247,7 @@ def test_recovery_closed_index(self, name, path, nodes):
246247 "unassigned.node_left.delayed_timeout" = '100ms', "allocation.max_retries" = '0')
247248 ''' )
248249
249- time .sleep (3 )
250- self ._assert_is_green (conn , 'doc' , 'test' )
250+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
251251
252252 c .execute ('alter table doc.test close' )
253253
@@ -291,7 +291,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
291291 create table doc.mixed_cluster(x int) clustered into 1 shards with( number_of_replicas = 0)
292292 ''' )
293293
294- self ._assert_is_green (conn , 'doc' , 'mixed_cluster' )
294+ assert_busy ( lambda : self ._assert_is_green (conn , 'doc' , 'mixed_cluster' ) )
295295 c .execute ('alter table doc.mixed_cluster close' )
296296
297297 self ._assert_is_closed (conn , 'doc' , 'mixed_cluster' )
@@ -306,7 +306,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
306306 create table doc.upgraded_cluster(x int) clustered into 1 shards with( number_of_replicas = 0)
307307 ''' )
308308
309- self ._assert_is_green (conn , 'doc' , 'upgraded_cluster' )
309+ assert_busy ( lambda : self ._assert_is_green (conn , 'doc' , 'upgraded_cluster' ) )
310310 c .execute ('alter table doc.upgraded_cluster close' )
311311
312312 self ._assert_is_closed (conn , 'doc' , 'upgraded_cluster' )
@@ -335,8 +335,7 @@ def test_update_docs(self, name, path, nodes):
335335 self ._upgrade_cluster (cluster , path .to_version , random .randint (1 , nodes - 1 ))
336336
337337 if random .choice ([True , False ]):
338- time .sleep (5 )
339- self ._assert_is_green (conn , 'doc' , 'test' )
338+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
340339
341340 # update the data in a mixed cluster
342341 updates = [(i , str (random .randint )) for i in range (0 , 100 )]
@@ -382,8 +381,7 @@ def test_operation_based_recovery(self, name, path, nodes):
382381 "soft_deletes.enabled" = true)
383382 ''' )
384383
385- time .sleep (3 )
386- self ._assert_is_green (conn , 'doc' , 'test' )
384+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
387385
388386 insert_data (conn , 'doc' , 'test' , random .randint (100 , 200 ))
389387 c .execute ('refresh table doc.test' )
@@ -396,8 +394,7 @@ def test_operation_based_recovery(self, name, path, nodes):
396394 # upgrade to mixed cluster
397395 self ._upgrade_cluster (cluster , path .to_version , random .randint (1 , nodes - 1 ))
398396
399- time .sleep (3 )
400- self ._assert_is_green (conn , 'doc' , 'test' )
397+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
401398
402399 num_docs = random .randint (0 , 3 )
403400 if num_docs > 0 :
@@ -407,8 +404,7 @@ def test_operation_based_recovery(self, name, path, nodes):
407404 # upgrade fully to the new version
408405 self ._upgrade_cluster (cluster , path .to_version , nodes )
409406
410- time .sleep (3 )
411- self ._assert_is_green (conn , 'doc' , 'test' )
407+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
412408
413409 num_docs = random .randint (0 , 3 )
414410 if num_docs > 0 :
@@ -434,8 +430,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
434430 "soft_deletes.enabled" = true)
435431 ''' , (number_of_replicas , ))
436432
437- time .sleep (3 )
438- self ._assert_is_green (conn , 'doc' , 'test' )
433+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
439434
440435 insert_data (conn , 'doc' , 'test' , random .randint (100 , 200 ))
441436 c .execute ('refresh table doc.test' )
@@ -447,8 +442,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
447442 # update the cluster to the new version
448443 self ._upgrade_cluster (cluster , path .to_version , nodes )
449444
450- time .sleep (3 )
451- self ._assert_is_green (conn , 'doc' , 'test' )
445+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
452446 c .execute ('refresh table doc.test' )
453447 self ._assert_translog_is_empty (conn , 'doc' , 'test' )
454448
0 commit comments