-import time
 import unittest

 from parameterized import parameterized
 from crate.client import connect
 import random
 from random import sample

-from crate.qa.tests import NodeProvider, insert_data, UpgradePath
+from crate.qa.tests import NodeProvider, insert_data, UpgradePath, assert_busy

-UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), 3,)
-UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), 3,)
+NUMBER_OF_NODES = 3
+UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), NUMBER_OF_NODES,)
+UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), NUMBER_OF_NODES,)


 class RecoveryTest(NodeProvider, unittest.TestCase):
-
     """
     In depth testing of the recovery mechanism during a rolling restart.
     Based on org.elasticsearch.upgrades.RecoveryIT.java
     """
+
     def _assert_num_docs_by_node_id(self, conn, schema, table_name, node_id, expected_count):
         c = conn.cursor()
         c.execute('''select num_docs from sys.shards where schema_name = ? and table_name = ? and node['id'] = ?''',
                   (schema, table_name, node_id))
         number_of_docs = c.fetchone()
-        self.assertEqual(number_of_docs[0], expected_count)
+        self.assertTrue(number_of_docs)
+        self.assertEqual(expected_count, number_of_docs[0])

     def _assert_is_green(self, conn, schema, table_name):
         c = conn.cursor()
@@ -79,8 +80,7 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
         # insert data into the initial homogeneous cluster
         insert_data(conn, 'doc', 'test', 10)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         # make sure that we can index while the replicas are recovering
         c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')

@@ -98,9 +98,9 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
         node_ids = c.fetchall()
         self.assertEqual(len(node_ids), nodes)

-        time.sleep(3)
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         for node_id in node_ids:
-            self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 60)
+            assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 60))

         c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
         # upgrade the full cluster
@@ -117,12 +117,13 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
         node_ids = c.fetchall()
         self.assertEqual(len(node_ids), nodes)

-        time.sleep(3)
         for node_id in node_ids:
-            self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105)
+            assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105))

     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
     def test_relocation_with_concurrent_indexing(self, name, path, nodes):
+        nodes = 3  # note: overrides the parameterized 'nodes' argument
+        path = UpgradePath('4.2.x', '4.3.x')  # note: overrides the parameterized 'path' argument
         cluster = self._new_cluster(path.from_version, nodes)
         cluster.start()

@@ -135,8 +136,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):

         insert_data(conn, 'doc', 'test', 10)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         # make sure that no shards are allocated, so we can make sure the primary stays
         # on the old node (when one node stops, we lose the master too, so a replica
         # will not be promoted)
@@ -157,18 +157,16 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
157157 "routing.allocation.include._id"=?
158158 )''' , (old_node_id , ))
159159
160- self ._assert_is_green (conn , 'doc' , 'test' )
160+ assert_busy ( lambda : self ._assert_is_green (conn , 'doc' , 'test' ) )
161161
162162 c .execute ('''alter table doc.test set ("routing.allocation.include._id"=?)''' , (new_node_id , ))
163163 insert_data (conn , 'doc' , 'test' , 50 )
164164
165165 # ensure the relocation from old node to new node has occurred; otherwise the table is green
166166 # even though shards haven't moved to the new node yet (allocation was throttled).
167- time .sleep (3 )
168- c .execute ('select current_state from sys.allocations where node_id =?' , (new_node_id ,))
169- current_state = c .fetchone ()[0 ]
170- self .assertEqual (current_state , 'STARTED' )
171- self ._assert_is_green (conn , 'doc' , 'test' )
167+ assert_busy (lambda : self ._assert_shard_state (conn , 'doc' , 'test' , new_node_id , 'STARTED' ))
168+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
169+
172170 c .execute ('refresh table doc.test' )
173171 self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , new_node_id , 60 )
174172
@@ -180,17 +178,23 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):

         insert_data(conn, 'doc', 'test', 45)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         c.execute('refresh table doc.test')
-        time.sleep(5)
         c.execute('select id from sys.nodes')
         node_ids = c.fetchall()
         self.assertEqual(len(node_ids), nodes)

         for node_id in node_ids:
             self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105)

+    def _assert_shard_state(self, conn, schema, table_name, node_id, state):
+        c = conn.cursor()
+        c.execute('select current_state from sys.allocations where node_id = ? and table_name = ? and table_schema = ?',
+                  (node_id, table_name, schema))
+        current_state = c.fetchone()
+        self.assertTrue(current_state)
+        self.assertEqual(current_state[0], state)
+
     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
     def test_recovery(self, name, path, nodes):
         """
@@ -218,16 +222,15 @@ def test_recovery(self, name, path, nodes):
         # upgrade to mixed cluster
         self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))

-        time.sleep(5)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         # upgrade fully to the new version
         self._upgrade_cluster(cluster, path.to_version, nodes)

         if random.choice([True, False]):
             c.execute("refresh table doc.test")

-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
     def test_recovery_closed_index(self, name, path, nodes):
@@ -246,8 +249,7 @@ def test_recovery_closed_index(self, name, path, nodes):
246249 "unassigned.node_left.delayed_timeout" = '100ms', "allocation.max_retries" = '0')
247250 ''' )
248251
249- time .sleep (3 )
250- self ._assert_is_green (conn , 'doc' , 'test' )
252+ assert_busy (lambda :self ._assert_is_green (conn , 'doc' , 'test' ))
251253
252254 c .execute ('alter table doc.test close' )
253255
@@ -291,7 +293,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             create table doc.mixed_cluster(x int) clustered into 1 shards with( number_of_replicas = 0)
         ''')

-        self._assert_is_green(conn, 'doc', 'mixed_cluster')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'mixed_cluster'))
         c.execute('alter table doc.mixed_cluster close')

         self._assert_is_closed(conn, 'doc', 'mixed_cluster')
@@ -306,7 +308,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             create table doc.upgraded_cluster(x int) clustered into 1 shards with( number_of_replicas = 0)
         ''')

-        self._assert_is_green(conn, 'doc', 'upgraded_cluster')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'upgraded_cluster'))
         c.execute('alter table doc.upgraded_cluster close')

         self._assert_is_closed(conn, 'doc', 'upgraded_cluster')
@@ -335,8 +337,7 @@ def test_update_docs(self, name, path, nodes):
         self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))

         if random.choice([True, False]):
-            time.sleep(5)
-            self._assert_is_green(conn, 'doc', 'test')
+            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         # update the data in a mixed cluster
         updates = [(i, str(random.randint)) for i in range(0, 100)]
@@ -382,8 +383,7 @@ def test_operation_based_recovery(self, name, path, nodes):
382383 "soft_deletes.enabled" = true)
383384 ''' )
384385
385- time .sleep (3 )
386- self ._assert_is_green (conn , 'doc' , 'test' )
386+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
387387
388388 insert_data (conn , 'doc' , 'test' , random .randint (100 , 200 ))
389389 c .execute ('refresh table doc.test' )
@@ -396,8 +396,7 @@ def test_operation_based_recovery(self, name, path, nodes):
         # upgrade to mixed cluster
         self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         num_docs = random.randint(0, 3)
         if num_docs > 0:
@@ -407,8 +406,7 @@ def test_operation_based_recovery(self, name, path, nodes):
         # upgrade fully to the new version
         self._upgrade_cluster(cluster, path.to_version, nodes)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

         num_docs = random.randint(0, 3)
         if num_docs > 0:
@@ -434,8 +432,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
434432 "soft_deletes.enabled" = true)
435433 ''' , (number_of_replicas , ))
436434
437- time .sleep (3 )
438- self ._assert_is_green (conn , 'doc' , 'test' )
435+ assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
439436
440437 insert_data (conn , 'doc' , 'test' , random .randint (100 , 200 ))
441438 c .execute ('refresh table doc.test' )
@@ -447,8 +444,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
         # update the cluster to the new version
         self._upgrade_cluster(cluster, path.to_version, nodes)

-        time.sleep(3)
-        self._assert_is_green(conn, 'doc', 'test')
+        assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
         c.execute('refresh table doc.test')
         self._assert_translog_is_empty(conn, 'doc', 'test')

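Throughout this change, fixed time.sleep(...) pauses are replaced with assert_busy(...), which keeps re-running an assertion until it passes or a timeout expires, so the tests wait only as long as the cluster actually needs. The real helper is imported from crate.qa.tests and is not shown in this diff; the following is only a minimal sketch of such a retry loop, and its signature, timeout and polling interval are assumptions for illustration:

import time
from typing import Callable


def assert_busy(assertion: Callable[[], None], timeout: float = 60.0, interval: float = 0.5) -> None:
    # Illustrative only: assumed shape of a busy-assert helper, not the
    # actual implementation in crate.qa.tests.
    deadline = time.monotonic() + timeout
    while True:
        try:
            assertion()   # e.g. lambda: self._assert_is_green(conn, 'doc', 'test')
            return        # the assertion passed, stop retrying
        except AssertionError:
            if time.monotonic() >= deadline:
                raise     # timeout reached, surface the last failure
            time.sleep(interval)  # back off briefly before retrying

Used as in the hunks above, e.g. assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test')), this turns a fixed three-second wait into a bounded poll that also fails faster when the condition never becomes true.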