Skip to content

Commit 1398cba

Browse files
authored
provides request intercepting api (#944)
1 parent 6241b29 commit 1398cba

32 files changed

+2774
-399
lines changed

rsocket-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929

3030
implementation 'org.slf4j:slf4j-api'
3131

32+
testImplementation (project(":rsocket-transport-local"))
3233
testImplementation 'io.projectreactor:reactor-test'
3334
testImplementation 'org.assertj:assertj-core'
3435
testImplementation 'org.junit.jupiter:junit-jupiter-api'

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java

Lines changed: 106 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.rsocket.DuplexConnection;
2626
import io.rsocket.Payload;
2727
import io.rsocket.frame.FrameType;
28+
import io.rsocket.plugins.RequestInterceptor;
2829
import java.time.Duration;
2930
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3031
import org.reactivestreams.Subscription;
@@ -51,21 +52,31 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
5152
final RequesterResponderSupport requesterResponderSupport;
5253
final DuplexConnection connection;
5354

55+
@Nullable final RequestInterceptor requestInterceptor;
56+
5457
FireAndForgetRequesterMono(Payload payload, RequesterResponderSupport requesterResponderSupport) {
5558
this.allocator = requesterResponderSupport.getAllocator();
5659
this.payload = payload;
5760
this.mtu = requesterResponderSupport.getMtu();
5861
this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
5962
this.requesterResponderSupport = requesterResponderSupport;
6063
this.connection = requesterResponderSupport.getDuplexConnection();
64+
this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();
6165
}
6266

6367
@Override
6468
public void subscribe(CoreSubscriber<? super Void> actual) {
6569
long previousState = markSubscribed(STATE, this);
6670
if (isSubscribedOrTerminated(previousState)) {
67-
Operators.error(
68-
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
71+
final IllegalStateException e =
72+
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
73+
74+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
75+
if (requestInterceptor != null) {
76+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
77+
}
78+
79+
Operators.error(actual, e);
6980
return;
7081
}
7182

@@ -76,14 +87,28 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
7687
try {
7788
if (!isValid(mtu, this.maxFrameLength, p, false)) {
7889
lazyTerminate(STATE, this);
79-
p.release();
80-
actual.onError(
90+
91+
final IllegalArgumentException e =
8192
new IllegalArgumentException(
82-
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
93+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
94+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
95+
if (requestInterceptor != null) {
96+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata());
97+
}
98+
99+
p.release();
100+
101+
actual.onError(e);
83102
return;
84103
}
85104
} catch (IllegalReferenceCountException e) {
86105
lazyTerminate(STATE, this);
106+
107+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
108+
if (requestInterceptor != null) {
109+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
110+
}
111+
87112
actual.onError(e);
88113
return;
89114
}
@@ -93,26 +118,54 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
93118
streamId = this.requesterResponderSupport.getNextStreamId();
94119
} catch (Throwable t) {
95120
lazyTerminate(STATE, this);
121+
122+
final Throwable ut = Exceptions.unwrap(t);
123+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
124+
if (requestInterceptor != null) {
125+
requestInterceptor.onReject(ut, FrameType.REQUEST_FNF, p.metadata());
126+
}
127+
96128
p.release();
97-
actual.onError(Exceptions.unwrap(t));
129+
130+
actual.onError(ut);
98131
return;
99132
}
100133

134+
final RequestInterceptor interceptor = this.requestInterceptor;
135+
if (interceptor != null) {
136+
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata());
137+
}
138+
101139
try {
102140
if (isTerminated(this.state)) {
103141
p.release();
142+
143+
if (interceptor != null) {
144+
interceptor.onCancel(streamId);
145+
}
146+
104147
return;
105148
}
106149

107150
sendReleasingPayload(
108151
streamId, FrameType.REQUEST_FNF, mtu, p, this.connection, this.allocator, true);
109152
} catch (Throwable e) {
110153
lazyTerminate(STATE, this);
154+
155+
if (interceptor != null) {
156+
interceptor.onTerminate(streamId, e);
157+
}
158+
111159
actual.onError(e);
112160
return;
113161
}
114162

115163
lazyTerminate(STATE, this);
164+
165+
if (interceptor != null) {
166+
interceptor.onTerminate(streamId, null);
167+
}
168+
116169
actual.onComplete();
117170
}
118171

@@ -137,19 +190,41 @@ public Void block(Duration m) {
137190
public Void block() {
138191
long previousState = markSubscribed(STATE, this);
139192
if (isSubscribedOrTerminated(previousState)) {
140-
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
193+
final IllegalStateException e =
194+
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
195+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
196+
if (requestInterceptor != null) {
197+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
198+
}
199+
throw e;
141200
}
142201

143202
final Payload p = this.payload;
144203
try {
145204
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
146205
lazyTerminate(STATE, this);
206+
207+
final IllegalArgumentException e =
208+
new IllegalArgumentException(
209+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
210+
211+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
212+
if (requestInterceptor != null) {
213+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata());
214+
}
215+
147216
p.release();
148-
throw new IllegalArgumentException(
149-
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
217+
218+
throw e;
150219
}
151220
} catch (IllegalReferenceCountException e) {
152221
lazyTerminate(STATE, this);
222+
223+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
224+
if (requestInterceptor != null) {
225+
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
226+
}
227+
153228
throw Exceptions.propagate(e);
154229
}
155230

@@ -158,10 +233,22 @@ public Void block() {
158233
streamId = this.requesterResponderSupport.getNextStreamId();
159234
} catch (Throwable t) {
160235
lazyTerminate(STATE, this);
236+
237+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
238+
if (requestInterceptor != null) {
239+
requestInterceptor.onReject(Exceptions.unwrap(t), FrameType.REQUEST_FNF, p.metadata());
240+
}
241+
161242
p.release();
243+
162244
throw Exceptions.propagate(t);
163245
}
164246

247+
final RequestInterceptor interceptor = this.requestInterceptor;
248+
if (interceptor != null) {
249+
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata());
250+
}
251+
165252
try {
166253
sendReleasingPayload(
167254
streamId,
@@ -173,10 +260,20 @@ public Void block() {
173260
true);
174261
} catch (Throwable e) {
175262
lazyTerminate(STATE, this);
263+
264+
if (interceptor != null) {
265+
interceptor.onTerminate(streamId, e);
266+
}
267+
176268
throw Exceptions.propagate(e);
177269
}
178270

179271
lazyTerminate(STATE, this);
272+
273+
if (interceptor != null) {
274+
interceptor.onTerminate(streamId, null);
275+
}
276+
180277
return null;
181278
}
182279

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetResponderSubscriber.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import io.rsocket.Payload;
2323
import io.rsocket.RSocket;
2424
import io.rsocket.frame.decoder.PayloadDecoder;
25+
import io.rsocket.plugins.RequestInterceptor;
2526
import org.reactivestreams.Subscription;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829
import reactor.core.CoreSubscriber;
2930
import reactor.core.publisher.Mono;
31+
import reactor.util.annotation.Nullable;
3032

3133
final class FireAndForgetResponderSubscriber
3234
implements CoreSubscriber<Void>, ResponderFrameHandler {
@@ -42,6 +44,8 @@ final class FireAndForgetResponderSubscriber
4244
final RSocket handler;
4345
final int maxInboundPayloadSize;
4446

47+
@Nullable final RequestInterceptor requestInterceptor;
48+
4549
CompositeByteBuf frames;
4650

4751
private FireAndForgetResponderSubscriber() {
@@ -51,6 +55,19 @@ private FireAndForgetResponderSubscriber() {
5155
this.maxInboundPayloadSize = 0;
5256
this.requesterResponderSupport = null;
5357
this.handler = null;
58+
this.requestInterceptor = null;
59+
this.frames = null;
60+
}
61+
62+
FireAndForgetResponderSubscriber(
63+
int streamId, RequesterResponderSupport requesterResponderSupport) {
64+
this.streamId = streamId;
65+
this.allocator = null;
66+
this.payloadDecoder = null;
67+
this.maxInboundPayloadSize = 0;
68+
this.requesterResponderSupport = null;
69+
this.handler = null;
70+
this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();
5471
this.frames = null;
5572
}
5673

@@ -65,6 +82,7 @@ private FireAndForgetResponderSubscriber() {
6582
this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
6683
this.requesterResponderSupport = requesterResponderSupport;
6784
this.handler = handler;
85+
this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();
6886

6987
this.frames =
7088
ReassemblyUtils.addFollowingFrame(
@@ -81,11 +99,21 @@ public void onNext(Void voidVal) {}
8199

82100
@Override
83101
public void onError(Throwable t) {
102+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
103+
if (requestInterceptor != null) {
104+
requestInterceptor.onTerminate(this.streamId, t);
105+
}
106+
84107
logger.debug("Dropped Outbound error", t);
85108
}
86109

87110
@Override
88-
public void onComplete() {}
111+
public void onComplete() {
112+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
113+
if (requestInterceptor != null) {
114+
requestInterceptor.onTerminate(this.streamId, null);
115+
}
116+
}
89117

90118
@Override
91119
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
@@ -95,11 +123,17 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
95123
ReassemblyUtils.addFollowingFrame(
96124
frames, followingFrame, hasFollows, this.maxInboundPayloadSize);
97125
} catch (IllegalStateException t) {
98-
this.requesterResponderSupport.remove(this.streamId, this);
126+
final int streamId = this.streamId;
127+
this.requesterResponderSupport.remove(streamId, this);
99128

100129
this.frames = null;
101130
frames.release();
102131

132+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
133+
if (requestInterceptor != null) {
134+
requestInterceptor.onTerminate(streamId, t);
135+
}
136+
103137
logger.debug("Reassembly has failed", t);
104138
return;
105139
}
@@ -114,6 +148,12 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
114148
frames.release();
115149
} catch (Throwable t) {
116150
ReferenceCountUtil.safeRelease(frames);
151+
152+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
153+
if (requestInterceptor != null) {
154+
requestInterceptor.onTerminate(this.streamId, t);
155+
}
156+
117157
logger.debug("Reassembly has failed", t);
118158
return;
119159
}
@@ -127,9 +167,16 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
127167
public final void handleCancel() {
128168
final CompositeByteBuf frames = this.frames;
129169
if (frames != null) {
130-
this.requesterResponderSupport.remove(this.streamId, this);
170+
final int streamId = this.streamId;
171+
this.requesterResponderSupport.remove(streamId, this);
172+
131173
this.frames = null;
132174
frames.release();
175+
176+
final RequestInterceptor requestInterceptor = this.requestInterceptor;
177+
if (requestInterceptor != null) {
178+
requestInterceptor.onCancel(streamId);
179+
}
133180
}
134181
}
135182
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
631631
(int) keepAliveInterval.toMillis(),
632632
(int) keepAliveMaxLifeTime.toMillis(),
633633
keepAliveHandler,
634+
interceptors::initRequesterRequestInterceptor,
634635
requesterLeaseHandler);
635636

636637
RSocket wrappedRSocketRequester =
@@ -669,7 +670,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
669670
responderLeaseHandler,
670671
mtu,
671672
maxFrameLength,
672-
maxInboundPayloadSize);
673+
maxInboundPayloadSize,
674+
interceptors::initResponderRequestInterceptor);
673675

674676
return wrappedRSocketRequester;
675677
})

0 commit comments

Comments
 (0)