diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java index 87a2489fdbc27..7eebfe24665a2 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -150,11 +150,11 @@ protected void writeException(SocketChannelContext context, Exception exception) } /** - * This method is called when a listener attached to a channel operation throws an exception. + * This method is called when a task or listener attached to a channel operation throws an exception. * * @param exception that occurred */ - protected void listenerException(Exception exception) { + protected void taskException(Exception exception) { exceptionHandler.accept(exception); } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java index 6820b6a07188f..cacd06bde5fa3 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -54,6 +55,7 @@ public class NioSelector implements Closeable { private final Selector selector; private final ByteBuffer ioBuffer; + private final TaskScheduler taskScheduler = new TaskScheduler(); private final ReentrantLock runLock = new ReentrantLock(); private final CountDownLatch exitedLoop = new CountDownLatch(1); private final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -81,6 +83,10 @@ public ByteBuffer getIoBuffer() { return ioBuffer; } + public TaskScheduler getTaskScheduler() { + return taskScheduler; + } + public Selector rawSelector() { return selector; } @@ -145,8 +151,16 @@ void singleLoop() { try { closePendingChannels(); preSelect(); - - int ready = selector.select(300); + long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime()); + int ready; + if (nanosUntilNextTask == 0) { + ready = selector.selectNow(); + } else { + long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask); + // Only select until the next task needs to be run. Do not select with a value of 0 because + // that blocks without a timeout. + ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1))); + } if (ready > 0) { Set selectionKeys = selector.selectedKeys(); Iterator keyIterator = selectionKeys.iterator(); @@ -164,6 +178,8 @@ void singleLoop() { } } } + + handleScheduledTasks(System.nanoTime()); } catch (ClosedSelectorException e) { if (isOpen()) { throw e; @@ -245,6 +261,17 @@ void preSelect() { handleQueuedWrites(); } + private void handleScheduledTasks(long nanoTime) { + Runnable task; + while ((task = taskScheduler.pollTask(nanoTime)) != null) { + try { + task.run(); + } catch (Exception e) { + eventHandler.taskException(e); + } + } + } + /** * Queues a write operation to be handled by the event loop. This can be called by any thread and is the * api available for non-selector threads to schedule writes. @@ -267,8 +294,10 @@ public void queueChannelClose(NioChannel channel) { ChannelContext context = channel.getContext(); assert context.getSelector() == this : "Must schedule a channel for closure with its selector"; channelsToClose.offer(context); - ensureSelectorOpenForEnqueuing(channelsToClose, context); - wakeup(); + if (isOnCurrentThread() == false) { + ensureSelectorOpenForEnqueuing(channelsToClose, context); + wakeup(); + } } /** @@ -324,7 +353,7 @@ public void executeListener(BiConsumer listener, V value) { try { listener.accept(value, null); } catch (Exception e) { - eventHandler.listenerException(e); + eventHandler.taskException(e); } } @@ -340,7 +369,7 @@ public void executeFailedListener(BiConsumer listener, Excepti try { listener.accept(null, exception); } catch (Exception e) { - eventHandler.listenerException(e); + eventHandler.taskException(e); } } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java index 864fe793fdf73..661c55cc7280a 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -234,6 +234,9 @@ protected boolean closeNow() { return closeNow; } + protected void setCloseNow() { + closeNow = true; + } // When you read or write to a nio socket in java, the heap memory passed down must be copied to/from // direct memory. The JVM internally does some buffering of the direct memory, however we can save space diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java new file mode 100644 index 0000000000000..e197230147c8b --- /dev/null +++ b/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import java.util.Comparator; +import java.util.PriorityQueue; + +/** + * A basic priority queue backed timer service. The service is thread local and should only be used by a + * single nio selector event loop thread. + */ +public class TaskScheduler { + + private final PriorityQueue tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline)); + + /** + * Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a + * relative nanotime after the scheduled time, the task will be returned. This method returns a + * {@link Runnable} that can be run to cancel the scheduled task. + * + * @param task to schedule + * @param relativeNanos defining when to execute the task + * @return runnable that will cancel the task + */ + public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) { + DelayedTask delayedTask = new DelayedTask(relativeNanos, task); + tasks.offer(delayedTask); + return delayedTask; + } + + Runnable pollTask(long relativeNanos) { + DelayedTask task; + while ((task = tasks.peek()) != null) { + if (relativeNanos - task.deadline >= 0) { + tasks.remove(); + if (task.cancelled == false) { + return task.runnable; + } + } else { + return null; + } + } + return null; + } + + long nanosUntilNextTask(long relativeNanos) { + DelayedTask nextTask = tasks.peek(); + if (nextTask == null) { + return Long.MAX_VALUE; + } else { + return Math.max(nextTask.deadline - relativeNanos, 0); + } + } + + private static class DelayedTask implements Runnable { + + private final long deadline; + private final Runnable runnable; + private boolean cancelled = false; + + private DelayedTask(long deadline, Runnable runnable) { + this.deadline = deadline; + this.runnable = runnable; + } + + private long getDeadline() { + return deadline; + } + + @Override + public void run() { + cancelled = true; + } + } +} diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java index 6e1e34ec1f572..f3ffab1baef67 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java @@ -245,7 +245,7 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException { public void testListenerExceptionCallsGenericExceptionHandler() throws IOException { RuntimeException listenerException = new RuntimeException(); - handler.listenerException(listenerException); + handler.taskException(listenerException); verify(genericExceptionHandler).accept(listenerException); } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java index bd5f1c1eb346f..8cde769cca3a2 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java @@ -19,8 +19,10 @@ package org.elasticsearch.nio; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,6 +33,8 @@ import java.nio.channels.Selector; import java.util.Collections; import java.util.HashSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import static org.mockito.Matchers.any; @@ -98,6 +102,39 @@ public void testQueueChannelForClosed() throws IOException { verify(eventHandler).handleClose(context); } + public void testNioDelayedTasksAreExecuted() throws IOException { + AtomicBoolean isRun = new AtomicBoolean(false); + long nanoTime = System.nanoTime() - 1; + selector.getTaskScheduler().scheduleAtRelativeTime(() -> isRun.set(true), nanoTime); + + assertFalse(isRun.get()); + selector.singleLoop(); + verify(rawSelector).selectNow(); + assertTrue(isRun.get()); + } + + public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException { + long delay = new TimeValue(15, TimeUnit.MINUTES).nanos(); + selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay); + + selector.singleLoop(); + verify(rawSelector).select(300); + } + + public void testSelectorTimeoutWillBeReducedIfTaskSooner() throws Exception { + // As this is a timing based test, we must assertBusy in the very small chance that the loop is + // delayed for 50 milliseconds (causing a selectNow()) + assertBusy(() -> { + ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos(); + selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay); + selector.singleLoop(); + verify(rawSelector).select(captor.capture()); + assertTrue(captor.getValue() > 0); + assertTrue(captor.getValue() < 300); + }); + } + public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException { boolean closedSelectorExceptionCaught = false; when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException()); @@ -425,7 +462,7 @@ public void testExecuteListenerWillHandleException() throws Exception { selector.executeListener(listener, null); - verify(eventHandler).listenerException(exception); + verify(eventHandler).taskException(exception); } public void testExecuteFailedListenerWillHandleException() throws Exception { @@ -435,6 +472,6 @@ public void testExecuteFailedListenerWillHandleException() throws Exception { selector.executeFailedListener(listener, ioException); - verify(eventHandler).listenerException(exception); + verify(eventHandler).taskException(exception); } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/TaskSchedulerTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/TaskSchedulerTests.java new file mode 100644 index 0000000000000..4f5c074826b25 --- /dev/null +++ b/libs/nio/src/test/java/org/elasticsearch/nio/TaskSchedulerTests.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +public class TaskSchedulerTests extends ESTestCase { + + private TaskScheduler scheduler = new TaskScheduler(); + + public void testScheduleTask() { + AtomicBoolean complete = new AtomicBoolean(false); + + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + scheduler.scheduleAtRelativeTime(() -> complete.set(true), executeTime); + + while (true) { + long nanoTime = System.nanoTime(); + Runnable runnable = scheduler.pollTask(nanoTime); + if (nanoTime - executeTime >= 0) { + runnable.run(); + assertTrue(complete.get()); + break; + } else { + assertNull(runnable); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + } + } + } + + public void testPollScheduleTaskAtExactTime() { + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + scheduler.scheduleAtRelativeTime(() -> {}, executeTime); + + assertNull(scheduler.pollTask(executeTime - 1)); + assertNotNull(scheduler.pollTask(executeTime)); + } + + public void testTaskOrdering() { + AtomicBoolean first = new AtomicBoolean(false); + AtomicBoolean second = new AtomicBoolean(false); + AtomicBoolean third = new AtomicBoolean(false); + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + scheduler.scheduleAtRelativeTime(() -> third.set(true), executeTime + 2); + scheduler.scheduleAtRelativeTime(() -> first.set(true), executeTime); + scheduler.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); + + scheduler.pollTask(executeTime + 10).run(); + assertTrue(first.get()); + assertFalse(second.get()); + assertFalse(third.get()); + scheduler.pollTask(executeTime + 10).run(); + assertTrue(first.get()); + assertTrue(second.get()); + assertFalse(third.get()); + scheduler.pollTask(executeTime + 10).run(); + assertTrue(first.get()); + assertTrue(second.get()); + assertTrue(third.get()); + } + + public void testTaskCancel() { + AtomicBoolean first = new AtomicBoolean(false); + AtomicBoolean second = new AtomicBoolean(false); + long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10); + Runnable cancellable = scheduler.scheduleAtRelativeTime(() -> first.set(true), executeTime); + scheduler.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1); + + cancellable.run(); + scheduler.pollTask(executeTime + 10).run(); + assertFalse(first.get()); + assertTrue(second.get()); + assertNull(scheduler.pollTask(executeTime + 10)); + } + + public void testNanosUntilNextTask() { + long nanoTime = System.nanoTime(); + long executeTime = nanoTime + TimeUnit.MILLISECONDS.toNanos(10); + scheduler.scheduleAtRelativeTime(() -> {}, executeTime); + assertEquals(TimeUnit.MILLISECONDS.toNanos(10), scheduler.nanosUntilNextTask(nanoTime)); + assertEquals(TimeUnit.MILLISECONDS.toNanos(5), scheduler.nanosUntilNextTask(nanoTime + TimeUnit.MILLISECONDS.toNanos(5))); + } +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java index c83bd16ca95e1..b5d5db2166c1f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.transport.nio; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; @@ -16,6 +17,7 @@ import javax.net.ssl.SSLEngine; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; @@ -28,7 +30,11 @@ */ public final class SSLChannelContext extends SocketChannelContext { + private static final long CLOSE_TIMEOUT_NANOS = new TimeValue(10, TimeUnit.SECONDS).nanos(); + private static final Runnable DEFAULT_TIMEOUT_CANCELLER = () -> {}; + private final SSLDriver sslDriver; + private Runnable closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER; SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, SSLDriver sslDriver, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { @@ -53,6 +59,8 @@ public void queueWriteOperation(WriteOperation writeOperation) { getSelector().assertOnSelectorThread(); if (writeOperation instanceof CloseNotifyOperation) { sslDriver.initiateClose(); + long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime(); + closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos); } else { super.queueWriteOperation(writeOperation); } @@ -161,6 +169,7 @@ public void closeChannel() { public void closeFromSelector() throws IOException { getSelector().assertOnSelectorThread(); if (channel.isOpen()) { + closeTimeoutCanceller.run(); IOUtils.close(super::closeFromSelector, sslDriver::close); } } @@ -169,6 +178,12 @@ public SSLEngine getSSLEngine() { return sslDriver.getSSLEngine(); } + private void channelCloseTimeout() { + closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER; + setCloseNow(); + getSelector().queueChannelClose(channel); + } + private static class CloseNotifyOperation implements WriteOperation { private static final BiConsumer LISTENER = (v, t) -> {}; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java index 4fdfb196d034e..0870124022850 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.TaskScheduler; import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -26,9 +27,11 @@ import java.util.function.Consumer; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +44,7 @@ public class SSLChannelContextTests extends ESTestCase { private SSLChannelContext context; private InboundChannelBuffer channelBuffer; private NioSelector selector; + private TaskScheduler nioTimer; private BiConsumer listener; private Consumer exceptionHandler; private SSLDriver sslDriver; @@ -56,6 +60,7 @@ public void init() { messageLength = randomInt(96) + 20; selector = mock(NioSelector.class); + nioTimer = mock(TaskScheduler.class); listener = mock(BiConsumer.class); channel = mock(NioSocketChannel.class); rawChannel = mock(SocketChannel.class); @@ -66,6 +71,7 @@ public void init() { context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer); when(selector.isOnCurrentThread()).thenReturn(true); + when(selector.getTaskScheduler()).thenReturn(nioTimer); when(sslDriver.getNetworkReadBuffer()).thenReturn(readBuffer); when(sslDriver.getNetworkWriteBuffer()).thenReturn(writeBuffer); ByteBuffer buffer = ByteBuffer.allocate(1 << 14); @@ -334,6 +340,44 @@ public void testReadyToCloseIfDriverIndicateClosed() { assertTrue(context.selectorShouldClose()); } + public void testCloseTimeout() { + context.closeChannel(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(WriteOperation.class); + verify(selector).writeToChannel(captor.capture()); + + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); + Runnable cancellable = mock(Runnable.class); + when(nioTimer.scheduleAtRelativeTime(taskCaptor.capture(), anyLong())).thenReturn(cancellable); + context.queueWriteOperation(captor.getValue()); + verify(nioTimer).scheduleAtRelativeTime(taskCaptor.capture(), anyLong()); + assertFalse(context.selectorShouldClose()); + taskCaptor.getValue().run(); + assertTrue(context.selectorShouldClose()); + verify(selector).queueChannelClose(channel); + verify(cancellable, never()).run(); + } + + @SuppressWarnings("unchecked") + public void testCloseTimeoutIsCancelledOnClose() throws IOException { + try (SocketChannel realChannel = SocketChannel.open()) { + when(channel.getRawChannel()).thenReturn(realChannel); + TestReadWriteHandler readWriteHandler = new TestReadWriteHandler(readConsumer); + context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer); + context.closeChannel(); + ArgumentCaptor captor = ArgumentCaptor.forClass(WriteOperation.class); + verify(selector).writeToChannel(captor.capture()); + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); + Runnable cancellable = mock(Runnable.class); + when(nioTimer.scheduleAtRelativeTime(taskCaptor.capture(), anyLong())).thenReturn(cancellable); + context.queueWriteOperation(captor.getValue()); + + when(channel.isOpen()).thenReturn(true); + context.closeFromSelector(); + verify(cancellable).run(); + } + } + public void testInitiateCloseFromDifferentThreadSchedulesCloseNotify() { when(selector.isOnCurrentThread()).thenReturn(false, true); context.closeChannel();