@@ -165,173 +165,181 @@ public static void setup() {
165165
166166 @ Test
167167 public void every_statement_should_deliver_tablet_info () {
168- Map <String , Supplier <CqlSession >> sessions = new HashMap <>();
169- sessions .put (
170- "REGULAR" ,
171- () -> CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ());
172- sessions .put (
173- "WITH_KEYSPACE" ,
174- () ->
168+ try (CqlSession regularSession =
169+ CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
170+ CqlSession withKsSession =
175171 CqlSession .builder ()
176172 .addContactEndPoints (CCM_RULE .getContactPoints ())
177173 .withKeyspace (KEYSPACE_NAME )
178- .build ());
179- sessions .put (
180- "USE_KEYSPACE" ,
181- () -> {
182- CqlSession s =
183- CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
184- s .execute ("USE " + KEYSPACE_NAME );
185- return s ;
186- });
187-
188- Map <String , Function <CqlSession , Statement >> statements = new HashMap <>();
189- statements .put ("SELECT_CONCRETE" , s -> STMT_SELECT_CONCRETE );
190- statements .put ("SELECT_PREPARED" , s -> s .prepare (STMT_SELECT ).bind (2 , 2 ));
191- statements .put ("SELECT_NO_KS_PREPARED" , s -> s .prepare (STMT_SELECT_NO_KS ).bind (2 , 2 ));
192- statements .put ("SELECT_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_CONCRETE ).bind ());
193- statements .put ("SELECT_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_PK_CONCRETE ).bind (2 ));
194- statements .put ("SELECT_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_CK_CONCRETE ).bind (2 ));
195- statements .put ("INSERT_CONCRETE" , s -> STMT_INSERT_CONCRETE );
196- statements .put ("INSERT_PREPARED" , s -> s .prepare (STMT_INSERT ).bind (2 , 2 ));
197- statements .put ("INSERT_NO_KS_PREPARED" , s -> s .prepare (STMT_INSERT_NO_KS ).bind (2 , 2 ));
198- statements .put ("INSERT_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_CONCRETE ).bind ());
199- statements .put ("INSERT_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_PK_CONCRETE ).bind (2 ));
200- statements .put ("INSERT_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_CK_CONCRETE ).bind (2 ));
201- statements .put (
202- "INSERT_LWT_IF_NOT_EXISTS" , s -> s .prepare (STMT_INSERT_LWT_IF_NOT_EXISTS ).bind (2 , 2 ));
203- statements .put ("UPDATE_CONCRETE" , s -> STMT_UPDATE_CONCRETE );
204- statements .put ("UPDATE_PREPARED" , s -> s .prepare (STMT_UPDATE ).bind (2 , 2 ));
205- statements .put ("UPDATE_NO_KS_PREPARED" , s -> s .prepare (STMT_UPDATE_NO_KS ).bind (2 , 2 ));
206- statements .put ("UPDATE_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_CONCRETE ).bind ());
207- statements .put ("UPDATE_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_PK_CONCRETE ).bind (2 ));
208- statements .put ("UPDATE_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_CK_CONCRETE ).bind (2 ));
209- statements .put ("UPDATE_LWT_IF_EXISTS" , s -> s .prepare (STMT_UPDATE_LWT_IF_EXISTS ).bind (2 , 2 ));
210- statements .put ("STMT_UPDATE_LWT_IF_VAL" , s -> s .prepare (STMT_UPDATE_LWT_IF_VAL ).bind (2 , 2 ));
211- statements .put ("DELETE_CONCRETE" , s -> STMT_DELETE_CONCRETE );
212- statements .put ("DELETE_PREPARED" , s -> s .prepare (STMT_DELETE ).bind (2 , 2 ));
213- statements .put ("DELETE_NO_KS_PREPARED" , s -> s .prepare (STMT_DELETE_NO_KS ).bind (2 , 2 ));
214- statements .put ("DELETE_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_CONCRETE ).bind ());
215- statements .put ("DELETE_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_PK_CONCRETE ).bind (2 ));
216- statements .put ("DELETE_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_CK_CONCRETE ).bind (2 ));
217- statements .put ("DELETE_LWT_IF_EXISTS" , s -> s .prepare (STMT_DELETE_IF_EXISTS ).bind (2 , 2 ));
218-
219- List <String > testErrors = new ArrayList <>();
220- for (Map .Entry <String , Supplier <CqlSession >> sessionEntry : sessions .entrySet ()) {
221- for (Map .Entry <String , Function <CqlSession , Statement >> stmtEntry : statements .entrySet ()) {
222- if (stmtEntry .getKey ().contains ("CONCRETE" )
223- && !stmtEntry .getKey ().contains ("CK_CONCRETE" )) {
224- // Scylla does not return tablet info for queries with PK built into query
225- continue ;
226- }
227- if (stmtEntry .getKey ().contains ("LWT" )) {
228- // LWT is not yet supported by scylla on tables with tablets
229- continue ;
230- }
231- if (sessionEntry .getKey ().equals ("REGULAR" ) && stmtEntry .getKey ().contains ("NO_KS" )) {
232- // Preparation of the statements without KS will fail on the session with no ks specified
233- continue ;
234- }
235- CqlSession session = sessionEntry .getValue ().get ();
236- // Empty out tablets information
237- if (session .getMetadata ().getTabletMap ().isPresent ()) {
238- session
239- .getMetadata ()
240- .getTabletMap ()
241- .get ()
242- .removeByKeyspace (CqlIdentifier .fromCql (KEYSPACE_NAME ));
243- }
244- Statement stmt ;
245- try {
246- stmt = stmtEntry .getValue ().apply (session );
247- } catch (Exception e ) {
248- RuntimeException ex =
249- new RuntimeException (
174+ .build ();
175+ CqlSession withUseKsSession =
176+ CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ()) {
177+ withUseKsSession .execute ("USE " + KEYSPACE_NAME );
178+ Map <String , Supplier <CqlSession >> sessions = new HashMap <>();
179+ sessions .put ("REGULAR" , () -> regularSession );
180+ sessions .put ("WITH_KEYSPACE" , () -> withKsSession );
181+ sessions .put ("USE_KEYSPACE" , () -> withUseKsSession );
182+
183+ Map <String , Function <CqlSession , Statement >> statements = new HashMap <>();
184+ statements .put ("SELECT_CONCRETE" , s -> STMT_SELECT_CONCRETE );
185+ statements .put ("SELECT_PREPARED" , s -> s .prepare (STMT_SELECT ).bind (2 , 2 ));
186+ statements .put ("SELECT_NO_KS_PREPARED" , s -> s .prepare (STMT_SELECT_NO_KS ).bind (2 , 2 ));
187+ statements .put ("SELECT_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_CONCRETE ).bind ());
188+ statements .put (
189+ "SELECT_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_PK_CONCRETE ).bind (2 ));
190+ statements .put (
191+ "SELECT_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_CK_CONCRETE ).bind (2 ));
192+ statements .put ("INSERT_CONCRETE" , s -> STMT_INSERT_CONCRETE );
193+ statements .put ("INSERT_PREPARED" , s -> s .prepare (STMT_INSERT ).bind (2 , 2 ));
194+ statements .put ("INSERT_NO_KS_PREPARED" , s -> s .prepare (STMT_INSERT_NO_KS ).bind (2 , 2 ));
195+ statements .put ("INSERT_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_CONCRETE ).bind ());
196+ statements .put (
197+ "INSERT_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_PK_CONCRETE ).bind (2 ));
198+ statements .put (
199+ "INSERT_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_CK_CONCRETE ).bind (2 ));
200+ statements .put (
201+ "INSERT_LWT_IF_NOT_EXISTS" , s -> s .prepare (STMT_INSERT_LWT_IF_NOT_EXISTS ).bind (2 , 2 ));
202+ statements .put ("UPDATE_CONCRETE" , s -> STMT_UPDATE_CONCRETE );
203+ statements .put ("UPDATE_PREPARED" , s -> s .prepare (STMT_UPDATE ).bind (2 , 2 ));
204+ statements .put ("UPDATE_NO_KS_PREPARED" , s -> s .prepare (STMT_UPDATE_NO_KS ).bind (2 , 2 ));
205+ statements .put ("UPDATE_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_CONCRETE ).bind ());
206+ statements .put (
207+ "UPDATE_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_PK_CONCRETE ).bind (2 ));
208+ statements .put (
209+ "UPDATE_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_CK_CONCRETE ).bind (2 ));
210+ statements .put ("UPDATE_LWT_IF_EXISTS" , s -> s .prepare (STMT_UPDATE_LWT_IF_EXISTS ).bind (2 , 2 ));
211+ statements .put ("STMT_UPDATE_LWT_IF_VAL" , s -> s .prepare (STMT_UPDATE_LWT_IF_VAL ).bind (2 , 2 ));
212+ statements .put ("DELETE_CONCRETE" , s -> STMT_DELETE_CONCRETE );
213+ statements .put ("DELETE_PREPARED" , s -> s .prepare (STMT_DELETE ).bind (2 , 2 ));
214+ statements .put ("DELETE_NO_KS_PREPARED" , s -> s .prepare (STMT_DELETE_NO_KS ).bind (2 , 2 ));
215+ statements .put ("DELETE_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_CONCRETE ).bind ());
216+ statements .put (
217+ "DELETE_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_PK_CONCRETE ).bind (2 ));
218+ statements .put (
219+ "DELETE_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_CK_CONCRETE ).bind (2 ));
220+ statements .put ("DELETE_LWT_IF_EXISTS" , s -> s .prepare (STMT_DELETE_IF_EXISTS ).bind (2 , 2 ));
221+
222+ List <String > testErrors = new ArrayList <>();
223+ for (Map .Entry <String , Supplier <CqlSession >> sessionEntry : sessions .entrySet ()) {
224+ for (Map .Entry <String , Function <CqlSession , Statement >> stmtEntry : statements .entrySet ()) {
225+ if (stmtEntry .getKey ().contains ("CONCRETE" )
226+ && !stmtEntry .getKey ().contains ("CK_CONCRETE" )) {
227+ // Scylla does not return tablet info for queries with PK built into query
228+ continue ;
229+ }
230+ if (stmtEntry .getKey ().contains ("LWT" )) {
231+ // LWT is not yet supported by scylla on tables with tablets
232+ continue ;
233+ }
234+ if (sessionEntry .getKey ().equals ("REGULAR" ) && stmtEntry .getKey ().contains ("NO_KS" )) {
235+ // Preparation of the statements without KS will fail on the session with no ks
236+ // specified
237+ continue ;
238+ }
239+ CqlSession session = sessionEntry .getValue ().get ();
240+ // Empty out tablets information
241+ if (session .getMetadata ().getTabletMap ().isPresent ()) {
242+ session
243+ .getMetadata ()
244+ .getTabletMap ()
245+ .get ()
246+ .removeByKeyspace (CqlIdentifier .fromCql (KEYSPACE_NAME ));
247+ }
248+ Statement stmt ;
249+ try {
250+ stmt = stmtEntry .getValue ().apply (session );
251+ } catch (Exception e ) {
252+ RuntimeException ex =
253+ new RuntimeException (
254+ String .format (
255+ "Failed to build statement %s on session %s" ,
256+ stmtEntry .getKey (), sessionEntry .getKey ()));
257+ ex .addSuppressed (e );
258+ throw ex ;
259+ }
260+ try {
261+ if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo (session , stmt )) {
262+ testErrors .add (
250263 String .format (
251- "Failed to build statement %s on session %s" ,
264+ "Statement %s on session %s got no tablet info " ,
252265 stmtEntry .getKey (), sessionEntry .getKey ()));
253- ex .addSuppressed (e );
254- throw ex ;
255- }
256- try {
257- if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo (session , stmt )) {
266+ continue ;
267+ }
268+ } catch (Exception e ) {
269+ testErrors .add (
270+ String .format (
271+ "Failed to execute statement %s on session %s: %s" ,
272+ stmtEntry .getKey (), sessionEntry .getKey (), e ));
273+ continue ;
274+ }
275+ if (!waitSessionLearnedTabletInfo (session )) {
258276 testErrors .add (
259277 String .format (
260- "Statement %s on session %s got no tablet info " ,
278+ "Statement %s on session %s did not trigger session tablets update " ,
261279 stmtEntry .getKey (), sessionEntry .getKey ()));
262280 continue ;
263281 }
264- } catch (Exception e ) {
265- testErrors .add (
266- String .format (
267- "Failed to execute statement %s on session %s: %s" ,
268- stmtEntry .getKey (), sessionEntry .getKey (), e ));
269- continue ;
270- }
271- if (!waitSessionLearnedTabletInfo (session )) {
272- testErrors .add (
273- String .format (
274- "Statement %s on session %s did not trigger session tablets update" ,
275- stmtEntry .getKey (), sessionEntry .getKey ()));
276- continue ;
277- }
278- if (!checkIfRoutedProperly (session , stmt )) {
279- testErrors .add (
280- String .format (
281- "Statement %s on session %s was routed to different nodes" ,
282- stmtEntry .getKey (), sessionEntry .getKey ()));
282+ if (!checkIfRoutedProperly (session , stmt )) {
283+ testErrors .add (
284+ String .format (
285+ "Statement %s on session %s was routed to different nodes" ,
286+ stmtEntry .getKey (), sessionEntry .getKey ()));
287+ }
283288 }
284289 }
285- }
286290
287- if (!testErrors .isEmpty ()) {
288- throw new AssertionError (
289- String .format (
290- "Found queries that got no tablet info: \n %s" , String .join ("\n " , testErrors )));
291+ if (!testErrors .isEmpty ()) {
292+ throw new AssertionError (
293+ String .format (
294+ "Found queries that got no tablet info: \n %s" , String .join ("\n " , testErrors )));
295+ }
291296 }
292297 }
293298
294299 @ Test
295300 public void should_receive_each_tablet_exactly_once () {
296- CqlSession session =
297- CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
298301 int counter = 0 ;
299- PreparedStatement preparedStatement = session .prepare (STMT_INSERT );
300- for (int i = 1 ; i <= QUERIES ; i ++) {
301- if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
302- counter ++;
302+ try (CqlSession session =
303+ CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build (); ) {
304+ PreparedStatement preparedStatement = session .prepare (STMT_INSERT );
305+ for (int i = 1 ; i <= QUERIES ; i ++) {
306+ if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
307+ counter ++;
308+ }
303309 }
310+ Assert .assertEquals (INITIAL_TABLETS , counter );
311+ assertSessionTabletMapIsFilled (session );
304312 }
305- Assert .assertEquals (INITIAL_TABLETS , counter );
306- assertSessionTabletMapIsFilled (session );
307- session .close ();
308-
309- session = CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
310- counter = 0 ;
311- preparedStatement = session .prepare (STMT_SELECT );
312- for (int i = 1 ; i <= QUERIES ; i ++) {
313- if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
314- counter ++;
313+
314+ try (CqlSession session =
315+ CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ()) {
316+ counter = 0 ;
317+ PreparedStatement preparedStatement = session .prepare (STMT_SELECT );
318+ for (int i = 1 ; i <= QUERIES ; i ++) {
319+ if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
320+ counter ++;
321+ }
315322 }
316- }
317323
318- LOG .debug ("Ran first set of queries" );
324+ LOG .debug ("Ran first set of queries" );
319325
320- // With enough queries we should hit a wrong node for each tablet exactly once.
321- Assert .assertEquals (INITIAL_TABLETS , counter );
322- assertSessionTabletMapIsFilled (session );
326+ // With enough queries we should hit a wrong node for each tablet exactly once.
327+ Assert .assertEquals (INITIAL_TABLETS , counter );
328+ assertSessionTabletMapIsFilled (session );
323329
324- // All tablet information should be available by now (unless for some reason cluster did sth on
325- // its own)
326- // We should not receive any tablet payloads now, since they are sent only on mismatch.
327- for (int i = 1 ; i <= QUERIES ; i ++) {
330+ // All tablet information should be available by now (unless for some reason cluster did sth
331+ // on
332+ // its own)
333+ // We should not receive any tablet payloads now, since they are sent only on mismatch.
334+ for (int i = 1 ; i <= QUERIES ; i ++) {
328335
329- ResultSet rs = session .execute (preparedStatement .bind (i , i ));
330- Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
336+ ResultSet rs = session .execute (preparedStatement .bind (i , i ));
337+ Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
331338
332- if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
333- throw new RuntimeException (
334- "Received non empty payload with tablets routing information: " + payload );
339+ if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
340+ throw new RuntimeException (
341+ "Received non empty payload with tablets routing information: " + payload );
342+ }
335343 }
336344 }
337345 }
0 commit comments