Skip to content

Commit e7efe6d

Browse files
committed
provides tests and fixes
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 8ed0e28 commit e7efe6d

File tree

8 files changed

+1234
-150
lines changed

8 files changed

+1234
-150
lines changed

rsocket-core/src/main/java/io/rsocket/RequestChannelFlux.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public void onNext(Payload p) {
179179
FragmentationUtils.encodeFirstFragment(
180180
allocator,
181181
mtu,
182-
(int) (requested & Integer.MAX_VALUE),
182+
(int) Math.min(requested, Integer.MAX_VALUE),
183183
FrameType.REQUEST_CHANNEL,
184184
streamId,
185185
slicedMetadata,
@@ -211,7 +211,7 @@ public void onNext(Payload p) {
211211
false,
212212
false, // TODO: Should be a different flag in case of the scalar source or
213213
// sync source of a single element
214-
(int) (requested & Integer.MAX_VALUE),
214+
(int) Math.min(requested, Integer.MAX_VALUE),
215215
retainedSlicedMetadata,
216216
retainedSlicedData);
217217

rsocket-core/src/main/java/io/rsocket/RequestResponseMono.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ final class RequestResponseMono extends Mono<Payload> implements Reassemble<Payl
3939
final UnboundedProcessor<ByteBuf> sendProcessor;
4040
final PayloadDecoder payloadDecoder;
4141

42-
static final int STATE_UNSUBSCRIBED = 0;
43-
static final int STATE_SUBSCRIBED = 1;
44-
static final int STATE_REQUESTED = 2;
45-
static final int STATE_REASSEMBLING = 3;
46-
static final int STATE_TERMINATED = 4;
42+
static final int STATE_UNSUBSCRIBED = 0b0_0000;
43+
static final int STATE_SUBSCRIBED = 0b0_0001;
44+
static final int STATE_REQUESTED = 0b0_0010;
45+
static final int STATE_TERMINATED = 0b1_0000;
46+
47+
static final int FLAG_SENT = 0b0_0100;
48+
static final int FLAG_REASSEMBLING = 0b0_1000;
4749

4850
volatile int state;
4951
static final AtomicIntegerFieldUpdater<RequestResponseMono> STATE =
@@ -52,6 +54,7 @@ final class RequestResponseMono extends Mono<Payload> implements Reassemble<Payl
5254
int streamId;
5355
CoreSubscriber<? super Payload> actual;
5456
CompositeByteBuf frames;
57+
boolean done;
5558

5659
RequestResponseMono(
5760
ByteBufAllocator allocator,
@@ -98,13 +101,14 @@ public final void onComplete() {
98101

99102
@Override
100103
public final void onError(Throwable cause) {
101-
if (this.state == STATE_TERMINATED) {
104+
if (this.state == STATE_TERMINATED || this.done) {
102105
Operators.onErrorDropped(cause, currentContext());
103106
return;
104107
}
105108

106109
final CompositeByteBuf frames = this.frames;
107110
this.frames = null;
111+
this.done = true;
108112

109113
if (STATE.getAndSet(this, STATE_TERMINATED) == STATE_TERMINATED) {
110114
Operators.onErrorDropped(cause, currentContext());
@@ -122,8 +126,16 @@ public final void onError(Throwable cause) {
122126

123127
@Override
124128
public final void onNext(@Nullable Payload value) {
125-
if (this.state == STATE_TERMINATED
126-
|| STATE.getAndSet(this, STATE_TERMINATED) == STATE_TERMINATED) {
129+
if (this.state == STATE_TERMINATED || this.done) {
130+
if (value != null) {
131+
value.release();
132+
}
133+
return;
134+
}
135+
136+
this.done = true;
137+
138+
if (STATE.getAndSet(this, STATE_TERMINATED) == STATE_TERMINATED) {
127139
if (value != null) {
128140
value.release();
129141
}
@@ -229,6 +241,7 @@ public final void request(long n) {
229241

230242
p.release();
231243
} catch (Throwable e) {
244+
this.done = true;
232245
this.state = STATE_TERMINATED;
233246

234247
ReferenceCountUtil.safeRelease(p);
@@ -241,21 +254,39 @@ public final void request(long n) {
241254
}
242255

243256
this.actual.onError(e);
257+
return;
258+
}
259+
260+
for (; ; ) {
261+
int state = this.state;
262+
263+
if (state == STATE_TERMINATED) {
264+
if (!this.done) {
265+
final int streamId = this.streamId;
266+
as.remove(streamId, this);
267+
268+
final ByteBuf cancelFrame = CancelFrameFlyweight.encode(allocator, streamId);
269+
sender.onNext(cancelFrame);
270+
}
271+
272+
return;
273+
}
274+
275+
if (STATE.compareAndSet(this, state, state | FLAG_SENT)) {
276+
return;
277+
}
244278
}
245279
}
246280
}
247281

248282
@Override
249283
public final void cancel() {
250-
int state = this.state;
251-
if (state == STATE_TERMINATED) {
252-
return;
253-
}
284+
int state = STATE.getAndSet(this, STATE_TERMINATED);
254285

255-
if (STATE.getAndSet(this, STATE_TERMINATED) != STATE_TERMINATED) {
286+
if (state != STATE_TERMINATED) {
256287
if (state == STATE_SUBSCRIBED) {
257288
this.payload.release();
258-
} else {
289+
} else if ((state & FLAG_SENT) == FLAG_SENT) {
259290

260291
final CompositeByteBuf frames = this.frames;
261292
this.frames = null;
@@ -292,9 +323,10 @@ public void reassemble(ByteBuf followingFrame, boolean hasFollows, boolean termi
292323
if (state == STATE_TERMINATED) {
293324
this.frames = null;
294325
ReferenceCountUtil.safeRelease(frames);
326+
return;
295327
}
296328

297-
if (STATE.compareAndSet(this, state, STATE_REASSEMBLING)) {
329+
if (STATE.compareAndSet(this, state, state | FLAG_REASSEMBLING)) {
298330
return;
299331
}
300332
}

0 commit comments

Comments
 (0)