Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ subprojects {
test {
useJUnitPlatform()
testLogging {
events "FAILED"
events "PASSED", "FAILED"
showExceptions true
showCauses true
exceptionFormat "FULL"
stackTraceFilters "ENTRY_POINT"
maxGranularity 3
Expand All @@ -169,6 +170,8 @@ subprojects {
}
}

forkEvery = 1

if (isCiServer) {
def stdout = new LinkedList<TestOutputEvent>()
beforeTest { TestDescriptor td ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
final ResumableDuplexConnection resumableDuplexConnection =
new ResumableDuplexConnection(
CLIENT_TAG,
resumeToken,
clientServerConnection,
resumableFramesStore);
final ResumableClientSetup resumableClientSetup =
Expand Down
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/core/Resume.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ boolean isCleanupStoreOnKeepAlive() {
Function<? super ByteBuf, ? extends ResumableFramesStore> getStoreFactory(String tag) {
return storeFactory != null
? storeFactory
: token -> new InMemoryResumableFramesStore(tag, 100_000);
: token -> new InMemoryResumableFramesStore(tag, token, 100_000);
}

Duration getStreamTimeout() {
Expand Down
3 changes: 2 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public Mono<Void> acceptRSocketSetup(

final ResumableFramesStore resumableFramesStore = resumeStoreFactory.apply(resumeToken);
final ResumableDuplexConnection resumableDuplexConnection =
new ResumableDuplexConnection("server", duplexConnection, resumableFramesStore);
new ResumableDuplexConnection(
"server", resumeToken, duplexConnection, resumableFramesStore);
final ServerRSocketSession serverRSocketSession =
new ServerRSocketSession(
resumeToken,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public final class UnboundedProcessor extends FluxProcessor<ByteBuf, ByteBuf>

final Queue<ByteBuf> queue;
final Queue<ByteBuf> priorityQueue;
final Runnable onFinalizedHook;

boolean cancelled;
boolean done;
Expand Down Expand Up @@ -88,6 +89,11 @@ public final class UnboundedProcessor extends FluxProcessor<ByteBuf, ByteBuf>
boolean outputFused;

public UnboundedProcessor() {
this(() -> {});
}

public UnboundedProcessor(Runnable onFinalizedHook) {
this.onFinalizedHook = onFinalizedHook;
this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
}
Expand Down Expand Up @@ -793,6 +799,9 @@ static long markTerminatedOrFinalized(UnboundedProcessor instance) {
}

if (STATE.compareAndSet(instance, state, nextState | FLAG_TERMINATED)) {
if (isFinalized(nextState)) {
instance.onFinalizedHook.run();
}
return state;
}
}
Expand Down Expand Up @@ -906,6 +915,7 @@ static void clearAndFinalize(UnboundedProcessor instance) {

if (STATE.compareAndSet(
instance, state, (state & ~MAX_WIP_VALUE & ~FLAG_HAS_VALUE) | FLAG_FINALIZED)) {
instance.onFinalizedHook.run();
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class ClientRSocketSession
final Retry retry;
final boolean cleanupStoreOnKeepAlive;
final ByteBuf resumeToken;
final String session;

volatile Subscription s;
static final AtomicReferenceFieldUpdater<ClientRSocketSession, Subscription> S =
Expand All @@ -71,20 +73,30 @@ public ClientRSocketSession(
Retry retry,
boolean cleanupStoreOnKeepAlive) {
this.resumeToken = resumeToken;
this.session = resumeToken.toString(CharsetUtil.UTF_8);
this.connectionFactory =
connectionFactory.flatMap(
dc -> {
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
final long position = resumableFramesStore.framePosition();
dc.sendFrame(
0,
ResumeFrameCodec.encode(
dc.alloc(),
resumeToken.retain(),
// server uses this to release its cache
resumableFramesStore.frameImpliedPosition(), // observed on the client side
impliedPosition, // observed on the client side
// server uses this to check whether there is no mismatch
resumableFramesStore.framePosition() // sent from the client sent
position // sent from the client sent
));
logger.debug("Resume Frame has been sent");

if (logger.isDebugEnabled()) {
logger.debug(
"Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.",
session,
impliedPosition,
position);
}

return connectionTransformer.apply(dc);
});
Expand All @@ -105,7 +117,12 @@ void reconnect(int index) {
if (this.s == Operators.cancelledSubscription()
&& S.compareAndSet(this, Operators.cancelledSubscription(), null)) {
keepAliveSupport.stop();
logger.debug("Connection[" + index + "] is lost. Reconnecting to resume...");
if (logger.isDebugEnabled()) {
logger.debug(
"Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...",
session,
index);
}
connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this);
}
}
Expand Down Expand Up @@ -155,21 +172,30 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
DuplexConnection nextDuplexConnection = tuple2.getT2();

if (!Operators.terminate(S, this)) {
logger.debug("Session has already been expired. Terminating received connection");
if (logger.isDebugEnabled()) {
logger.debug(
"Side[client]|Session[{}]. Session has already been expired. Terminating received connection",
session);
}
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("resumption_server=[Session Expired]");
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
return;
}

final int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame);
if (streamId != 0) {
logger.debug(
"Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
resumableConnection.dispose();
if (logger.isDebugEnabled()) {
logger.debug(
"Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection",
session);
}
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("RESUME_OK frame must be received before any others");
resumableConnection.dispose(connectionErrorException);
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
return;
}

Expand All @@ -183,7 +209,8 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
final long position = resumableFramesStore.framePosition();
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
logger.debug(
"ResumeOK FRAME received. ServerResumeState{observedFramesPosition[{}]}. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}",
"Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]",
session,
remoteImpliedPos,
impliedPosition,
position);
Expand All @@ -194,42 +221,54 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
}
} catch (IllegalStateException e) {
logger.debug("Exception occurred while releasing frames in the frameStore", e);
resumableConnection.dispose();
resumableConnection.dispose(e);
final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e);
nextDuplexConnection.sendErrorAndClose(t);
nextDuplexConnection.receive().subscribe().dispose();
return;
}

if (resumableConnection.connect(nextDuplexConnection)) {
keepAliveSupport.start();
logger.debug("Session has been resumed successfully");
if (logger.isDebugEnabled()) {
logger.debug(
"Side[client]|Session[{}]. Session has been resumed successfully", session);
}
} else {
logger.debug("Session has already been expired. Terminating received connection");
if (logger.isDebugEnabled()) {
logger.debug(
"Side[client]|Session[{}]. Session has already been expired. Terminating received connection",
session);
}
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("resumption_server_pos=[Session Expired]");
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
}
} else {
logger.debug(
"Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection",
"Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection",
session,
remoteImpliedPos,
position);
resumableConnection.dispose();
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]");
resumableConnection.dispose(connectionErrorException);
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
}
} else if (frameType == FrameType.ERROR) {
final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame);
logger.debug("Received error frame. Terminating received connection", exception);
resumableConnection.dispose();
resumableConnection.dispose(exception);
} else {
logger.debug(
"Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
resumableConnection.dispose();
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("RESUME_OK frame must be received before any others");
resumableConnection.dispose(connectionErrorException);
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
}
}

Expand All @@ -239,7 +278,7 @@ public void onError(Throwable t) {
Operators.onErrorDropped(t, currentContext());
}

resumableConnection.dispose();
resumableConnection.dispose(t);
}

@Override
Expand Down
Loading