-import time
 import unittest

 from parameterized import parameterized
 from crate.client import connect
 import random
 from random import sample

-from crate.qa.tests import NodeProvider, insert_data, UpgradePath
+from crate.qa.tests import NodeProvider, insert_data, UpgradePath, assert_busy

-UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), 3,)
-UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), 3,)
+NUMBER_OF_NODES = 3
+UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), NUMBER_OF_NODES,)
+UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), NUMBER_OF_NODES,)


 class RecoveryTest(NodeProvider, unittest.TestCase):
-
     """
     In depth testing of the recovery mechanism during a rolling restart.
     Based on org.elasticsearch.upgrades.RecoveryIT.java
     """
+
     def _assert_num_docs_by_node_id(self, conn, schema, table_name, node_id, expected_count):
         c = conn.cursor()
         c.execute('''select num_docs from sys.shards where schema_name = ? and table_name = ? and node['id'] = ?''',
                   (schema, table_name, node_id))
         number_of_docs = c.fetchone()
-        self.assertEqual(number_of_docs[0], expected_count)
+        self.assertTrue(number_of_docs)
+        self.assertEqual(expected_count, number_of_docs[0])

     def _assert_is_green(self, conn, schema, table_name):
         c = conn.cursor()
@@ -79,8 +80,7 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
         # insert data into the initial homogeneous cluster
         insert_data(conn, 'doc', 'test', 10)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         # make sure that we can index while the replicas are recovering
         c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')

@@ -98,9 +98,9 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
         node_ids = c.fetchall()
         self.assertEqual(len(node_ids), nodes)

-        time.sleep(3)
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         for node_id in node_ids:
-            self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 60)
+            assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 60))

         c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
         # upgrade the full cluster
@@ -117,12 +117,12 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
         node_ids = c.fetchall()
         self.assertEqual(len(node_ids), nodes)

-        time.sleep(3)
         for node_id in node_ids:
-            self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105)
+            assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105))

     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
     def test_relocation_with_concurrent_indexing(self, name, path, nodes):
+        path = UpgradePath('4.2.x', '4.3.x')
         cluster = self._new_cluster(path.from_version, nodes)
         cluster.start()

@@ -135,8 +135,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):

         insert_data(conn, 'doc', 'test', 10)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         # make sure that no shards are allocated, so we can make sure the primary stays
         # on the old node (when one node stops, we lose the master too, so a replica
         # will not be promoted)
@@ -157,18 +156,16 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
             "routing.allocation.include._id"=?
         )''', (old_node_id, ))

-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         c.execute('''alter table doc.test set ("routing.allocation.include._id"=?)''', (new_node_id, ))
         insert_data(conn, 'doc', 'test', 50)

         # ensure the relocation from old node to new node has occurred; otherwise the table is green
         # even though shards haven't moved to the new node yet (allocation was throttled).
-        time.sleep(3)
-        c.execute('select current_state from sys.allocations where node_id =?', (new_node_id,))
-        current_state = c.fetchone()[0]
-        self.assertEqual(current_state, 'STARTED')
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_shard_state(conn, 'doc', 'test', new_node_id, 'STARTED'))
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
+
         c.execute('refresh table doc.test')
         self._assert_num_docs_by_node_id(conn, 'doc', 'test', new_node_id, 60)

@@ -180,17 +177,23 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):

         insert_data(conn, 'doc', 'test', 45)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         c.execute('refresh table doc.test')
-        time.sleep(5)
         c.execute('select id from sys.nodes')
         node_ids = c.fetchall()
         self.assertEqual(len(node_ids), nodes)

         for node_id in node_ids:
             self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105)

+    def _assert_shard_state(self, conn, schema, table_name, node_id, state):
+        c = conn.cursor()
+        c.execute('select current_state from sys.allocations where node_id =? and table_name = ? and table_schema = ?',
+                  (node_id, table_name, schema))
+        current_state = c.fetchone()
+        self.assertTrue(current_state)
+        self.assertEqual(current_state[0], state)
+
     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
     def test_recovery(self, name, path, nodes):
         """
@@ -218,16 +221,15 @@ def test_recovery(self, name, path, nodes):
         # upgrade to mixed cluster
         self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))

-        time.sleep(5)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         # upgrade fully to the new version
         self._upgrade_cluster(cluster, path.to_version, nodes)

         if random.choice([True, False]):
             c.execute("refresh table doc.test")

-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
     def test_recovery_closed_index(self, name, path, nodes):
@@ -246,8 +248,7 @@ def test_recovery_closed_index(self, name, path, nodes):
             "unassigned.node_left.delayed_timeout" = '100ms', "allocation.max_retries" = '0')
         ''')

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         c.execute('alter table doc.test close')

@@ -291,7 +292,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             create table doc.mixed_cluster(x int) clustered into 1 shards with( number_of_replicas = 0)
         ''')

-        self._assert_is_green(conn, 'doc', 'mixed_cluster')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'mixed_cluster'))
         c.execute('alter table doc.mixed_cluster close')

         self._assert_is_closed(conn, 'doc', 'mixed_cluster')
@@ -306,7 +307,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             create table doc.upgraded_cluster(x int) clustered into 1 shards with( number_of_replicas = 0)
         ''')

-        self._assert_is_green(conn, 'doc', 'upgraded_cluster')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'upgraded_cluster'))
         c.execute('alter table doc.upgraded_cluster close')

         self._assert_is_closed(conn, 'doc', 'upgraded_cluster')
@@ -335,8 +336,7 @@ def test_update_docs(self, name, path, nodes):
         self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))

         if random.choice([True, False]):
-            time.sleep(5)
-            self._assert_is_green(conn, 'doc', 'test')
+            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         # update the data in a mixed cluster
         updates = [(i, str(random.randint)) for i in range(0, 100)]
@@ -382,8 +382,7 @@ def test_operation_based_recovery(self, name, path, nodes):
             "soft_deletes.enabled" = true)
         ''')

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         insert_data(conn, 'doc', 'test', random.randint(100, 200))
         c.execute('refresh table doc.test')
@@ -396,8 +395,7 @@ def test_operation_based_recovery(self, name, path, nodes):
         # upgrade to mixed cluster
         self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         num_docs = random.randint(0, 3)
         if num_docs > 0:
@@ -407,8 +405,7 @@ def test_operation_based_recovery(self, name, path, nodes):
         # upgrade fully to the new version
         self._upgrade_cluster(cluster, path.to_version, nodes)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         num_docs = random.randint(0, 3)
         if num_docs > 0:
@@ -434,8 +431,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
             "soft_deletes.enabled" = true)
         ''', (number_of_replicas, ))

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         insert_data(conn, 'doc', 'test', random.randint(100, 200))
         c.execute('refresh table doc.test')
@@ -447,8 +443,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
         # update the cluster to the new version
         self._upgrade_cluster(cluster, path.to_version, nodes)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         c.execute('refresh table doc.test')
         self._assert_translog_is_empty(conn, 'doc', 'test')

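Note: the commit replaces fixed time.sleep() pauses with assert_busy(...), which re-runs an assertion until it passes or a timeout expires, so the tests wait only as long as recovery actually takes. Wrapping each check in a lambda defers its evaluation, letting the helper call it again after every failed attempt. assert_busy comes from crate.qa.tests and its implementation is not part of this diff; the sketch below only illustrates the retry-with-backoff pattern such a helper typically uses, with an assumed signature and defaults.

# Sketch only: an assert_busy-style helper that retries an assertion with
# exponential backoff until it succeeds or the timeout is exhausted.
# (Assumed shape; the real crate.qa.tests.assert_busy may differ.)
import time


def assert_busy(assertion, timeout=60, f=2.0):
    waited = 0.0
    duration = 0.1
    error = None
    while waited < timeout:
        try:
            assertion()   # e.g. lambda: self._assert_is_green(conn, 'doc', 'test')
            return
        except AssertionError as e:
            error = e
            time.sleep(duration)
            waited += duration
            duration *= f  # back off between retries
    raise error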