From 24d4840bdbc4dc69218a41e45014bb0d5ddeb2be Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 31 Dec 2018 18:45:59 -0600 Subject: [PATCH 1/4] WIP --- .../java/org/elasticsearch/nio/NioTimer.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java new file mode 100644 index 0000000000000..5b2df7d698575 --- /dev/null +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Comparator; +import java.util.PriorityQueue; + +public class NioTimer { + + private final PriorityQueue tasks; + + public NioTimer() { + tasks = new PriorityQueue<>(Comparator.comparingLong(value -> value.deadline)); + } + + public void schedule(Runnable task, TimeValue timeValue) { + long nanos = timeValue.getNanos(); + long currentTime = System.nanoTime(); + + + } + + public void scheduleAtRelativeTime(Runnable task, long relativeTime) { + tasks.offer(new DelayedTask(relativeTime, task)); + } + + public void pollTasks() { + long currentNanos = System.nanoTime(); + + } + + private static class DelayedTask { + + private final long deadline; + private final Runnable runnable; + + private DelayedTask(long deadline, Runnable runnable) { + this.deadline = deadline; + this.runnable = runnable; + } + } +} From cffe6f44612159e261b6f0fb4fe47400c85c66df Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 8 Jan 2019 15:08:29 -0700 Subject: [PATCH 2/4] Implement basic timer --- .../java/org/elasticsearch/nio/NioTimer.java | 50 +++++++--- .../org/elasticsearch/nio/NioTimerTests.java | 96 +++++++++++++++++++ 2 files changed, 133 insertions(+), 13 deletions(-) create mode 100644 libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java index 5b2df7d698575..23f6de6ad7597 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java @@ -21,41 +21,65 @@ import org.elasticsearch.common.unit.TimeValue; +import java.util.ArrayList; import java.util.Comparator; import java.util.PriorityQueue; -public class NioTimer { +class NioTimer { private final PriorityQueue tasks; - public NioTimer() { - tasks = new PriorityQueue<>(Comparator.comparingLong(value -> value.deadline)); + NioTimer() { + tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline)); } - public void schedule(Runnable task, TimeValue timeValue) { - long nanos = timeValue.getNanos(); - long currentTime = System.nanoTime(); - - + Runnable schedule(Runnable task, TimeValue timeValue) { + return scheduleAtRelativeTime(task, System.nanoTime() + timeValue.nanos()); } - public void scheduleAtRelativeTime(Runnable task, long relativeTime) { - tasks.offer(new DelayedTask(relativeTime, task)); + Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) { + DelayedTask delayedTask = new DelayedTask(relativeNanos, task); + tasks.offer(delayedTask); + return delayedTask; } - public void pollTasks() { - long currentNanos = System.nanoTime(); + Runnable pollTask() { + return pollTask(System.nanoTime()); + } + Runnable pollTask(long relativeNanos) { + DelayedTask task; + while ((task = tasks.peek()) != null) { + if (relativeNanos - task.deadline >= 0) { + tasks.remove(); + if (task.cancelled == false) { + return task.runnable; + } + } else { + return null; + } + } + return null; } - private static class DelayedTask { + private static class DelayedTask implements Runnable { private final long deadline; private final Runnable runnable; + private boolean cancelled = false; private DelayedTask(long deadline, Runnable runnable) { this.deadline = deadline; this.runnable = runnable; } + + private long getDeadline() { + return deadline; + } + + @Override + public void run() { + cancelled = true; + } } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java new file mode 100644 index 0000000000000..9effcd0bc6876 --- /dev/null +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +public class NioTimerTests extends ESTestCase { + + private NioTimer timer = new NioTimer(); + + public void testScheduleTask() { + AtomicBoolean complete = new AtomicBoolean(false); + + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + timer.scheduleAtRelativeTime(() -> complete.set(true), executeTime); + + while (true) { + long nanoTime = System.nanoTime(); + Runnable runnable = timer.pollTask(nanoTime); + if (nanoTime - executeTime >= 0) { + runnable.run(); + assertTrue(complete.get()); + break; + } else { + assertNull(runnable); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + } + } + } + + public void testPollScheduleTaskAtExactTime() { + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + timer.scheduleAtRelativeTime(() -> {}, executeTime); + + assertNull(timer.pollTask(executeTime - 1)); + assertNotNull(timer.pollTask(executeTime)); + } + + public void testTaskOrdering() { + AtomicBoolean first = new AtomicBoolean(false); + AtomicBoolean second = new AtomicBoolean(false); + AtomicBoolean third = new AtomicBoolean(false); + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + timer.scheduleAtRelativeTime(() -> third.set(true), executeTime + 2); + timer.scheduleAtRelativeTime(() -> first.set(true), executeTime); + timer.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); + + timer.pollTask(executeTime + 10).run(); + assertTrue(first.get()); + assertFalse(second.get()); + assertFalse(third.get()); + timer.pollTask(executeTime + 10).run(); + assertTrue(first.get()); + assertTrue(second.get()); + assertFalse(third.get()); + timer.pollTask(executeTime + 10).run(); + assertTrue(first.get()); + assertTrue(second.get()); + assertTrue(third.get()); + } + + public void testTaskCancel() { + AtomicBoolean first = new AtomicBoolean(false); + AtomicBoolean second = new AtomicBoolean(false); + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + Runnable cancellable = timer.scheduleAtRelativeTime(() -> first.set(true), executeTime); + timer.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); + + cancellable.run(); + timer.pollTask(executeTime + 10).run(); + assertFalse(first.get()); + assertTrue(second.get()); + assertNull(timer.pollTask(executeTime + 10)); + } +} From 0adec5cb81e4aad2ccb84334d2e94fda1561b6ea Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 8 Jan 2019 17:47:14 -0700 Subject: [PATCH 3/4] Wip --- .../org/elasticsearch/nio/EventHandler.java | 4 +- .../org/elasticsearch/nio/NioSelector.java | 41 ++++++++++++++--- .../java/org/elasticsearch/nio/NioTimer.java | 39 ++++++++++------ .../nio/SocketChannelContext.java | 3 ++ .../elasticsearch/nio/EventHandlerTests.java | 2 +- .../elasticsearch/nio/NioSelectorTests.java | 39 +++++++++++++++- .../org/elasticsearch/nio/NioTimerTests.java | 12 ++++- .../transport/nio/SSLChannelContext.java | 15 ++++++ .../transport/nio/SSLChannelContextTests.java | 46 +++++++++++++++++++ 9 files changed, 173 insertions(+), 28 deletions(-) diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java index 87a2489fdbc27..7eebfe24665a2 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -150,11 +150,11 @@ protected void writeException(SocketChannelContext context, Exception exception) } /** - * This method is called when a listener attached to a channel operation throws an exception. + * This method is called when a task or listener attached to a channel operation throws an exception. * * @param exception that occurred */ - protected void listenerException(Exception exception) { + protected void taskException(Exception exception) { exceptionHandler.accept(exception); } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java index 6820b6a07188f..3d7395eae920f 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -54,6 +55,7 @@ public class NioSelector implements Closeable { private final Selector selector; private final ByteBuffer ioBuffer; + private final NioTimer nioTimer = new NioTimer(); private final ReentrantLock runLock = new ReentrantLock(); private final CountDownLatch exitedLoop = new CountDownLatch(1); private final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -81,6 +83,10 @@ public ByteBuffer getIoBuffer() { return ioBuffer; } + public NioTimer getNioTimer() { + return nioTimer; + } + public Selector rawSelector() { return selector; } @@ -145,8 +151,16 @@ void singleLoop() { try { closePendingChannels(); preSelect(); - - int ready = selector.select(300); + long nanosUntilNextTask = nioTimer.nanosUntilNextTask(System.nanoTime()); + int ready; + if (nanosUntilNextTask == 0) { + ready = selector.selectNow(); + } else { + long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask); + // Only select until the next task needs to be run. Do not select with a value of 0 because + // that blocks without a timeout. + ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1))); + } if (ready > 0) { Set selectionKeys = selector.selectedKeys(); Iterator keyIterator = selectionKeys.iterator(); @@ -164,6 +178,8 @@ void singleLoop() { } } } + + handleScheduledTasks(System.nanoTime()); } catch (ClosedSelectorException e) { if (isOpen()) { throw e; @@ -245,6 +261,17 @@ void preSelect() { handleQueuedWrites(); } + private void handleScheduledTasks(long nanoTime) { + Runnable task; + while ((task = nioTimer.pollTask(nanoTime)) != null) { + try { + task.run(); + } catch (Exception e) { + eventHandler.taskException(e); + } + } + } + /** * Queues a write operation to be handled by the event loop. This can be called by any thread and is the * api available for non-selector threads to schedule writes. @@ -267,8 +294,10 @@ public void queueChannelClose(NioChannel channel) { ChannelContext context = channel.getContext(); assert context.getSelector() == this : "Must schedule a channel for closure with its selector"; channelsToClose.offer(context); - ensureSelectorOpenForEnqueuing(channelsToClose, context); - wakeup(); + if (isOnCurrentThread() == false) { + ensureSelectorOpenForEnqueuing(channelsToClose, context); + wakeup(); + } } /** @@ -324,7 +353,7 @@ public void executeListener(BiConsumer listener, V value) { try { listener.accept(value, null); } catch (Exception e) { - eventHandler.listenerException(e); + eventHandler.taskException(e); } } @@ -340,7 +369,7 @@ public void executeFailedListener(BiConsumer listener, Excepti try { listener.accept(null, exception); } catch (Exception e) { - eventHandler.listenerException(e); + eventHandler.taskException(e); } } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java index 23f6de6ad7597..afa5997b77f25 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java @@ -21,32 +21,27 @@ import org.elasticsearch.common.unit.TimeValue; -import java.util.ArrayList; import java.util.Comparator; import java.util.PriorityQueue; -class NioTimer { - - private final PriorityQueue tasks; +/** + * A basic priority queue backed timer service. The service is thread local and should only be used by a + * single nio selector event loop thread. + */ +public class NioTimer { - NioTimer() { - tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline)); - } + private final PriorityQueue tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline)); - Runnable schedule(Runnable task, TimeValue timeValue) { + public Cancellable schedule(Runnable task, TimeValue timeValue) { return scheduleAtRelativeTime(task, System.nanoTime() + timeValue.nanos()); } - Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) { + public Cancellable scheduleAtRelativeTime(Runnable task, long relativeNanos) { DelayedTask delayedTask = new DelayedTask(relativeNanos, task); tasks.offer(delayedTask); return delayedTask; } - Runnable pollTask() { - return pollTask(System.nanoTime()); - } - Runnable pollTask(long relativeNanos) { DelayedTask task; while ((task = tasks.peek()) != null) { @@ -62,7 +57,21 @@ Runnable pollTask(long relativeNanos) { return null; } - private static class DelayedTask implements Runnable { + long nanosUntilNextTask(long relativeNanos) { + DelayedTask nextTask = tasks.peek(); + if (nextTask == null) { + return Long.MAX_VALUE; + } else { + return Math.max(nextTask.deadline - relativeNanos, 0); + } + } + + public interface Cancellable { + + void cancel(); + } + + private static class DelayedTask implements Cancellable { private final long deadline; private final Runnable runnable; @@ -78,7 +87,7 @@ private long getDeadline() { } @Override - public void run() { + public void cancel() { cancelled = true; } } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java index 864fe793fdf73..661c55cc7280a 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -234,6 +234,9 @@ protected boolean closeNow() { return closeNow; } + protected void setCloseNow() { + closeNow = true; + } // When you read or write to a nio socket in java, the heap memory passed down must be copied to/from // direct memory. The JVM internally does some buffering of the direct memory, however we can save space diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java index 6e1e34ec1f572..f3ffab1baef67 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java @@ -245,7 +245,7 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException { public void testListenerExceptionCallsGenericExceptionHandler() throws IOException { RuntimeException listenerException = new RuntimeException(); - handler.listenerException(listenerException); + handler.taskException(listenerException); verify(genericExceptionHandler).accept(listenerException); } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java index bd5f1c1eb346f..a45a405eae45b 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java @@ -19,8 +19,10 @@ package org.elasticsearch.nio; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,6 +33,8 @@ import java.nio.channels.Selector; import java.util.Collections; import java.util.HashSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import static org.mockito.Matchers.any; @@ -98,6 +102,37 @@ public void testQueueChannelForClosed() throws IOException { verify(eventHandler).handleClose(context); } + public void testNioDelayedTasksAreExecuted() throws IOException { + AtomicBoolean isRun = new AtomicBoolean(false); + long nanoTime = System.nanoTime() - 1; + selector.getNioTimer().scheduleAtRelativeTime(() -> isRun.set(true), nanoTime); + + assertFalse(isRun.get()); + selector.singleLoop(); + verify(rawSelector).selectNow(); + assertTrue(isRun.get()); + } + + public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException { + selector.getNioTimer().schedule(() -> {}, new TimeValue(15, TimeUnit.MINUTES)); + + selector.singleLoop(); + verify(rawSelector).select(300); + } + + public void testSelectorTimeoutWillBeReducedIfTaskSooner() throws Exception { + // As this is a timing based test, we must assertBusy in the very small chance that the loop is + // delayed for 50 milliseconds (causing a selectNow()) + assertBusy(() -> { + ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + selector.getNioTimer().schedule(() -> {}, new TimeValue(50, TimeUnit.MILLISECONDS)); + selector.singleLoop(); + verify(rawSelector).select(captor.capture()); + assertTrue(captor.getValue() > 0); + assertTrue(captor.getValue() < 300); + }); + } + public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException { boolean closedSelectorExceptionCaught = false; when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException()); @@ -425,7 +460,7 @@ public void testExecuteListenerWillHandleException() throws Exception { selector.executeListener(listener, null); - verify(eventHandler).listenerException(exception); + verify(eventHandler).taskException(exception); } public void testExecuteFailedListenerWillHandleException() throws Exception { @@ -435,6 +470,6 @@ public void testExecuteFailedListenerWillHandleException() throws Exception { selector.executeFailedListener(listener, ioException); - verify(eventHandler).listenerException(exception); + verify(eventHandler).taskException(exception); } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java index 9effcd0bc6876..9912021957d09 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java @@ -84,13 +84,21 @@ public void testTaskCancel() { AtomicBoolean first = new AtomicBoolean(false); AtomicBoolean second = new AtomicBoolean(false); long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); - Runnable cancellable = timer.scheduleAtRelativeTime(() -> first.set(true), executeTime); + NioTimer.Cancellable cancellable = timer.scheduleAtRelativeTime(() -> first.set(true), executeTime); timer.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); - cancellable.run(); + cancellable.cancel(); timer.pollTask(executeTime + 10).run(); assertFalse(first.get()); assertTrue(second.get()); assertNull(timer.pollTask(executeTime + 10)); } + + public void testNanosUntilNextTask() { + long nanoTime = System.nanoTime(); + long executeTime = nanoTime + TimeUnit.MILLISECONDS.toNanos(10); + timer.scheduleAtRelativeTime(() -> {}, executeTime); + assertEquals(TimeUnit.MILLISECONDS.toNanos(10), timer.nanosUntilNextTask(nanoTime)); + assertEquals(TimeUnit.MILLISECONDS.toNanos(5), timer.nanosUntilNextTask(nanoTime + TimeUnit.MILLISECONDS.toNanos(5))); + } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java index c83bd16ca95e1..f72d7a7c9fe82 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java @@ -5,10 +5,12 @@ */ package org.elasticsearch.xpack.security.transport.nio; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.NioTimer; import org.elasticsearch.nio.ReadWriteHandler; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.NioSelector; @@ -16,6 +18,7 @@ import javax.net.ssl.SSLEngine; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; @@ -28,7 +31,11 @@ */ public final class SSLChannelContext extends SocketChannelContext { + private static final TimeValue CLOSE_TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); + private static final NioTimer.Cancellable DEFAULT_TIMEOUT_TASK = () -> {}; + private final SSLDriver sslDriver; + private NioTimer.Cancellable closeTimeoutTask = DEFAULT_TIMEOUT_TASK; SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, SSLDriver sslDriver, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { @@ -53,6 +60,7 @@ public void queueWriteOperation(WriteOperation writeOperation) { getSelector().assertOnSelectorThread(); if (writeOperation instanceof CloseNotifyOperation) { sslDriver.initiateClose(); + closeTimeoutTask = getSelector().getNioTimer().schedule(this::channelCloseTimeout, CLOSE_TIMEOUT); } else { super.queueWriteOperation(writeOperation); } @@ -161,6 +169,7 @@ public void closeChannel() { public void closeFromSelector() throws IOException { getSelector().assertOnSelectorThread(); if (channel.isOpen()) { + closeTimeoutTask.cancel(); IOUtils.close(super::closeFromSelector, sslDriver::close); } } @@ -169,6 +178,12 @@ public SSLEngine getSSLEngine() { return sslDriver.getSSLEngine(); } + private void channelCloseTimeout() { + closeTimeoutTask = DEFAULT_TIMEOUT_TASK; + setCloseNow(); + getSelector().queueChannelClose(channel); + } + private static class CloseNotifyOperation implements WriteOperation { private static final BiConsumer LISTENER = (v, t) -> {}; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java index 4fdfb196d034e..a24183d8ff1c2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java @@ -6,12 +6,14 @@ package org.elasticsearch.xpack.security.transport.nio; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.nio.BytesWriteHandler; import org.elasticsearch.nio.FlushReadyWrite; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.NioTimer; import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -22,13 +24,16 @@ import java.nio.ByteBuffer; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +46,7 @@ public class SSLChannelContextTests extends ESTestCase { private SSLChannelContext context; private InboundChannelBuffer channelBuffer; private NioSelector selector; + private NioTimer nioTimer; private BiConsumer listener; private Consumer exceptionHandler; private SSLDriver sslDriver; @@ -56,6 +62,7 @@ public void init() { messageLength = randomInt(96) + 20; selector = mock(NioSelector.class); + nioTimer = mock(NioTimer.class); listener = mock(BiConsumer.class); channel = mock(NioSocketChannel.class); rawChannel = mock(SocketChannel.class); @@ -66,6 +73,7 @@ public void init() { context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer); when(selector.isOnCurrentThread()).thenReturn(true); + when(selector.getNioTimer()).thenReturn(nioTimer); when(sslDriver.getNetworkReadBuffer()).thenReturn(readBuffer); when(sslDriver.getNetworkWriteBuffer()).thenReturn(writeBuffer); ByteBuffer buffer = ByteBuffer.allocate(1 << 14); @@ -334,6 +342,44 @@ public void testReadyToCloseIfDriverIndicateClosed() { assertTrue(context.selectorShouldClose()); } + public void testCloseTimeout() { + context.closeChannel(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(WriteOperation.class); + verify(selector).writeToChannel(captor.capture()); + + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); + NioTimer.Cancellable cancellable = mock(NioTimer.Cancellable.class); + when(nioTimer.schedule(taskCaptor.capture(), eq(new TimeValue(10, TimeUnit.SECONDS)))).thenReturn(cancellable); + context.queueWriteOperation(captor.getValue()); + verify(nioTimer).schedule(taskCaptor.capture(), eq(new TimeValue(10, TimeUnit.SECONDS))); + assertFalse(context.selectorShouldClose()); + taskCaptor.getValue().run(); + assertTrue(context.selectorShouldClose()); + verify(selector).queueChannelClose(channel); + verify(cancellable, never()).cancel(); + } + + @SuppressWarnings("unchecked") + public void testCloseTimeoutIsCancelledOnClose() throws IOException { + try (SocketChannel realChannel = SocketChannel.open()) { + when(channel.getRawChannel()).thenReturn(realChannel); + TestReadWriteHandler readWriteHandler = new TestReadWriteHandler(readConsumer); + context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer); + context.closeChannel(); + ArgumentCaptor captor = ArgumentCaptor.forClass(WriteOperation.class); + verify(selector).writeToChannel(captor.capture()); + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); + NioTimer.Cancellable cancellable = mock(NioTimer.Cancellable.class); + when(nioTimer.schedule(taskCaptor.capture(), eq(new TimeValue(10, TimeUnit.SECONDS)))).thenReturn(cancellable); + context.queueWriteOperation(captor.getValue()); + + when(channel.isOpen()).thenReturn(true); + context.closeFromSelector(); + verify(cancellable).cancel(); + } + } + public void testInitiateCloseFromDifferentThreadSchedulesCloseNotify() { when(selector.isOnCurrentThread()).thenReturn(false, true); context.closeChannel(); From 1033d0d5bc6acc6c7dc2c34a93abe3b120e0021e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 9 Jan 2019 08:27:55 -0700 Subject: [PATCH 4/4] Changes from review --- .../org/elasticsearch/nio/NioSelector.java | 10 ++--- .../nio/{NioTimer.java => TaskScheduler.java} | 28 ++++++------- .../elasticsearch/nio/NioSelectorTests.java | 8 ++-- ...imerTests.java => TaskSchedulerTests.java} | 42 +++++++++---------- .../transport/nio/SSLChannelContext.java | 14 +++---- .../transport/nio/SSLChannelContextTests.java | 26 ++++++------ 6 files changed, 63 insertions(+), 65 deletions(-) rename libs/nio/src/main/java/org/elasticsearch/nio/{NioTimer.java => TaskScheduler.java} (79%) rename libs/nio/src/test/java/org/elasticsearch/nio/{NioTimerTests.java => TaskSchedulerTests.java} (65%) diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java index 3d7395eae920f..cacd06bde5fa3 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java @@ -55,7 +55,7 @@ public class NioSelector implements Closeable { private final Selector selector; private final ByteBuffer ioBuffer; - private final NioTimer nioTimer = new NioTimer(); + private final TaskScheduler taskScheduler = new TaskScheduler(); private final ReentrantLock runLock = new ReentrantLock(); private final CountDownLatch exitedLoop = new CountDownLatch(1); private final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -83,8 +83,8 @@ public ByteBuffer getIoBuffer() { return ioBuffer; } - public NioTimer getNioTimer() { - return nioTimer; + public TaskScheduler getTaskScheduler() { + return taskScheduler; } public Selector rawSelector() { @@ -151,7 +151,7 @@ void singleLoop() { try { closePendingChannels(); preSelect(); - long nanosUntilNextTask = nioTimer.nanosUntilNextTask(System.nanoTime()); + long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime()); int ready; if (nanosUntilNextTask == 0) { ready = selector.selectNow(); @@ -263,7 +263,7 @@ void preSelect() { private void handleScheduledTasks(long nanoTime) { Runnable task; - while ((task = nioTimer.pollTask(nanoTime)) != null) { + while ((task = taskScheduler.pollTask(nanoTime)) != null) { try { task.run(); } catch (Exception e) { diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java similarity index 79% rename from libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java rename to libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java index afa5997b77f25..e197230147c8b 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioTimer.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java @@ -19,8 +19,6 @@ package org.elasticsearch.nio; -import org.elasticsearch.common.unit.TimeValue; - import java.util.Comparator; import java.util.PriorityQueue; @@ -28,15 +26,20 @@ * A basic priority queue backed timer service. The service is thread local and should only be used by a * single nio selector event loop thread. */ -public class NioTimer { +public class TaskScheduler { private final PriorityQueue tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline)); - public Cancellable schedule(Runnable task, TimeValue timeValue) { - return scheduleAtRelativeTime(task, System.nanoTime() + timeValue.nanos()); - } - - public Cancellable scheduleAtRelativeTime(Runnable task, long relativeNanos) { + /** + * Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a + * relative nanotime after the scheduled time, the task will be returned. This method returns a + * {@link Runnable} that can be run to cancel the scheduled task. + * + * @param task to schedule + * @param relativeNanos defining when to execute the task + * @return runnable that will cancel the task + */ + public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) { DelayedTask delayedTask = new DelayedTask(relativeNanos, task); tasks.offer(delayedTask); return delayedTask; @@ -66,12 +69,7 @@ long nanosUntilNextTask(long relativeNanos) { } } - public interface Cancellable { - - void cancel(); - } - - private static class DelayedTask implements Cancellable { + private static class DelayedTask implements Runnable { private final long deadline; private final Runnable runnable; @@ -87,7 +85,7 @@ private long getDeadline() { } @Override - public void cancel() { + public void run() { cancelled = true; } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java index a45a405eae45b..8cde769cca3a2 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java @@ -105,7 +105,7 @@ public void testQueueChannelForClosed() throws IOException { public void testNioDelayedTasksAreExecuted() throws IOException { AtomicBoolean isRun = new AtomicBoolean(false); long nanoTime = System.nanoTime() - 1; - selector.getNioTimer().scheduleAtRelativeTime(() -> isRun.set(true), nanoTime); + selector.getTaskScheduler().scheduleAtRelativeTime(() -> isRun.set(true), nanoTime); assertFalse(isRun.get()); selector.singleLoop(); @@ -114,7 +114,8 @@ public void testNioDelayedTasksAreExecuted() throws IOException { } public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException { - selector.getNioTimer().schedule(() -> {}, new TimeValue(15, TimeUnit.MINUTES)); + long delay = new TimeValue(15, TimeUnit.MINUTES).nanos(); + selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay); selector.singleLoop(); verify(rawSelector).select(300); @@ -125,7 +126,8 @@ public void testSelectorTimeoutWillBeReducedIfTaskSooner() throws Exception { // delayed for 50 milliseconds (causing a selectNow()) assertBusy(() -> { ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); - selector.getNioTimer().schedule(() -> {}, new TimeValue(50, TimeUnit.MILLISECONDS)); + long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos(); + selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay); selector.singleLoop(); verify(rawSelector).select(captor.capture()); assertTrue(captor.getValue() > 0); diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/TaskSchedulerTests.java similarity index 65% rename from libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java rename to libs/nio/src/test/java/org/elasticsearch/nio/TaskSchedulerTests.java index 9912021957d09..4f5c074826b25 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioTimerTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/TaskSchedulerTests.java @@ -25,19 +25,19 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; -public class NioTimerTests extends ESTestCase { +public class TaskSchedulerTests extends ESTestCase { - private NioTimer timer = new NioTimer(); + private TaskScheduler scheduler = new TaskScheduler(); public void testScheduleTask() { AtomicBoolean complete = new AtomicBoolean(false); long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); - timer.scheduleAtRelativeTime(() -> complete.set(true), executeTime); + scheduler.scheduleAtRelativeTime(() -> complete.set(true), executeTime); while (true) { long nanoTime = System.nanoTime(); - Runnable runnable = timer.pollTask(nanoTime); + Runnable runnable = scheduler.pollTask(nanoTime); if (nanoTime - executeTime >= 0) { runnable.run(); assertTrue(complete.get()); @@ -51,10 +51,10 @@ public void testScheduleTask() { public void testPollScheduleTaskAtExactTime() { long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); - timer.scheduleAtRelativeTime(() -> {}, executeTime); + scheduler.scheduleAtRelativeTime(() -> {}, executeTime); - assertNull(timer.pollTask(executeTime - 1)); - assertNotNull(timer.pollTask(executeTime)); + assertNull(scheduler.pollTask(executeTime - 1)); + assertNotNull(scheduler.pollTask(executeTime)); } public void testTaskOrdering() { @@ -62,19 +62,19 @@ public void testTaskOrdering() { AtomicBoolean second = new AtomicBoolean(false); AtomicBoolean third = new AtomicBoolean(false); long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); - timer.scheduleAtRelativeTime(() -> third.set(true), executeTime + 2); - timer.scheduleAtRelativeTime(() -> first.set(true), executeTime); - timer.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); + scheduler.scheduleAtRelativeTime(() -> third.set(true), executeTime + 2); + scheduler.scheduleAtRelativeTime(() -> first.set(true), executeTime); + scheduler.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); - timer.pollTask(executeTime + 10).run(); + scheduler.pollTask(executeTime + 10).run(); assertTrue(first.get()); assertFalse(second.get()); assertFalse(third.get()); - timer.pollTask(executeTime + 10).run(); + scheduler.pollTask(executeTime + 10).run(); assertTrue(first.get()); assertTrue(second.get()); assertFalse(third.get()); - timer.pollTask(executeTime + 10).run(); + scheduler.pollTask(executeTime + 10).run(); assertTrue(first.get()); assertTrue(second.get()); assertTrue(third.get()); @@ -84,21 +84,21 @@ public void testTaskCancel() { AtomicBoolean first = new AtomicBoolean(false); AtomicBoolean second = new AtomicBoolean(false); long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); - NioTimer.Cancellable cancellable = timer.scheduleAtRelativeTime(() -> first.set(true), executeTime); - timer.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); + Runnable cancellable = scheduler.scheduleAtRelativeTime(() -> first.set(true), executeTime); + scheduler.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); - cancellable.cancel(); - timer.pollTask(executeTime + 10).run(); + cancellable.run(); + scheduler.pollTask(executeTime + 10).run(); assertFalse(first.get()); assertTrue(second.get()); - assertNull(timer.pollTask(executeTime + 10)); + assertNull(scheduler.pollTask(executeTime + 10)); } public void testNanosUntilNextTask() { long nanoTime = System.nanoTime(); long executeTime = nanoTime + TimeUnit.MILLISECONDS.toNanos(10); - timer.scheduleAtRelativeTime(() -> {}, executeTime); - assertEquals(TimeUnit.MILLISECONDS.toNanos(10), timer.nanosUntilNextTask(nanoTime)); - assertEquals(TimeUnit.MILLISECONDS.toNanos(5), timer.nanosUntilNextTask(nanoTime + TimeUnit.MILLISECONDS.toNanos(5))); + scheduler.scheduleAtRelativeTime(() -> {}, executeTime); + assertEquals(TimeUnit.MILLISECONDS.toNanos(10), scheduler.nanosUntilNextTask(nanoTime)); + assertEquals(TimeUnit.MILLISECONDS.toNanos(5), scheduler.nanosUntilNextTask(nanoTime + TimeUnit.MILLISECONDS.toNanos(5))); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java index f72d7a7c9fe82..b5d5db2166c1f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java @@ -10,7 +10,6 @@ import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSocketChannel; -import org.elasticsearch.nio.NioTimer; import org.elasticsearch.nio.ReadWriteHandler; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.NioSelector; @@ -31,11 +30,11 @@ */ public final class SSLChannelContext extends SocketChannelContext { - private static final TimeValue CLOSE_TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); - private static final NioTimer.Cancellable DEFAULT_TIMEOUT_TASK = () -> {}; + private static final long CLOSE_TIMEOUT_NANOS = new TimeValue(10, TimeUnit.SECONDS).nanos(); + private static final Runnable DEFAULT_TIMEOUT_CANCELLER = () -> {}; private final SSLDriver sslDriver; - private NioTimer.Cancellable closeTimeoutTask = DEFAULT_TIMEOUT_TASK; + private Runnable closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER; SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, SSLDriver sslDriver, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { @@ -60,7 +59,8 @@ public void queueWriteOperation(WriteOperation writeOperation) { getSelector().assertOnSelectorThread(); if (writeOperation instanceof CloseNotifyOperation) { sslDriver.initiateClose(); - closeTimeoutTask = getSelector().getNioTimer().schedule(this::channelCloseTimeout, CLOSE_TIMEOUT); + long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime(); + closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos); } else { super.queueWriteOperation(writeOperation); } @@ -169,7 +169,7 @@ public void closeChannel() { public void closeFromSelector() throws IOException { getSelector().assertOnSelectorThread(); if (channel.isOpen()) { - closeTimeoutTask.cancel(); + closeTimeoutCanceller.run(); IOUtils.close(super::closeFromSelector, sslDriver::close); } } @@ -179,7 +179,7 @@ public SSLEngine getSSLEngine() { } private void channelCloseTimeout() { - closeTimeoutTask = DEFAULT_TIMEOUT_TASK; + closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER; setCloseNow(); getSelector().queueChannelClose(channel); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java index a24183d8ff1c2..0870124022850 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java @@ -6,14 +6,13 @@ package org.elasticsearch.xpack.security.transport.nio; import org.elasticsearch.common.CheckedFunction; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.nio.BytesWriteHandler; import org.elasticsearch.nio.FlushReadyWrite; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioSocketChannel; -import org.elasticsearch.nio.NioTimer; +import org.elasticsearch.nio.TaskScheduler; import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -24,12 +23,11 @@ import java.nio.ByteBuffer; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -46,7 +44,7 @@ public class SSLChannelContextTests extends ESTestCase { private SSLChannelContext context; private InboundChannelBuffer channelBuffer; private NioSelector selector; - private NioTimer nioTimer; + private TaskScheduler nioTimer; private BiConsumer listener; private Consumer exceptionHandler; private SSLDriver sslDriver; @@ -62,7 +60,7 @@ public void init() { messageLength = randomInt(96) + 20; selector = mock(NioSelector.class); - nioTimer = mock(NioTimer.class); + nioTimer = mock(TaskScheduler.class); listener = mock(BiConsumer.class); channel = mock(NioSocketChannel.class); rawChannel = mock(SocketChannel.class); @@ -73,7 +71,7 @@ public void init() { context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer); when(selector.isOnCurrentThread()).thenReturn(true); - when(selector.getNioTimer()).thenReturn(nioTimer); + when(selector.getTaskScheduler()).thenReturn(nioTimer); when(sslDriver.getNetworkReadBuffer()).thenReturn(readBuffer); when(sslDriver.getNetworkWriteBuffer()).thenReturn(writeBuffer); ByteBuffer buffer = ByteBuffer.allocate(1 << 14); @@ -349,15 +347,15 @@ public void testCloseTimeout() { verify(selector).writeToChannel(captor.capture()); ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); - NioTimer.Cancellable cancellable = mock(NioTimer.Cancellable.class); - when(nioTimer.schedule(taskCaptor.capture(), eq(new TimeValue(10, TimeUnit.SECONDS)))).thenReturn(cancellable); + Runnable cancellable = mock(Runnable.class); + when(nioTimer.scheduleAtRelativeTime(taskCaptor.capture(), anyLong())).thenReturn(cancellable); context.queueWriteOperation(captor.getValue()); - verify(nioTimer).schedule(taskCaptor.capture(), eq(new TimeValue(10, TimeUnit.SECONDS))); + verify(nioTimer).scheduleAtRelativeTime(taskCaptor.capture(), anyLong()); assertFalse(context.selectorShouldClose()); taskCaptor.getValue().run(); assertTrue(context.selectorShouldClose()); verify(selector).queueChannelClose(channel); - verify(cancellable, never()).cancel(); + verify(cancellable, never()).run(); } @SuppressWarnings("unchecked") @@ -370,13 +368,13 @@ public void testCloseTimeoutIsCancelledOnClose() throws IOException { ArgumentCaptor captor = ArgumentCaptor.forClass(WriteOperation.class); verify(selector).writeToChannel(captor.capture()); ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); - NioTimer.Cancellable cancellable = mock(NioTimer.Cancellable.class); - when(nioTimer.schedule(taskCaptor.capture(), eq(new TimeValue(10, TimeUnit.SECONDS)))).thenReturn(cancellable); + Runnable cancellable = mock(Runnable.class); + when(nioTimer.scheduleAtRelativeTime(taskCaptor.capture(), anyLong())).thenReturn(cancellable); context.queueWriteOperation(captor.getValue()); when(channel.isOpen()).thenReturn(true); context.closeFromSelector(); - verify(cancellable).cancel(); + verify(cancellable).run(); } }