@@ -1,18 +1,35 @@
+import time
 import unittest
 
 from parameterized import parameterized
 from crate.client import connect
 import random
 from random import sample
 
-from crate.qa.tests import NodeProvider, insert_data, UpgradePath, assert_busy
+from crate.qa.tests import NodeProvider, insert_data, UpgradePath
 
-NUMBER_OF_NODES = random.randint(3, 5)
-UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), NUMBER_OF_NODES,)
-UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), NUMBER_OF_NODES,)
+UPGRADE_PATHS = [(UpgradePath('4.2.x', '4.3.x'),), (UpgradePath('4.3.x', 'latest-nightly'),)]
+UPGRADE_PATHS_FROM_43 = [(UpgradePath('4.3.x', 'latest-nightly'),)]
+
+
+def assert_busy(assertion, timeout=60, f=2.0):
+    waited = 0
+    duration = 0.1
+    assertion_error = None
+    while waited < timeout:
+        try:
+            assertion()
+            return
+        except AssertionError as e:
+            assertion_error = e
+            time.sleep(duration)
+            waited += duration
+            duration *= f
+    raise assertion_error
 
 
 class RecoveryTest(NodeProvider, unittest.TestCase):
+    NUMBER_OF_NODES = 3
     """
     In depth testing of the recovery mechanism during a rolling restart.
     Based on org.elasticsearch.upgrades.RecoveryIT.java
@@ -61,13 +78,13 @@ def _upgrade_cluster(self, cluster, version, nodes):
             new_node = self.upgrade_node(node, version)
             cluster[i] = new_node
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_recovery_with_concurrent_indexing(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_recovery_with_concurrent_indexing(self, path):
         """
         This test creates a new table and insert data at every stage of the
         rolling upgrade.
         """
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -85,7 +102,7 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
             c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
             c.execute('''alter table doc.test set ("routing.allocation.enable"='all')''')
             # insert data into a mixed cluster
             insert_data(conn, 'doc', 'test', 50)
@@ -96,15 +113,15 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
             # check counts for each node individually
             c.execute('select id from sys.nodes')
             node_ids = c.fetchall()
-            self.assertEqual(len(node_ids), nodes)
+            self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
             for node_id in node_ids:
                 assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 60))
 
             c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
             # upgrade the full cluster
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
             c.execute('''alter table doc.test set ("routing.allocation.enable"='all')''')
 
             insert_data(conn, 'doc', 'test', 45)
@@ -115,14 +132,14 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
 
             c.execute('select id from sys.nodes')
             node_ids = c.fetchall()
-            self.assertEqual(len(node_ids), nodes)
+            self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)
 
             for node_id in node_ids:
                 assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105))
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_relocation_with_concurrent_indexing(self, name, path, nodes):
-        cluster = self._new_cluster(path.from_version, nodes)
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_relocation_with_concurrent_indexing(self, path):
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -141,7 +158,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
             c.execute('''alter table doc.test set("routing.allocation.enable"='none')''')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             c.execute('''select id from sys.nodes order by version['number'] desc limit 1''')
             new_node_id = c.fetchone()[0]
@@ -169,7 +186,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
             self._assert_num_docs_by_node_id(conn, 'doc', 'test', new_node_id, 60)
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             c.execute('''alter table doc.test set("number_of_replicas"=2)''')
             c.execute('''alter table doc.test reset("routing.allocation.include._id")''')
@@ -180,7 +197,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
             c.execute('refresh table doc.test')
             c.execute('select id from sys.nodes')
             node_ids = c.fetchall()
-            self.assertEqual(len(node_ids), nodes)
+            self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)
 
             for node_id in node_ids:
                 self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105)
@@ -193,14 +210,14 @@ def _assert_shard_state(self, conn, schema, table_name, node_id, state):
         self.assertTrue(current_state)
         self.assertEqual(current_state[0], state)
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_recovery(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_recovery(self, path):
         """
         This test creates a new table, insert data and asserts the state at every stage of the
         rolling upgrade.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -218,26 +235,26 @@ def test_recovery(self, name, path, nodes):
                 c.execute("refresh table doc.test")
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             if random.choice([True, False]):
                 c.execute("refresh table doc.test")
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_recovery_closed_index(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_recovery_closed_index(self, path):
         """
         This test creates a table in the non upgraded cluster and closes it. It then
         checks that the table is effectively closed and potentially replicated.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -252,24 +269,24 @@ def test_recovery_closed_index(self, name, path, nodes):
             c.execute('alter table doc.test close')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             self._assert_is_closed(conn, 'doc', 'test')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             self._assert_is_closed(conn, 'doc', 'test')
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_closed_index_during_rolling_upgrade(self, path):
         """
         This test creates and closes a new table at every stage of the rolling
         upgrade. It then checks that the table is effectively closed and
         replicated.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -283,7 +300,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             self._assert_is_closed(conn, 'doc', 'old_cluster')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             self._assert_is_closed(conn, 'doc', 'old_cluster')
 
@@ -297,7 +314,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             self._assert_is_closed(conn, 'doc', 'mixed_cluster')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             self._assert_is_closed(conn, 'doc', 'old_cluster')
             self._assert_is_closed(conn, 'doc', 'mixed_cluster')
@@ -311,13 +328,13 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
 
             self._assert_is_closed(conn, 'doc', 'upgraded_cluster')
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_update_docs(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_update_docs(self, path):
         """
         This test creates a new table, insert data and updates data at every state at every stage of the
         rolling upgrade.
         """
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
         with connect(cluster.node().http_url, error_trace=True) as conn:
             c = conn.cursor()
@@ -332,7 +349,7 @@ def test_update_docs(self, name, path, nodes):
                 c.execute('refresh table doc.test')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             if random.choice([True, False]):
                 assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
@@ -351,7 +368,7 @@ def test_update_docs(self, name, path, nodes):
                 c.execute('refresh table doc.test')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             updates = [(i, str(random.randint)) for i in range(0, 100)]
             res = c.executemany(
@@ -361,8 +378,8 @@ def test_update_docs(self, name, path, nodes):
             for result in res:
                 self.assertEqual(result['rowcount'], 1)
 
-    @parameterized.expand([UPGRADE_43_TO_LATEST])
-    def test_operation_based_recovery(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS_FROM_43)
+    def test_operation_based_recovery(self, path):
         """
         Tests that we should perform an operation-based recovery if there were
         some but not too many uncommitted documents (i.e., less than 10% of
@@ -371,7 +388,7 @@ def test_operation_based_recovery(self, name, path, nodes):
         based peer recoveries.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -392,7 +409,7 @@ def test_operation_based_recovery(self, name, path, nodes):
                 insert_data(conn, 'doc', 'test', num_docs)
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
@@ -402,7 +419,7 @@ def test_operation_based_recovery(self, name, path, nodes):
             self._assert_ensure_checkpoints_are_synced(conn, 'doc', 'test')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
@@ -412,14 +429,14 @@ def test_operation_based_recovery(self, name, path, nodes):
 
             self._assert_ensure_checkpoints_are_synced(conn, 'doc', 'test')
 
-    @parameterized.expand([UPGRADE_43_TO_LATEST])
-    def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS_FROM_43)
+    def test_turnoff_translog_retention_after_upgraded(self, path):
         """
         Verifies that once all shard copies on the new version, we should turn
         off the translog retention for indices with soft-deletes.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -440,7 +457,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
                 insert_data(conn, 'doc', 'test', num_docs)
 
             # update the cluster to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
             c.execute('refresh table doc.test')
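
Note: a minimal usage sketch of the assert_busy helper and the parameterized upgrade paths introduced above. The wait_until_green function, its SQL, and the http_url argument below are hypothetical stand-ins for illustration only; they are not part of this commit.

# Assumes the module above is importable (assert_busy, UPGRADE_PATHS). assert_busy
# retries the callable with sleeps of 0.1s, 0.2s, 0.4s, ... (factor f) and re-raises
# the last AssertionError once roughly `timeout` seconds of waiting have accumulated.
from crate.client import connect


def wait_until_green(http_url, schema, table):
    with connect(http_url) as conn:
        c = conn.cursor()

        def check():
            c.execute(
                'select health from sys.health where table_schema = ? and table_name = ?',
                (schema, table))
            row = c.fetchone()
            assert row is not None and row[0] == 'GREEN'

        assert_busy(check, timeout=60, f=2.0)


# Each UPGRADE_PATHS entry is a one-element tuple, so @parameterized.expand(UPGRADE_PATHS)
# generates one test method per UpgradePath and passes it as the single `path` argument.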