Skip to content

Commit 8e24c2d

Browse files
committed
Initial commit
fix checkstyle add missing space Initial commit
1 parent a947d5d commit 8e24c2d

File tree

13 files changed

+59
-62
lines changed

13 files changed

+59
-62
lines changed

examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception {
7171
System.out.println("Going to publish message : " + message);
7272
}
7373
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
74-
.subscriberContext(getReactorContext()).block();
74+
.contextWrite(getReactorContext()).block();
7575
System.out.println("Published the set of messages in a single call to Dapr");
7676
if (res != null) {
7777
if (res.getFailedEntries().size() > 0) {

examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
6363
client.publishEvent(
6464
PUBSUB_NAME,
6565
TOPIC_NAME,
66-
message).subscriberContext(getReactorContext()).block();
66+
message).contextWrite(getReactorContext()).block();
6767
System.out.println("Published message: " + message);
6868

6969
try {

examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
6767
InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep")
6868
.setHttpExtension(HttpExtension.POST);
6969
return client.invokeMethod(sleepRequest, TypeRef.get(Void.class));
70-
}).subscriberContext(getReactorContext()).block();
70+
}).contextWrite(getReactorContext()).block();
7171
}
7272
}
7373
}

examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public Mono<byte[]> echo(
5858
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo")
5959
.setBody(body)
6060
.setHttpExtension(HttpExtension.POST);
61-
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context));
61+
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context));
6262
}
6363

6464
/**
@@ -71,7 +71,7 @@ public Mono<byte[]> echo(
7171
public Mono<Void> sleep(@RequestAttribute(name = "opentelemetry-context") Context context) {
7272
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep")
7373
.setHttpExtension(HttpExtension.POST);
74-
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then();
74+
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then();
7575
}
7676

7777
}

sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import reactor.core.publisher.Mono;
3131
import reactor.core.publisher.MonoSink;
3232
import reactor.util.context.Context;
33+
import reactor.util.context.ContextView;
3334

3435
import java.util.concurrent.ExecutionException;
3536
import java.util.function.Consumer;
@@ -65,7 +66,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
6566
.setMethod(methodName)
6667
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
6768
.build();
68-
return Mono.subscriberContext().flatMap(
69+
return Mono.deferContextual(
6970
context -> this.<DaprProtos.InvokeActorResponse>createMono(
7071
it -> intercept(context, client).invokeActor(req, it)
7172
)
@@ -109,7 +110,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
109110
* @param client GRPC client for Dapr.
110111
* @return Client after adding interceptors.
111112
*/
112-
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
113+
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
113114
return GrpcWrapper.intercept(context, client);
114115
}
115116

sdk/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<dependency>
4545
<groupId>io.projectreactor</groupId>
4646
<artifactId>reactor-core</artifactId>
47-
<version>3.3.11.RELEASE</version>
47+
<version>3.5.2</version>
4848
</dependency>
4949
<dependency>
5050
<groupId>com.squareup.okhttp3</groupId>

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import reactor.core.publisher.Mono;
6969
import reactor.core.publisher.MonoSink;
7070
import reactor.util.context.Context;
71+
import reactor.util.context.ContextView;
7172

7273
import java.io.Closeable;
7374
import java.io.IOException;
@@ -181,11 +182,8 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
181182
envelopeBuilder.putAllMetadata(metadata);
182183
}
183184

