Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ protected void writeException(SocketChannelContext context, Exception exception)
}

/**
* This method is called when a listener attached to a channel operation throws an exception.
* This method is called when a task or listener attached to a channel operation throws an exception.
*
* @param exception that occurred
*/
protected void listenerException(Exception exception) {
protected void taskException(Exception exception) {
exceptionHandler.accept(exception);
}

Expand Down
41 changes: 35 additions & 6 deletions libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -54,6 +55,7 @@ public class NioSelector implements Closeable {
private final Selector selector;
private final ByteBuffer ioBuffer;

private final TaskScheduler taskScheduler = new TaskScheduler();
private final ReentrantLock runLock = new ReentrantLock();
private final CountDownLatch exitedLoop = new CountDownLatch(1);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -81,6 +83,10 @@ public ByteBuffer getIoBuffer() {
return ioBuffer;
}

public TaskScheduler getTaskScheduler() {
return taskScheduler;
}

public Selector rawSelector() {
return selector;
}
Expand Down Expand Up @@ -145,8 +151,16 @@ void singleLoop() {
try {
closePendingChannels();
preSelect();

int ready = selector.select(300);
long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
int ready;
if (nanosUntilNextTask == 0) {
ready = selector.selectNow();
} else {
long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
// Only select until the next task needs to be run. Do not select with a value of 0 because
// that blocks without a timeout.
ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
}
if (ready > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
Expand All @@ -164,6 +178,8 @@ void singleLoop() {
}
}
}

handleScheduledTasks(System.nanoTime());
} catch (ClosedSelectorException e) {
if (isOpen()) {
throw e;
Expand Down Expand Up @@ -245,6 +261,17 @@ void preSelect() {
handleQueuedWrites();
}

private void handleScheduledTasks(long nanoTime) {
Runnable task;
while ((task = taskScheduler.pollTask(nanoTime)) != null) {
try {
task.run();
} catch (Exception e) {
eventHandler.taskException(e);
}
}
}

/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
Expand All @@ -267,8 +294,10 @@ public void queueChannelClose(NioChannel channel) {
ChannelContext<?> context = channel.getContext();
assert context.getSelector() == this : "Must schedule a channel for closure with its selector";
channelsToClose.offer(context);
ensureSelectorOpenForEnqueuing(channelsToClose, context);
wakeup();
if (isOnCurrentThread() == false) {
ensureSelectorOpenForEnqueuing(channelsToClose, context);
wakeup();
}
}

/**
Expand Down Expand Up @@ -324,7 +353,7 @@ public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
try {
listener.accept(value, null);
} catch (Exception e) {
eventHandler.listenerException(e);
eventHandler.taskException(e);
}
}

Expand All @@ -340,7 +369,7 @@ public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Excepti
try {
listener.accept(null, exception);
} catch (Exception e) {
eventHandler.listenerException(e);
eventHandler.taskException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ protected boolean closeNow() {
return closeNow;
}

protected void setCloseNow() {
closeNow = true;
}

// When you read or write to a nio socket in java, the heap memory passed down must be copied to/from
// direct memory. The JVM internally does some buffering of the direct memory, however we can save space
Expand Down
92 changes: 92 additions & 0 deletions libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.nio;

import java.util.Comparator;
import java.util.PriorityQueue;

/**
* A basic priority queue backed timer service. The service is thread local and should only be used by a
* single nio selector event loop thread.
*/
public class TaskScheduler {

private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));

/**
* Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a
* relative nanotime after the scheduled time, the task will be returned. This method returns a
* {@link Runnable} that can be run to cancel the scheduled task.
*
* @param task to schedule
* @param relativeNanos defining when to execute the task
* @return runnable that will cancel the task
*/
public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
tasks.offer(delayedTask);
return delayedTask;
}

Runnable pollTask(long relativeNanos) {
DelayedTask task;
while ((task = tasks.peek()) != null) {
if (relativeNanos - task.deadline >= 0) {
tasks.remove();
if (task.cancelled == false) {
return task.runnable;
}
} else {
return null;
}
}
return null;
}

long nanosUntilNextTask(long relativeNanos) {
DelayedTask nextTask = tasks.peek();
if (nextTask == null) {
return Long.MAX_VALUE;
} else {
return Math.max(nextTask.deadline - relativeNanos, 0);
}
}

private static class DelayedTask implements Runnable {

private final long deadline;
private final Runnable runnable;
private boolean cancelled = false;

private DelayedTask(long deadline, Runnable runnable) {
this.deadline = deadline;
this.runnable = runnable;
}

private long getDeadline() {
return deadline;
}

@Override
public void run() {
cancelled = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException {

public void testListenerExceptionCallsGenericExceptionHandler() throws IOException {
RuntimeException listenerException = new RuntimeException();
handler.listenerException(listenerException);
handler.taskException(listenerException);
verify(genericExceptionHandler).accept(listenerException);
}

Expand Down
41 changes: 39 additions & 2 deletions libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.nio;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -31,6 +33,8 @@
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -98,6 +102,39 @@ public void testQueueChannelForClosed() throws IOException {
verify(eventHandler).handleClose(context);
}

public void testNioDelayedTasksAreExecuted() throws IOException {
AtomicBoolean isRun = new AtomicBoolean(false);
long nanoTime = System.nanoTime() - 1;
selector.getTaskScheduler().scheduleAtRelativeTime(() -> isRun.set(true), nanoTime);

assertFalse(isRun.get());
selector.singleLoop();
verify(rawSelector).selectNow();
assertTrue(isRun.get());
}

public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException {
long delay = new TimeValue(15, TimeUnit.MINUTES).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);

selector.singleLoop();
verify(rawSelector).select(300);
}

public void testSelectorTimeoutWillBeReducedIfTaskSooner() throws Exception {
// As this is a timing based test, we must assertBusy in the very small chance that the loop is
// delayed for 50 milliseconds (causing a selectNow())
assertBusy(() -> {
ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
selector.singleLoop();
verify(rawSelector).select(captor.capture());
assertTrue(captor.getValue() > 0);
assertTrue(captor.getValue() < 300);
});
}

public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
boolean closedSelectorExceptionCaught = false;
when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException());
Expand Down Expand Up @@ -425,7 +462,7 @@ public void testExecuteListenerWillHandleException() throws Exception {

selector.executeListener(listener, null);

verify(eventHandler).listenerException(exception);
verify(eventHandler).taskException(exception);
}

public void testExecuteFailedListenerWillHandleException() throws Exception {
Expand All @@ -435,6 +472,6 @@ public void testExecuteFailedListenerWillHandleException() throws Exception {

selector.executeFailedListener(listener, ioException);

verify(eventHandler).listenerException(exception);
verify(eventHandler).taskException(exception);
}
}
Loading