66from random import sample
77
88from crate .qa .tests import NodeProvider , insert_data
9- from bwc .test_rolling_upgrade import UpgradePath
9+ from tests . bwc .test_rolling_upgrade import UpgradePath
1010
1111
1212class RecoveryTest (NodeProvider , unittest .TestCase ):
@@ -15,24 +15,26 @@ def test(self):
1515 self ._run_tests (
1616 [
1717 UpgradePath ('4.2.x' , '4.3.x' ),
18- UpgradePath ('4.3.0' , 'latest-nightly' ),
18+ # UpgradePath('4.3.0', 'latest-nightly'),
1919 ],
2020 [
21- self ._test_recovery_with_concurrent_indexing ,
22- self ._test_recovery ,
23- self ._test_update_docs ,
24- self ._test_recovery_closed_index ,
25- self ._test_closed_index_during_rolling_upgrade ,
26- self ._test_auto_expand_indices_during_rolling_upgrade ,
27- self ._test_retention_leases_established_when_promoting_primary ,
28- self ._test_closed_index_noop_recovery ,
29- self ._test_relocation_with_concurrent_indexing
21+ self ._test_recovery_with_concurrent_indexing ,
22+ # self._test_recovery,
23+ # self._test_update_docs,
24+ # self._test_recovery_closed_index,
25+ # self._test_closed_index_during_rolling_upgrade,
26+ # self._test_auto_expand_indices_during_rolling_upgrade,
27+ # self._test_retention_leases_established_when_promoting_primary,
28+ # self._test_closed_index_noop_recovery,
29+ # self._test_relocation_with_concurrent_indexing
3030 ]
3131 )
3232
33- def test_from_43 (self ):
33+ def _test_from_4_3 (self ):
3434 self ._run_tests (
35- [UpgradePath ('4.3.0' , 'latest-nightly' )],
35+ [
36+ UpgradePath ('4.3.0' , 'latest-nightly' )
37+ ],
3638 [
3739 self ._test_turnoff_translog_retention_after_upgraded ,
3840 self ._test_operation_based_recovery
@@ -50,6 +52,17 @@ def _run_tests(self, paths, tests):
5052 finally :
5153 self .tearDown ()
5254
def _assert_count_by_node_id(self, conn, schema, table_name, node_id, expected_count):
    """
    Assert that the shard of ``schema.table_name`` located on the node with
    id ``node_id`` reports ``expected_count`` documents.

    ``num_docs`` is read from ``sys.shards`` and only the first matching row
    is compared. The test fails with an explicit message when no shard of
    the table is found on the node.
    """
    c = conn.cursor()
    c.execute('''
        select num_docs from sys.shards
        where schema_name = ?
        and table_name = ?
        and node['id'] = ?
    ''', (schema, table_name, node_id))
    row = c.fetchone()
    # fetchone() yields None when the query matches no shard; report that as
    # a readable assertion failure instead of a TypeError on row[0]
    self.assertIsNotNone(
        row,
        'no shard of %s.%s found on node %s' % (schema, table_name, node_id))
    self.assertEqual(row[0], expected_count)
65+
5366 def _assert_is_green (self , conn , schema , table_name ):
5467 c = conn .cursor ()
5568 c .execute ('select health from sys.health where table_name=? and table_schema=?' , (table_name , schema ))
@@ -83,14 +96,15 @@ def _assert_checkpoints(self, conn, table_name):
def _upgrade_cluster(self, cluster, version, number_of_nodes_to_upgrade):
    """
    Upgrade up to ``number_of_nodes_to_upgrade`` randomly chosen nodes of
    ``cluster`` to ``version``.

    Only nodes still running the lowest version present in the cluster are
    candidates, so the cluster is assumed to contain at most two different
    versions.
    """
    assert number_of_nodes_to_upgrade <= len(cluster._nodes)
    # Nodes on the lowest version are the ones not upgraded yet; restricting
    # the choice to them keeps a mixed cluster from upgrading a node twice.
    oldest_version = min([member.version for member in cluster._nodes])
    candidates = [
        (position, member)
        for position, member in enumerate(cluster)
        if member.version == oldest_version
    ]
    # Fewer candidates than requested may remain when part of the cluster is
    # already on the new version, so cap the sample size.
    how_many = min(number_of_nodes_to_upgrade, len(candidates))
    for position, member in sample(candidates, how_many):
        cluster[position] = self.upgrade_node(member, version)
96110
@@ -117,27 +131,46 @@ def _test_recovery_with_concurrent_indexing(self, path, nodes):
117131
118132 time .sleep (3 )
119133 self ._assert_is_green (conn , 'doc' , 'test' )
134+ # make sure that we can index while the replicas are recovering
135+ c .execute ('''alter table doc.test set ("routing.allocation.enable"='primaries')''' )
120136
121137 # upgrade randomly some nodes to a different version to build a mixed cluster
122138 self ._upgrade_cluster (cluster , path .to_version , random .randint (1 , nodes - 1 ))
123-
139+ c . execute ( '''alter table doc.test set ("routing.allocation.enable"='all')''' )
124140 # insert data into a mixed cluster
125141 insert_data (conn , 'doc' , 'test' , 50 )
126142 c .execute ('refresh table doc.test' )
127143 # verify the total document count after indexing into the mixed cluster
128- c .execute ('''alter table doc.test set ("routing.allocation.enable"='primaries')''' )
129144 c .execute ('select count(*) from doc.test' )
130145 self .assertEqual (c .fetchone ()[0 ], 60 )
146+ # check counts for each node individually
147+ c .execute ('select id from sys.nodes' )
148+ node_ids = c .fetchall ()
149+ self .assertTrue (node_ids )
150+
151+ time .sleep (5 )
131152
153+ for node_id in node_ids :
154+ self ._assert_count_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 60 )
155+
156+ c .execute ('''alter table doc.test set ("routing.allocation.enable"='primaries')''' )
132157 # upgrade the full cluster
133158 self ._upgrade_cluster (cluster , path .to_version , nodes )
159+ c .execute ('''alter table doc.test set ("routing.allocation.enable"='all')''' )
134160
135161 insert_data (conn , 'doc' , 'test' , 45 )
136162 c .execute ('refresh table doc.test' )
137163 c .execute ('select count(*) from doc.test' )
138164 res = c .fetchone ()
139165 self .assertEqual (res [0 ], 105 )
140166
167+ c .execute ('select id from sys.nodes' )
168+ node_ids = c .fetchall ()
169+
170+ time .sleep (5 )
171+ for node_id in node_ids :
172+ self ._assert_count_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 105 )
173+
141174 def _test_relocation_with_concurrent_indexing (self , path , nodes ):
142175
143176 cluster = self ._new_cluster (path .from_version , nodes )
@@ -165,17 +198,22 @@ def _test_relocation_with_concurrent_indexing(self, path, nodes):
165198 # upgrade randomly some nodes to a different version to build a mixed cluster
166199 self ._upgrade_cluster (cluster , path .to_version , random .randint (1 , nodes - 1 ))
167200
201+ c .execute ('''select id from sys.nodes order by version['number'] desc limit 1''' )
202+ new_node_id = c .fetchone ()[0 ]
203+ c .execute ('''select id from sys.nodes order by version['number'] asc limit 1''' )
204+ old_node_id = c .fetchone ()[0 ]
205+
168206 # remove the replica and guarantee the primary is placed on the old node
169- c .execute ('''select id from sys.nodes''' )
170- primary = str (c .fetchone ()[0 ])
171207 c .execute ('''alter table doc.test set (
172208 "number_of_replicas"=0,
173209 "routing.allocation.enable"='NONE',
174- "routing.allocation.include._id"=?,
175- "routing.allocation.include._tier"='NONE'
176- )''' , (primary , ))
210+ "routing.allocation.include._id"=?
211+ )''' , (old_node_id , ))
177212
178213 self ._assert_is_green (conn , 'doc' , 'test' )
214+
215+ c .execute ('''alter table doc.test set ("routing.allocation.include._id"=?)''' , (new_node_id , ))
216+
179217 insert_data (conn , 'doc' , 'test' , 50 )
180218 self ._assert_is_green (conn , 'doc' , 'test' )
181219
@@ -190,15 +228,14 @@ def _test_relocation_with_concurrent_indexing(self, path, nodes):
190228
191229 c .execute ('''
192230 alter table doc.test set(
193- "number_of_replicas"=2,
194- "routing.allocation.include._id"=?,
195- "routing.allocation.include._tier"='NONE')
196- ''' , (primary , ))
231+ "number_of_replicas"=2,
232+ "routing.allocation.include._id"='null')
233+ ''' )
197234
198235 insert_data (conn , 'doc' , 'test' , 45 )
199236
200- # time.sleep(5 )
201- # self._assert_is_green(conn, 'doc', 'test')
237+ time .sleep (30 )
238+ self ._assert_is_green (conn , 'doc' , 'test' )
202239 c .execute ('refresh table doc.test' )
203240
204241 c .execute ('select count(*) from doc.test' )
@@ -576,17 +613,17 @@ def _test_auto_expand_indices_during_rolling_upgrade(self, path, nodes):
576613 # upgrade randomly some nodes to a different version to build a mixed cluster
577614 self ._upgrade_cluster (cluster , path .to_version , random .randint (1 , nodes - 1 ))
578615
579- # time.sleep(3)
580- # self._assert_is_green(conn, 'doc', 'test')
616+ time .sleep (3 )
617+ self ._assert_is_green (conn , 'doc' , 'test' )
581618 c .execute ('''select number_of_replicas from information_schema.tables where table_name='test' ''' )
582619 # res = c.fetchone()[0]
583620 # self.assertEqual(res, len(node_ids) - 1)
584621
585622 # upgrade the whole cluster to the new version
586623 self ._upgrade_cluster (cluster , path .to_version , nodes )
587624
588- # time.sleep(3)
589- # self._assert_is_green(conn, 'doc', 'test')
625+ time .sleep (3 )
626+ self ._assert_is_green (conn , 'doc' , 'test' )
590627
591628 c .execute ('''select number_of_replicas from information_schema.tables where table_name='test' ''' )
592629 # res = c.fetchone()[0]
0 commit comments