1717package com .google .cloud .spanner .spi .v1 ;
1818
1919import static org .junit .Assert .assertEquals ;
20+ import static org .junit .Assert .assertNotNull ;
2021
2122import com .google .auth .oauth2 .AccessToken ;
2223import com .google .auth .oauth2 .OAuth2Credentials ;
2728import com .google .cloud .spanner .Spanner ;
2829import com .google .cloud .spanner .SpannerOptions ;
2930import com .google .cloud .spanner .Statement ;
31+ import com .google .common .base .MoreObjects ;
32+ import com .google .common .base .Preconditions ;
3033import com .google .protobuf .ListValue ;
3134import com .google .spanner .v1 .ResultSetMetadata ;
3235import com .google .spanner .v1 .StructType ;
4750import io .opencensus .tags .TagValue ;
4851import java .io .IOException ;
4952import java .net .InetSocketAddress ;
50- import java .util .HashMap ;
5153import java .util .List ;
5254import java .util .Map ;
5355import java .util .Random ;
@@ -80,26 +82,22 @@ public class GfeLatencyTest {
8082
8183 private static MockSpannerServiceImpl mockSpanner ;
8284 private static Server server ;
83- private static InetSocketAddress address ;
8485 private static Spanner spanner ;
8586 private static DatabaseClient databaseClient ;
8687
87- private static final Map <SpannerRpc .Option , Object > optionsMap = new HashMap <>();
88-
8988 private static MockSpannerServiceImpl mockSpannerNoHeader ;
9089 private static Server serverNoHeader ;
91- private static InetSocketAddress addressNoHeader ;
9290 private static Spanner spannerNoHeader ;
9391 private static DatabaseClient databaseClientNoHeader ;
9492
95- private static String instanceId = "fake-instance" ;
96- private static String databaseId = "fake-database" ;
97- private static String projectId = "fake-project" ;
93+ private static final String INSTANCE_ID = "fake-instance" ;
94+ private static final String DATABASE_ID = "fake-database" ;
95+ private static final String PROJECT_ID = "fake-project" ;
9896
99- private static final long WAIT_FOR_METRICS_TIME_MS = 1_000 ;
100- private static final int MAXIMUM_RETRIES = 5 ;
97+ private static final int MAXIMUM_RETRIES = 50000 ;
10198
102- private static AtomicInteger fakeServerTiming = new AtomicInteger (new Random ().nextInt (1000 ) + 1 );
99+ private static final AtomicInteger FAKE_SERVER_TIMING =
100+ new AtomicInteger (new Random ().nextInt (1000 ) + 1 );
103101
104102 private static final Statement SELECT1AND2 =
105103 Statement .of ("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL1" );
@@ -135,6 +133,7 @@ public class GfeLatencyTest {
135133
136134 @ BeforeClass
137135 public static void startServer () throws IOException {
136+ //noinspection deprecation
138137 SpannerRpcViews .registerGfeLatencyAndHeaderMissingCountViews ();
139138
140139 mockSpanner = new MockSpannerServiceImpl ();
@@ -143,7 +142,7 @@ public static void startServer() throws IOException {
143142 MockSpannerServiceImpl .StatementResult .query (SELECT1AND2 , SELECT1_RESULTSET ));
144143 mockSpanner .putStatementResult (
145144 MockSpannerServiceImpl .StatementResult .update (UPDATE_FOO_STATEMENT , 1L ));
146- address = new InetSocketAddress ("localhost" , 0 );
145+ InetSocketAddress address = new InetSocketAddress ("localhost" , 0 );
147146 server =
148147 NettyServerBuilder .forAddress (address )
149148 .addService (mockSpanner )
@@ -161,7 +160,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
161160 public void sendHeaders (Metadata headers ) {
162161 headers .put (
163162 Metadata .Key .of ("server-timing" , Metadata .ASCII_STRING_MARSHALLER ),
164- String .format ("gfet4t7; dur=%d" , fakeServerTiming .get ()));
163+ String .format ("gfet4t7; dur=%d" , FAKE_SERVER_TIMING .get ()));
165164 super .sendHeaders (headers );
166165 }
167166 },
@@ -170,25 +169,24 @@ public void sendHeaders(Metadata headers) {
170169 })
171170 .build ()
172171 .start ();
173- optionsMap .put (SpannerRpc .Option .CHANNEL_HINT , 1L );
174172 spanner = createSpannerOptions (address , server ).getService ();
175- databaseClient = spanner .getDatabaseClient (DatabaseId .of (projectId , instanceId , databaseId ));
173+ databaseClient = spanner .getDatabaseClient (DatabaseId .of (PROJECT_ID , INSTANCE_ID , DATABASE_ID ));
176174
177175 mockSpannerNoHeader = new MockSpannerServiceImpl ();
178176 mockSpannerNoHeader .setAbortProbability (0.0D );
179177 mockSpannerNoHeader .putStatementResult (
180178 MockSpannerServiceImpl .StatementResult .query (SELECT1AND2 , SELECT1_RESULTSET ));
181179 mockSpannerNoHeader .putStatementResult (
182180 MockSpannerServiceImpl .StatementResult .update (UPDATE_FOO_STATEMENT , 1L ));
183- addressNoHeader = new InetSocketAddress ("localhost" , 0 );
181+ InetSocketAddress addressNoHeader = new InetSocketAddress ("localhost" , 0 );
184182 serverNoHeader =
185183 NettyServerBuilder .forAddress (addressNoHeader )
186184 .addService (mockSpannerNoHeader )
187185 .build ()
188186 .start ();
189187 spannerNoHeader = createSpannerOptions (addressNoHeader , serverNoHeader ).getService ();
190188 databaseClientNoHeader =
191- spannerNoHeader .getDatabaseClient (DatabaseId .of (projectId , instanceId , databaseId ));
189+ spannerNoHeader .getDatabaseClient (DatabaseId .of (PROJECT_ID , INSTANCE_ID , DATABASE_ID ));
192190 }
193191
194192 @ AfterClass
@@ -221,12 +219,9 @@ public void testGfeLatencyExecuteStreamingSql() throws InterruptedException {
221219 long latency =
222220 getMetric (
223221 SpannerRpcViews .SPANNER_GFE_LATENCY_VIEW ,
224- projectId ,
225- instanceId ,
226- databaseId ,
227222 "google.spanner.v1.Spanner/ExecuteStreamingSql" ,
228223 false );
229- assertEquals (fakeServerTiming .get (), latency );
224+ assertEquals (FAKE_SERVER_TIMING .get (), latency );
230225 }
231226
232227 @ Test
@@ -238,12 +233,9 @@ public void testGfeLatencyExecuteSql() throws InterruptedException {
238233 long latency =
239234 getMetric (
240235 SpannerRpcViews .SPANNER_GFE_LATENCY_VIEW ,
241- projectId ,
242- instanceId ,
243- databaseId ,
244236 "google.spanner.v1.Spanner/ExecuteSql" ,
245237 false );
246- assertEquals (fakeServerTiming .get (), latency );
238+ assertEquals (FAKE_SERVER_TIMING .get (), latency );
247239 }
248240
249241 @ Test
@@ -254,9 +246,6 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc
254246 long count =
255247 getMetric (
256248 SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
257- projectId ,
258- instanceId ,
259- databaseId ,
260249 "google.spanner.v1.Spanner/ExecuteStreamingSql" ,
261250 false );
262251 assertEquals (0 , count );
@@ -267,9 +256,6 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc
267256 long count1 =
268257 getMetric (
269258 SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
270- projectId ,
271- instanceId ,
272- databaseId ,
273259 "google.spanner.v1.Spanner/ExecuteStreamingSql" ,
274260 true );
275261 assertEquals (1 , count1 );
@@ -283,9 +269,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException {
283269 long count =
284270 getMetric (
285271 SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
286- projectId ,
287- instanceId ,
288- databaseId ,
289272 "google.spanner.v1.Spanner/ExecuteSql" ,
290273 false );
291274 assertEquals (0 , count );
@@ -296,9 +279,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException {
296279 long count1 =
297280 getMetric (
298281 SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
299- projectId ,
300- instanceId ,
301- databaseId ,
302282 "google.spanner.v1.Spanner/ExecuteSql" ,
303283 true );
304284 assertEquals (1 , count1 );
@@ -321,78 +301,75 @@ private static SpannerOptions createSpannerOptions(InetSocketAddress address, Se
321301 }
322302
323303 private long getAggregationValueAsLong (AggregationData aggregationData ) {
324- return aggregationData .match (
325- new io .opencensus .common .Function <AggregationData .SumDataDouble , Long >() {
326- @ Override
327- public Long apply (AggregationData .SumDataDouble arg ) {
328- return (long ) arg .getSum ();
329- }
330- },
331- new io .opencensus .common .Function <AggregationData .SumDataLong , Long >() {
332- @ Override
333- public Long apply (AggregationData .SumDataLong arg ) {
334- return arg .getSum ();
335- }
336- },
337- new io .opencensus .common .Function <AggregationData .CountData , Long >() {
338- @ Override
339- public Long apply (AggregationData .CountData arg ) {
340- return arg .getCount ();
341- }
342- },
343- new io .opencensus .common .Function <AggregationData .DistributionData , Long >() {
344- @ Override
345- public Long apply (AggregationData .DistributionData arg ) {
346- return (long ) arg .getMean ();
347- }
348- },
349- new io .opencensus .common .Function <AggregationData .LastValueDataDouble , Long >() {
350- @ Override
351- public Long apply (AggregationData .LastValueDataDouble arg ) {
352- return (long ) arg .getLastValue ();
353- }
354- },
355- new io .opencensus .common .Function <AggregationData .LastValueDataLong , Long >() {
356- @ Override
357- public Long apply (AggregationData .LastValueDataLong arg ) {
358- return arg .getLastValue ();
359- }
360- },
361- new io .opencensus .common .Function <AggregationData , Long >() {
362- @ Override
363- public Long apply (AggregationData arg ) {
364- throw new UnsupportedOperationException ();
365- }
366- });
304+ return MoreObjects .firstNonNull (
305+ aggregationData .match (
306+ new io .opencensus .common .Function <AggregationData .SumDataDouble , Long >() {
307+ @ Override
308+ public Long apply (AggregationData .SumDataDouble arg ) {
309+ return (long ) Preconditions .checkNotNull (arg ).getSum ();
310+ }
311+ },
312+ new io .opencensus .common .Function <AggregationData .SumDataLong , Long >() {
313+ @ Override
314+ public Long apply (AggregationData .SumDataLong arg ) {
315+ return Preconditions .checkNotNull (arg ).getSum ();
316+ }
317+ },
318+ new io .opencensus .common .Function <AggregationData .CountData , Long >() {
319+ @ Override
320+ public Long apply (AggregationData .CountData arg ) {
321+ return Preconditions .checkNotNull (arg ).getCount ();
322+ }
323+ },
324+ new io .opencensus .common .Function <AggregationData .DistributionData , Long >() {
325+ @ Override
326+ public Long apply (AggregationData .DistributionData arg ) {
327+ return (long ) Preconditions .checkNotNull (arg ).getMean ();
328+ }
329+ },
330+ new io .opencensus .common .Function <AggregationData .LastValueDataDouble , Long >() {
331+ @ Override
332+ public Long apply (AggregationData .LastValueDataDouble arg ) {
333+ return (long ) Preconditions .checkNotNull (arg ).getLastValue ();
334+ }
335+ },
336+ new io .opencensus .common .Function <AggregationData .LastValueDataLong , Long >() {
337+ @ Override
338+ public Long apply (AggregationData .LastValueDataLong arg ) {
339+ return Preconditions .checkNotNull (arg ).getLastValue ();
340+ }
341+ },
342+ new io .opencensus .common .Function <AggregationData , Long >() {
343+ @ Override
344+ public Long apply (AggregationData arg ) {
345+ throw new UnsupportedOperationException ();
346+ }
347+ }),
348+ -1L );
367349 }
368350
369- private long getMetric (
370- View view ,
371- String projectId ,
372- String instanceId ,
373- String databaseId ,
374- String method ,
375- boolean withOverride )
376- throws InterruptedException {
351+ private long getMetric (View view , String method , boolean withOverride ) {
377352 List <TagValue > tagValues = new java .util .ArrayList <>();
378353 for (TagKey column : view .getColumns ()) {
379354 if (column == SpannerRpcViews .INSTANCE_ID ) {
380- tagValues .add (TagValue .create (instanceId ));
355+ tagValues .add (TagValue .create (INSTANCE_ID ));
381356 } else if (column == SpannerRpcViews .DATABASE_ID ) {
382- tagValues .add (TagValue .create (databaseId ));
357+ tagValues .add (TagValue .create (DATABASE_ID ));
383358 } else if (column == SpannerRpcViews .METHOD ) {
384359 tagValues .add (TagValue .create (method ));
385360 } else if (column == SpannerRpcViews .PROJECT_ID ) {
386- tagValues .add (TagValue .create (projectId ));
361+ tagValues .add (TagValue .create (PROJECT_ID ));
387362 }
388363 }
389364 for (int i = 0 ; i < MAXIMUM_RETRIES ; i ++) {
390- Thread .sleep ( WAIT_FOR_METRICS_TIME_MS );
365+ Thread .yield ( );
391366 ViewData viewData = SpannerRpcViews .viewManager .getView (view .getName ());
367+ assertNotNull (viewData );
392368 if (viewData .getAggregationMap () != null ) {
393369 Map <List <TagValue >, AggregationData > aggregationMap = viewData .getAggregationMap ();
394370 AggregationData aggregationData = aggregationMap .get (tagValues );
395- if (withOverride && getAggregationValueAsLong (aggregationData ) == 0 ) {
371+ if (aggregationData == null
372+ || withOverride && getAggregationValueAsLong (aggregationData ) == 0 ) {
396373 continue ;
397374 }
398375 return getAggregationValueAsLong (aggregationData );
0 commit comments