Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ private void startGracefulShutdown(ChannelHandlerContext ctx) {
LOG.debug("[{}] No pending queries, completing graceful shutdown now", logPrefix);
ctx.channel().close();
} else {
// remove heartbeat handler from pipeline if present.
// Remove heartbeat handler from pipeline if present.
ChannelHandler heartbeatHandler = ctx.pipeline().get(ChannelFactory.HEARTBEAT_HANDLER_NAME);
if (heartbeatHandler != null) {
ctx.pipeline().remove(heartbeatHandler);
}
LOG.debug("[{}] There are pending queries, delaying graceful shutdown", logPrefix);
closingGracefully = true;
closeStartedFuture.setSuccess();
closeStartedFuture.trySuccess();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -256,7 +258,7 @@ public void should_refuse_new_writes_during_graceful_close() {
}

@Test
public void should_close_gracefully_if_orphan_ids_above_max_and_pending_requests() {
public void should_close_gracefully_if_orphan_ids_above_max_and_pending_request() {
// Given
addToPipeline();
// Generate n orphan ids by writing and cancelling the requests:
Expand Down Expand Up @@ -311,6 +313,65 @@ public void should_close_gracefully_if_orphan_ids_above_max_and_pending_requests
assertThat(channel.closeFuture()).isSuccess();
}

@Test
public void should_close_gracefully_if_orphan_ids_above_max_and_multiple_pending_requests() {
// Given
addToPipeline();
// Generate n orphan ids by writing and cancelling the requests.
for (int i = 0; i < MAX_ORPHAN_IDS; i++) {
when(streamIds.acquire()).thenReturn(i);
MockResponseCallback responseCallback = new MockResponseCallback();
channel
.writeAndFlush(
new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
.awaitUninterruptibly();
channel.writeAndFlush(responseCallback).awaitUninterruptibly();
}
// Generate 3 additional requests that are pending and not cancelled.
List<MockResponseCallback> pendingResponseCallbacks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
when(streamIds.acquire()).thenReturn(MAX_ORPHAN_IDS + i);
MockResponseCallback responseCallback = new MockResponseCallback();
channel
.writeAndFlush(
new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
.awaitUninterruptibly();
pendingResponseCallbacks.add(responseCallback);
}

// When
// Generate the n+1th orphan id that makes us go above the threshold by canceling one if the
// pending requests.
channel.writeAndFlush(pendingResponseCallbacks.remove(0)).awaitUninterruptibly();

// Then
// Channel should be closing gracefully but there's no way to observe that from the outside
// besides writing another request and check that it's rejected.
assertThat(channel.closeFuture()).isNotDone();
ChannelFuture otherWriteFuture =
channel.writeAndFlush(
new DriverChannel.RequestMessage(
QUERY, false, Frame.NO_PAYLOAD, new MockResponseCallback()));
assertThat(otherWriteFuture).isFailed();
assertThat(otherWriteFuture.cause())
.isInstanceOf(IllegalStateException.class)
.hasMessage("Channel is closing");

// When
// Cancel the remaining pending requests causing the n+ith orphan ids above the threshold.
for (MockResponseCallback pendingResponseCallback : pendingResponseCallbacks) {
ChannelFuture future = channel.writeAndFlush(pendingResponseCallback).awaitUninterruptibly();

// Then
// The future should succeed even though the channel has started closing gracefully.
assertThat(future).isSuccess();
}

// Then
// The graceful shutdown completes.
assertThat(channel.closeFuture()).isSuccess();
}

@Test
public void should_close_immediately_if_orphan_ids_above_max_and_no_pending_requests() {
// Given
Expand Down