184-
return Mono.subscriberContext().flatMap(
185-
context ->
186-
this.<Empty>createMono(
187-
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
188-
)
185+
return Mono.deferContextual(context -> this.<Empty>createMono(
186+
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it))
189187
).then();
190188
} catch (Exception ex) {
191189
return DaprException.wrapMono(ex);
@@ -254,8 +252,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
254252
for (BulkPublishEntry<T> entry: request.getEntries()) {
255253
entryMap.put(entry.getEntryId(), entry);
256254
}
257-
return Mono.subscriberContext().flatMap(
258-
context ->
255+
return Mono.deferContextual(context ->
259256
this.<DaprProtos.BulkPublishResponse>createMono(
260257
it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it)
261258
)
@@ -298,8 +295,8 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
298295
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
299296
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342
300297

301-
return Mono.subscriberContext().flatMap(
302-
context -> this.<CommonProtos.InvokeResponse>createMono(
298+
return Mono.deferContextual(context ->
299+
this.<CommonProtos.InvokeResponse>createMono(
303300
it -> intercept(context, asyncStub).invokeService(envelope, it)
304301
)
305302
).flatMap(
@@ -345,8 +342,7 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
345342
}
346343
DaprProtos.InvokeBindingRequest envelope = builder.build();
347344

348-
return Mono.subscriberContext().flatMap(
349-
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
345+
return Mono.deferContextual(context -> this.<DaprProtos.InvokeBindingResponse>createMono(
350346
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
351347
)
352348
).flatMap(
@@ -392,8 +388,7 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {
392388

393389
DaprProtos.GetStateRequest envelope = builder.build();
394390

395-
return Mono.subscriberContext().flatMap(
396-
context ->
391+
return Mono.deferContextual(context ->
397392
this.<DaprProtos.GetStateResponse>createMono(
398393
it -> intercept(context, asyncStub).getState(envelope, it)
399394
)
@@ -441,8 +436,8 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe
441436

442437
DaprProtos.GetBulkStateRequest envelope = builder.build();
443438

444-
return Mono.subscriberContext().flatMap(
445-
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
439+
return Mono.deferContextual(context ->
440+
this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
446441
.getBulkState(envelope, it)
447442
)
448443
).map(
@@ -525,7 +520,7 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request
525520
}
526521
DaprProtos.ExecuteStateTransactionRequest req = builder.build();
527522

528-
return Mono.subscriberContext().flatMap(
523+
return Mono.deferContextual(
529524
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
530525
).then();
531526
} catch (Exception e) {
@@ -551,7 +546,7 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
551546
}
552547
DaprProtos.SaveStateRequest req = builder.build();
553548

554-
return Mono.subscriberContext().flatMap(
549+
return Mono.deferContextual(
555550
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).saveState(req, it))
556551
).then();
557552
} catch (Exception ex) {
@@ -635,8 +630,8 @@ public Mono<Void> deleteState(DeleteStateRequest request) {
635630

636631
DaprProtos.DeleteStateRequest req = builder.build();
637632

638-
return Mono.subscriberContext().flatMap(
639-
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
633+
return Mono.deferContextual(context ->
634+
this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
640635
).then();
641636
} catch (Exception ex) {
642637
return DaprException.wrapMono(ex);
@@ -713,7 +708,7 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
713708
}
714709
DaprProtos.GetSecretRequest req = requestBuilder.build();
715710

716-
return Mono.subscriberContext().flatMap(
711+
return Mono.deferContextual(
717712
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(context, asyncStub).getSecret(req, it))
718713
).map(DaprProtos.GetSecretResponse::getDataMap);
719714
}
@@ -738,9 +733,8 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest
738733

739734
DaprProtos.GetBulkSecretRequest envelope = builder.build();
740735

741-
return Mono.subscriberContext().flatMap(
742-
context ->
743-
this.<DaprProtos.GetBulkSecretResponse>createMono(
736+
return Mono.deferContextual(context ->
737+
this.<DaprProtos.GetBulkSecretResponse>createMono(
744738
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
745739
)
746740
).map(it -> {
@@ -791,8 +785,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
791785

792786
DaprProtos.QueryStateRequest envelope = builder.build();
793787

794-
return Mono.subscriberContext().flatMap(
795-
context -> this.<DaprProtos.QueryStateResponse>createMono(
788+
return Mono.deferContextual(context -> this.<DaprProtos.QueryStateResponse>createMono(
796789
it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it)
797790
)
798791
).map(
@@ -855,7 +848,7 @@ public void close() throws Exception {
855848
*/
856849
@Override
857850
public Mono<Void> shutdown() {
858-
return Mono.subscriberContext().flatMap(
851+
return Mono.deferContextual(
859852
context -> this.<Empty>createMono(
860853
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
861854
).then();
@@ -889,8 +882,7 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
889882
}
890883

891884
private Mono<Map<String, ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
892-
return Mono.subscriberContext().flatMap(
893-
context ->
885+
return Mono.deferContextual(context ->
894886
this.<DaprProtos.GetConfigurationResponse>createMono(
895887
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
896888
)
@@ -1034,7 +1026,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
10341026
* @param client GRPC client for Dapr.
10351027
* @return Client after adding interceptors.
10361028
*/
1037-
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
1029+
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
10381030
return GrpcWrapper.intercept(context, client);
10391031
}
10401032

sdk/src/main/java/io/dapr/client/DaprClientHttp.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.dapr.utils.TypeRef;
5151
import reactor.core.publisher.Flux;
5252
import reactor.core.publisher.Mono;
53+
import reactor.util.context.Context;
5354

5455
import java.io.IOException;
5556
import java.util.ArrayList;
@@ -177,7 +178,7 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
177178
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };
178179

179180
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
180-
return Mono.subscriberContext().flatMap(
181+
return Mono.deferContextual(
181182
context -> this.client.invokeApi(
182183
DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
183184
)
@@ -237,7 +238,7 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
237238
} else {
238239
headers.put(Metadata.CONTENT_TYPE, objectSerializer.getContentType());
239240
}
240-
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
241+
Mono<DaprHttp.Response> response = Mono.deferContextual(
241242
context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]),
242243
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
243244
);
@@ -309,7 +310,7 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
309310

310311
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "bindings", name };
311312

312-
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
313+
Mono<DaprHttp.Response> response = Mono.deferContextual(
313314
context -> this.client.invokeApi(
314315
httpMethod, pathSegments, null, payload, null, context)
315316
);
@@ -349,7 +350,7 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe
349350
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk" };
350351

351352
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
352-
return Mono.subscriberContext().flatMap(
353+
return Mono.deferContextual(
353354
context -> this.client
354355
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context)
355356
).flatMap(s -> {
@@ -394,7 +395,7 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {
394395

395396
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key };
396397

397-
return Mono.subscriberContext().flatMap(
398+
return Mono.deferContextual(
398399
context -> this.client
399400
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, null, context)
400401
).flatMap(s -> {
@@ -452,7 +453,7 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request
452453

453454
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "transaction" };
454455

455-
return Mono.subscriberContext().flatMap(
456+
return Mono.deferContextual(
456457
context -> this.client.invokeApi(
457458
DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedOperationBody, null, context
458459
)
@@ -500,7 +501,7 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
500501

501502
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName };
502503

503-
return Mono.subscriberContext().flatMap(
504+
return Mono.deferContextual(
504505
context -> this.client.invokeApi(
505506
DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedStateBody, null, context)
506507
).then();
@@ -543,7 +544,7 @@ public Mono<Void> deleteState(DeleteStateRequest request) {
543544

544545
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key };
545546

546-
return Mono.subscriberContext().flatMap(
547+
return Mono.deferContextual(
547548
context -> this.client.invokeApi(
548549
DaprHttp.HttpMethods.DELETE.name(), pathSegments, queryParams, headers, context)
549550
).then();
@@ -631,7 +632,7 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
631632
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
632633
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key };
633634

634-
return Mono.subscriberContext().flatMap(
635+
return Mono.deferContextual(
635636
context -> this.client
636637
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context)
637638
).flatMap(response -> {
@@ -667,7 +668,7 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest
667668
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
668669
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, "bulk" };
669670

670-
return Mono.subscriberContext().flatMap(
671+
return Mono.deferContextual(
671672
context -> this.client
672673
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context)
673674
).flatMap(response -> {
@@ -709,7 +710,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
709710
} else {
710711
throw new IllegalArgumentException("Both query and queryString fields are not set.");
711712
}
712-
return Mono.subscriberContext().flatMap(
713+
return Mono.deferContextual(
713714
context -> this.client
714715
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
715716
queryArgs, serializedRequest, null, context)
@@ -739,7 +740,7 @@ public void close() {
739740
@Override
740741
public Mono<Void> shutdown() {
741742
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
742-
return Mono.subscriberContext().flatMap(
743+
return Mono.deferContextual(
743744
context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
744745
null, null, context))
745746
.then();
@@ -810,7 +811,7 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
810811
queryParams.putAll(queryArgs);
811812

812813
String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName };
813-
return Mono.subscriberContext().flatMap(
814+
return Mono.deferContextual(
814815
context -> this.client
815816
.invokeApi(
816817
DaprHttp.HttpMethods.GET.name(),
@@ -871,7 +872,7 @@ public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConf
871872

872873
String[] pathSegments =
873874
new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" };
874-
SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap(
875+
SubscribeConfigurationResponse res = Mono.deferContextual(
875876
context -> this.client.invokeApi(
876877
DaprHttp.HttpMethods.GET.name(),
877878
pathSegments, queryParams,
@@ -913,7 +914,7 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri
913914
String[] pathSegments = new String[]
914915
{ DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" };
915916

916-
return Mono.subscriberContext().flatMap(
917+
return Mono.deferContextual(
917918
context -> this.client
918919
.invokeApi(
919920
DaprHttp.HttpMethods.GET.name(),

0 commit comments

Comments
 (0)