From dce10be6d552eef8902568d30230452a694fc0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konrad=20Kami=C5=84ski?= Date: Mon, 28 Dec 2020 16:09:14 +0100 Subject: [PATCH 1/2] Fix: Improper read timeout handling. JAVA-3920 --- .../mongodb/connection/netty/NettyStream.java | 92 ++++++++++--------- .../connection/netty/ReadTimeoutHandler.java | 57 +++++++++--- .../connection/netty/TimeoutHandle.java | 39 ++++++++ 3 files changed, 129 insertions(+), 59 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index 948da9162ec..4b1d1b12d2a 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -215,14 +215,14 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler } private void readAsync(final int numBytes, final AsyncCompletionHandler handler, final int additionalTimeout) { - scheduleReadTimeout(additionalTimeout); + final TimeoutHandle timeoutHandle = scheduleReadTimeout(additionalTimeout); ByteBuf buffer = null; Throwable exceptionResult = null; synchronized (this) { exceptionResult = pendingException; if (exceptionResult == null) { if (!hasBytesAvailable(numBytes)) { - pendingReader = new PendingReader(numBytes, handler); + pendingReader = new PendingReader(numBytes, handler, timeoutHandle); } else { CompositeByteBuf composite = allocator.compositeBuffer(pendingInboundBuffers.size()); int bytesNeeded = numBytes; @@ -247,11 +247,11 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler } } if (exceptionResult != null) { - disableReadTimeout(); + timeoutHandle.cancel(); handler.failed(exceptionResult); } if (buffer != null) { - disableReadTimeout(); + timeoutHandle.cancel(); handler.completed(buffer); } } @@ -282,7 +282,9 @@ private void handleReadResponse(final io.netty.buffer.ByteBuf buffer, final Thro } if (localPendingReader != null) { - readAsync(localPendingReader.numBytes, localPendingReader.handler); + final TimeoutCancellingHandler timeoutCancellingHandler = + new TimeoutCancellingHandler(localPendingReader.timeoutHandle, localPendingReader.handler); + readAsync(localPendingReader.numBytes, timeoutCancellingHandler); } } @@ -358,10 +360,12 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable t) private static final class PendingReader { private final int numBytes; private final AsyncCompletionHandler handler; + private final TimeoutHandle timeoutHandle; - private PendingReader(final int numBytes, final AsyncCompletionHandler handler) { + private PendingReader(final int numBytes, final AsyncCompletionHandler handler, final TimeoutHandle timeoutHandle) { this.numBytes = numBytes; this.handler = handler; + this.timeoutHandle = timeoutHandle; } } @@ -445,47 +449,45 @@ public void operationComplete(final ChannelFuture future) { } } - private void scheduleReadTimeout(final int additionalTimeout) { - adjustTimeout(false, additionalTimeout); + private TimeoutHandle scheduleReadTimeout(final int additionalTimeout) { + if (isClosed) { + return TimeoutHandle.NOOP; + } + ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME); + if (timeoutHandler != null) { + final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler; + final ChannelHandlerContext handlerContext = channel.pipeline().context(timeoutHandler); + EventExecutor executor = handlerContext.executor(); + + if (executor.inEventLoop()) { + return readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout); + } else { + return readTimeoutHandler.scheduleTimeout(executor, handlerContext, additionalTimeout); + } + } else { + return TimeoutHandle.NOOP; + } } - private void disableReadTimeout() { - adjustTimeout(true, 0); - } + private static final class TimeoutCancellingHandler implements AsyncCompletionHandler { + private final TimeoutHandle timeoutHandle; + private final AsyncCompletionHandler delegate; - private void adjustTimeout(final boolean disable, final int additionalTimeout) { - if (isClosed) { - return; - } - ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME); - if (timeoutHandler != null) { - final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler; - final ChannelHandlerContext handlerContext = channel.pipeline().context(timeoutHandler); - EventExecutor executor = handlerContext.executor(); - - if (disable) { - if (executor.inEventLoop()) { - readTimeoutHandler.removeTimeout(handlerContext); - } else { - executor.submit(new Runnable() { - @Override - public void run() { - readTimeoutHandler.removeTimeout(handlerContext); - } - }); - } - } else { - if (executor.inEventLoop()) { - readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout); - } else { - executor.submit(new Runnable() { - @Override - public void run() { - readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout); - } - }); - } - } - } + private TimeoutCancellingHandler(final TimeoutHandle timeoutHandle, final AsyncCompletionHandler delegate) { + this.timeoutHandle = timeoutHandle; + this.delegate = delegate; + } + + @Override + public void completed(final ByteBuf byteBuf) { + timeoutHandle.cancel(); + delegate.completed(byteBuf); + } + + @Override + public void failed(final Throwable t) { + timeoutHandle.cancel(); + delegate.failed(t); + } } } diff --git a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java b/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java index 824c8f7d6a3..f5312cfe1ac 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java +++ b/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java @@ -21,51 +21,80 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.ReadTimeoutException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.isTrueArgument; /** - * Passes a {@link ReadTimeoutException} if the time between a {@link #scheduleTimeout} and {@link #removeTimeout} is longer than the set - * timeout. + * Passes a {@link ReadTimeoutException} if the time between a {@link #scheduleTimeout} and returned {@link TimeoutHandle#cancel()} + * is longer than the set timeout. */ final class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { private final long readTimeout; - private volatile ScheduledFuture timeout; ReadTimeoutHandler(final long readTimeout) { isTrueArgument("readTimeout must be greater than zero.", readTimeout > 0); this.readTimeout = readTimeout; } - void scheduleTimeout(final ChannelHandlerContext ctx, final int additionalTimeout) { - isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop()); - if (timeout == null) { - timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout + additionalTimeout, TimeUnit.MILLISECONDS); - } + TimeoutHandle scheduleTimeout(final ChannelHandlerContext ctx, final int additionalTimeout) { + final SimpleTimeoutHandle timeoutHandle = new SimpleTimeoutHandle(); + scheduleTimeout(timeoutHandle, ctx, additionalTimeout); + return timeoutHandle; + } + + TimeoutHandle scheduleTimeout(final ExecutorService executor, final ChannelHandlerContext ctx, final int additionalTimeout) { + final SimpleTimeoutHandle timeoutHandle = new SimpleTimeoutHandle(); + executor.submit(() -> scheduleTimeout(timeoutHandle, ctx, additionalTimeout)); + return timeoutHandle; } - void removeTimeout(final ChannelHandlerContext ctx) { + private void scheduleTimeout(final SimpleTimeoutHandle timeoutHandle, final ChannelHandlerContext ctx, final int additionalTimeout) { isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop()); - if (timeout != null) { - timeout.cancel(false); - timeout = null; + + final ReadTimeoutTask task = new ReadTimeoutTask(timeoutHandle, ctx); + final ScheduledFuture timeout = ctx.executor().schedule(task, readTimeout + additionalTimeout, TimeUnit.MILLISECONDS); + timeoutHandle.assignTimeout(timeout); + } + + private static final class SimpleTimeoutHandle implements TimeoutHandle { + private AtomicBoolean cancelled = new AtomicBoolean(false); + private ScheduledFuture timeout = null; + + @Override + public void cancel() { + cancelled.set(true); + if (timeout != null) { + timeout.cancel(false); + } + } + + private boolean isCancelled() { + return cancelled.get(); + } + + private void assignTimeout(final ScheduledFuture timeout) { + this.timeout = timeout; } } private static final class ReadTimeoutTask implements Runnable { + private final SimpleTimeoutHandle timeoutHandle; private final ChannelHandlerContext ctx; - ReadTimeoutTask(final ChannelHandlerContext ctx) { + ReadTimeoutTask(final SimpleTimeoutHandle timeoutHandle, final ChannelHandlerContext ctx) { + this.timeoutHandle = timeoutHandle; this.ctx = ctx; } @Override public void run() { - if (ctx.channel().isOpen()) { + if (!timeoutHandle.isCancelled() && ctx.channel().isOpen()) { try { ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); ctx.close(); diff --git a/driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java b/driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java new file mode 100644 index 00000000000..18bc4c33a2f --- /dev/null +++ b/driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java @@ -0,0 +1,39 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * Copyright 2012 The Netty Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.connection.netty; + +/** + * Handle to a timeout that allows a cancellation using {@link #cancel()}. + */ +public interface TimeoutHandle { + TimeoutHandle NOOP = new NoopTimeoutHandle(); + + /** + * Cancels the timeout. + */ + void cancel(); + + /** + * Handle to a non-existing timeout. + */ + final class NoopTimeoutHandle implements TimeoutHandle { + @Override + public void cancel() { + } + } +} From 708bb5941bfaaa07a6fa06653f386065e403ad51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konrad=20Kami=C5=84ski?= Date: Fri, 15 Jan 2021 00:52:07 +0100 Subject: [PATCH 2/2] Fix: Improper read timeout handling. https://jira.mongodb.org/browse/JAVA-3920 --- .../mongodb/connection/netty/NettyStream.java | 97 ++++++++-------- .../connection/netty/ReadTimeoutHandler.java | 107 ------------------ .../connection/netty/TimeoutHandle.java | 39 ------- 3 files changed, 46 insertions(+), 197 deletions(-) delete mode 100644 driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java delete mode 100644 driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index 4b1d1b12d2a..a141cd7860b 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -35,7 +35,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -44,7 +43,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutException; -import io.netty.util.concurrent.EventExecutor; import org.bson.ByteBuf; import javax.net.ssl.SSLContext; @@ -58,6 +56,8 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification; import static com.mongodb.internal.connection.SslHelper.enableSni; @@ -67,7 +67,7 @@ * A Stream implementation based on Netty 4.0. */ final class NettyStream implements Stream { - private static final String READ_HANDLER_NAME = "ReadTimeoutHandler"; + private static final String INBOUND_BUFFER_HANDLER_NAME = "InboundBufferHandler"; private final ServerAddress address; private final SocketSettings settings; private final SslSettings sslSettings; @@ -81,6 +81,7 @@ final class NettyStream implements Stream { private final LinkedList pendingInboundBuffers = new LinkedList(); private volatile PendingReader pendingReader; private volatile Throwable pendingException; + private final int readTimeoutMs; NettyStream(final ServerAddress address, final SocketSettings settings, final SslSettings sslSettings, final EventLoopGroup workerGroup, final Class socketChannelClass, final ByteBufAllocator allocator) { @@ -90,6 +91,7 @@ final class NettyStream implements Stream { this.workerGroup = workerGroup; this.socketChannelClass = socketChannelClass; this.allocator = allocator; + this.readTimeoutMs = settings.getReadTimeout(MILLISECONDS); } @Override @@ -154,11 +156,7 @@ public void initChannel(final SocketChannel ch) { engine.setSSLParameters(sslParameters); ch.pipeline().addFirst("ssl", new SslHandler(engine, false)); } - int readTimeout = settings.getReadTimeout(MILLISECONDS); - if (readTimeout > 0) { - ch.pipeline().addLast(READ_HANDLER_NAME, new ReadTimeoutHandler(readTimeout)); - } - ch.pipeline().addLast(new InboundBufferHandler()); + ch.pipeline().addLast(INBOUND_BUFFER_HANDLER_NAME, new InboundBufferHandler()); } }); final ChannelFuture channelFuture = bootstrap.connect(nextAddress); @@ -186,7 +184,7 @@ public boolean supportsAdditionalTimeout() { @Override public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException { FutureAsyncCompletionHandler future = new FutureAsyncCompletionHandler(); - readAsync(numBytes, future, additionalTimeout); + readAsync(numBytes, future, additionalTimeout, null); return future.get(); } @@ -211,18 +209,19 @@ public void operationComplete(final ChannelFuture future) throws Exception { @Override public void readAsync(final int numBytes, final AsyncCompletionHandler handler) { - readAsync(numBytes, handler, 0); + readAsync(numBytes, handler, 0, null); } - private void readAsync(final int numBytes, final AsyncCompletionHandler handler, final int additionalTimeout) { - final TimeoutHandle timeoutHandle = scheduleReadTimeout(additionalTimeout); + private void readAsync(final int numBytes, final AsyncCompletionHandler handler, final int additionalTimeout, + final PendingReader pendingReaderToReuse) { ByteBuf buffer = null; - Throwable exceptionResult = null; + Throwable exceptionResult; synchronized (this) { exceptionResult = pendingException; if (exceptionResult == null) { if (!hasBytesAvailable(numBytes)) { - pendingReader = new PendingReader(numBytes, handler, timeoutHandle); + pendingReader = (pendingReaderToReuse != null) ? pendingReaderToReuse + : new PendingReader(numBytes, handler, scheduleReadTimeout(additionalTimeout)); } else { CompositeByteBuf composite = allocator.compositeBuffer(pendingInboundBuffers.size()); int bytesNeeded = numBytes; @@ -246,12 +245,17 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler } } } + + if ((exceptionResult != null || buffer != null) + && (pendingReaderToReuse != null) + && (pendingReaderToReuse.timeoutFuture != null)) { + pendingReaderToReuse.timeoutFuture.cancel(false); + } + if (exceptionResult != null) { - timeoutHandle.cancel(); handler.failed(exceptionResult); } if (buffer != null) { - timeoutHandle.cancel(); handler.completed(buffer); } } @@ -282,9 +286,7 @@ private void handleReadResponse(final io.netty.buffer.ByteBuf buffer, final Thro } if (localPendingReader != null) { - final TimeoutCancellingHandler timeoutCancellingHandler = - new TimeoutCancellingHandler(localPendingReader.timeoutHandle, localPendingReader.handler); - readAsync(localPendingReader.numBytes, timeoutCancellingHandler); + readAsync(localPendingReader.numBytes, localPendingReader.handler, 0, localPendingReader); } } @@ -360,12 +362,12 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable t) private static final class PendingReader { private final int numBytes; private final AsyncCompletionHandler handler; - private final TimeoutHandle timeoutHandle; + private final Future timeoutFuture; - private PendingReader(final int numBytes, final AsyncCompletionHandler handler, final TimeoutHandle timeoutHandle) { + private PendingReader(final int numBytes, final AsyncCompletionHandler handler, final Future timeoutFuture) { this.numBytes = numBytes; this.handler = handler; - this.timeoutHandle = timeoutHandle; + this.timeoutFuture = timeoutFuture; } } @@ -449,45 +451,38 @@ public void operationComplete(final ChannelFuture future) { } } - private TimeoutHandle scheduleReadTimeout(final int additionalTimeout) { - if (isClosed) { - return TimeoutHandle.NOOP; + private Future scheduleReadTimeout(final int additionalTimeout) { + if (isClosed || readTimeoutMs <= 0) { + return null; } - ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME); - if (timeoutHandler != null) { - final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler; - final ChannelHandlerContext handlerContext = channel.pipeline().context(timeoutHandler); - EventExecutor executor = handlerContext.executor(); - - if (executor.inEventLoop()) { - return readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout); - } else { - return readTimeoutHandler.scheduleTimeout(executor, handlerContext, additionalTimeout); - } + + final ChannelHandlerContext ctx = channel.pipeline().context(INBOUND_BUFFER_HANDLER_NAME); + if (ctx != null) { + final ReadTimeoutTask task = new ReadTimeoutTask(ctx); + return ctx.executor().schedule(task, readTimeoutMs + additionalTimeout, TimeUnit.MILLISECONDS); } else { - return TimeoutHandle.NOOP; + return null; } } - private static final class TimeoutCancellingHandler implements AsyncCompletionHandler { - private final TimeoutHandle timeoutHandle; - private final AsyncCompletionHandler delegate; + private static final class ReadTimeoutTask implements Runnable { - private TimeoutCancellingHandler(final TimeoutHandle timeoutHandle, final AsyncCompletionHandler delegate) { - this.timeoutHandle = timeoutHandle; - this.delegate = delegate; - } + private final ChannelHandlerContext ctx; - @Override - public void completed(final ByteBuf byteBuf) { - timeoutHandle.cancel(); - delegate.completed(byteBuf); + ReadTimeoutTask(final ChannelHandlerContext ctx) { + this.ctx = ctx; } @Override - public void failed(final Throwable t) { - timeoutHandle.cancel(); - delegate.failed(t); + public void run() { + if (ctx.channel().isOpen()) { + try { + ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); + ctx.close(); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } + } } } } diff --git a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java b/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java deleted file mode 100644 index f5312cfe1ac..00000000000 --- a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * Copyright 2012 The Netty Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.connection.netty; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.timeout.ReadTimeoutException; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.mongodb.assertions.Assertions.isTrue; -import static com.mongodb.assertions.Assertions.isTrueArgument; - -/** - * Passes a {@link ReadTimeoutException} if the time between a {@link #scheduleTimeout} and returned {@link TimeoutHandle#cancel()} - * is longer than the set timeout. - */ -final class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { - private final long readTimeout; - - ReadTimeoutHandler(final long readTimeout) { - isTrueArgument("readTimeout must be greater than zero.", readTimeout > 0); - this.readTimeout = readTimeout; - } - - TimeoutHandle scheduleTimeout(final ChannelHandlerContext ctx, final int additionalTimeout) { - final SimpleTimeoutHandle timeoutHandle = new SimpleTimeoutHandle(); - scheduleTimeout(timeoutHandle, ctx, additionalTimeout); - return timeoutHandle; - } - - TimeoutHandle scheduleTimeout(final ExecutorService executor, final ChannelHandlerContext ctx, final int additionalTimeout) { - final SimpleTimeoutHandle timeoutHandle = new SimpleTimeoutHandle(); - executor.submit(() -> scheduleTimeout(timeoutHandle, ctx, additionalTimeout)); - return timeoutHandle; - } - - private void scheduleTimeout(final SimpleTimeoutHandle timeoutHandle, final ChannelHandlerContext ctx, final int additionalTimeout) { - isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop()); - - final ReadTimeoutTask task = new ReadTimeoutTask(timeoutHandle, ctx); - final ScheduledFuture timeout = ctx.executor().schedule(task, readTimeout + additionalTimeout, TimeUnit.MILLISECONDS); - timeoutHandle.assignTimeout(timeout); - } - - private static final class SimpleTimeoutHandle implements TimeoutHandle { - private AtomicBoolean cancelled = new AtomicBoolean(false); - private ScheduledFuture timeout = null; - - @Override - public void cancel() { - cancelled.set(true); - if (timeout != null) { - timeout.cancel(false); - } - } - - private boolean isCancelled() { - return cancelled.get(); - } - - private void assignTimeout(final ScheduledFuture timeout) { - this.timeout = timeout; - } - } - - private static final class ReadTimeoutTask implements Runnable { - - private final SimpleTimeoutHandle timeoutHandle; - private final ChannelHandlerContext ctx; - - ReadTimeoutTask(final SimpleTimeoutHandle timeoutHandle, final ChannelHandlerContext ctx) { - this.timeoutHandle = timeoutHandle; - this.ctx = ctx; - } - - @Override - public void run() { - if (!timeoutHandle.isCancelled() && ctx.channel().isOpen()) { - try { - ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); - ctx.close(); - } catch (Throwable t) { - ctx.fireExceptionCaught(t); - } - } - } - } -} diff --git a/driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java b/driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java deleted file mode 100644 index 18bc4c33a2f..00000000000 --- a/driver-core/src/main/com/mongodb/connection/netty/TimeoutHandle.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * Copyright 2012 The Netty Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.connection.netty; - -/** - * Handle to a timeout that allows a cancellation using {@link #cancel()}. - */ -public interface TimeoutHandle { - TimeoutHandle NOOP = new NoopTimeoutHandle(); - - /** - * Cancels the timeout. - */ - void cancel(); - - /** - * Handle to a non-existing timeout. - */ - final class NoopTimeoutHandle implements TimeoutHandle { - @Override - public void cancel() { - } - } -}