Skip to content

Commit 700357d

Browse files
authored
Immediately flush channel after writing to buffer (elastic#31301)
This is related to elastic#27260. Currently when we queue a write with a channel we set OP_WRITE and wait until the next selection loop to flush the write. However, if the channel does not have a pending write, it is probably ready to flush. This PR implements an optimistic flush logic that will attempt this flush.
1 parent 509729f commit 700357d

File tree

8 files changed

+54
-30
lines changed

8 files changed

+54
-30
lines changed

libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ protected void setSelectionKey(SelectionKey selectionKey) {
6666
* @throws IOException during channel / context close
6767
*/
6868
public void closeFromSelector() throws IOException {
69-
if (closeContext.isDone() == false) {
69+
if (isOpen()) {
7070
try {
7171
rawChannel.close();
7272
closeContext.complete(null);

libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,7 @@ protected void listenerException(Exception exception) {
159159
}
160160

161161
/**
162-
* This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a
163-
* channel.
162+
* This method is called after events (READ, WRITE, CONNECT) have been handled for a channel.
164163
*
165164
* @param context that was handled
166165
*/

libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,6 @@
4343
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
4444
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
4545
* this selector.
46-
* <p>
47-
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
48-
* {@link #preSelect()}, and {@link #cleanup()} functionality.
4946
*/
5047
public class NioSelector implements Closeable {
5148

@@ -65,7 +62,7 @@ public NioSelector(EventHandler eventHandler) throws IOException {
6562
this(eventHandler, Selector.open());
6663
}
6764

68-
public NioSelector(EventHandler eventHandler, Selector selector) throws IOException {
65+
public NioSelector(EventHandler eventHandler, Selector selector) {
6966
this.selector = selector;
7067
this.eventHandler = eventHandler;
7168
}
@@ -165,7 +162,7 @@ void singleLoop() {
165162
}
166163

167164
void cleanupAndCloseChannels() {
168-
cleanup();
165+
cleanupPendingWrites();
169166
channelsToClose.addAll(channelsToRegister);
170167
channelsToRegister.clear();
171168
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
@@ -234,16 +231,6 @@ void preSelect() {
234231
handleQueuedWrites();
235232
}
236233

237-
/**
238-
* Called once as the selector is being closed.
239-
*/
240-
void cleanup() {
241-
WriteOperation op;
242-
while ((op = queuedWrites.poll()) != null) {
243-
executeFailedListener(op.getListener(), new ClosedSelectorException());
244-
}
245-
}
246-
247234
/**
248235
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
249236
* api available for non-selector threads to schedule writes.
@@ -284,20 +271,31 @@ public void scheduleForRegistration(NioChannel channel) {
284271
}
285272

286273
/**
287-
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
288-
* by the selector thread. As a result, this method should only be called by the selector thread.
274+
* Queues a write operation directly in a channel's buffer. If this channel does not have pending writes
275+
* already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector
276+
* thread. As a result, this method should only be called by the selector thread. If this channel does
277+
* not have pending writes already, the channel will be flushed.
289278
*
290279
* @param writeOperation to be queued in a channel's buffer
291280
*/
292-
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
281+
public void writeToChannel(WriteOperation writeOperation) {
293282
assertOnSelectorThread();
294283
SocketChannelContext context = writeOperation.getChannel();
284+
// If the channel does not currently have anything that is ready to flush, we should flush after
285+
// the write operation is queued.
286+
boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
295287
try {
296288
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
297289
context.queueWriteOperation(writeOperation);
298290
} catch (Exception e) {
291+
shouldFlushAfterQueuing = false;
299292
executeFailedListener(writeOperation.getListener(), e);
300293
}
294+
295+
if (shouldFlushAfterQueuing) {
296+
handleWrite(context);
297+
eventHandler.postHandling(context);
298+
}
301299
}
302300

303301
/**
@@ -332,6 +330,13 @@ public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Excepti
332330
}
333331
}
334332

333+
private void cleanupPendingWrites() {
334+
WriteOperation op;
335+
while ((op = queuedWrites.poll()) != null) {
336+
executeFailedListener(op.getListener(), new ClosedSelectorException());
337+
}
338+
}
339+
335340
private void wakeup() {
336341
// TODO: Do we need the wakeup optimizations that some other libraries use?
337342
selector.wakeup();
@@ -394,7 +399,7 @@ private void handleQueuedWrites() {
394399
WriteOperation writeOperation;
395400
while ((writeOperation = queuedWrites.poll()) != null) {
396401
if (writeOperation.getChannel().isOpen()) {
397-
queueWriteInChannelBuffer(writeOperation);
402+
writeToChannel(writeOperation);
398403
} else {
399404
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
400405
}

libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
135135
return;
136136
}
137137

138-
selector.queueWriteInChannelBuffer(writeOperation);
138+
selector.writeToChannel(writeOperation);
139139
}
140140

141141
public void queueWriteOperation(WriteOperation writeOperation) {
@@ -164,7 +164,7 @@ protected FlushOperation getPendingFlush() {
164164
@Override
165165
public void closeFromSelector() throws IOException {
166166
getSelector().assertOnSelectorThread();
167-
if (channel.isOpen()) {
167+
if (isOpen()) {
168168
ArrayList<IOException> closingExceptions = new ArrayList<>(3);
169169
try {
170170
super.closeFromSelector();

libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,11 +262,28 @@ public void testQueueWriteSuccessful() throws Exception {
262262
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
263263
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
264264

265-
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
265+
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
266266

267-
selector.queueWriteInChannelBuffer(writeOperation);
267+
when(channelContext.readyForFlush()).thenReturn(true);
268+
selector.writeToChannel(writeOperation);
268269

269270
verify(channelContext).queueWriteOperation(writeOperation);
271+
verify(eventHandler, times(0)).handleWrite(channelContext);
272+
verify(eventHandler, times(0)).postHandling(channelContext);
273+
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
274+
}
275+
276+
public void testShouldFlushIfNoPendingFlushes() throws Exception {
277+
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
278+
279+
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
280+
281+
when(channelContext.readyForFlush()).thenReturn(false);
282+
selector.writeToChannel(writeOperation);
283+
284+
verify(channelContext).queueWriteOperation(writeOperation);
285+
verify(eventHandler).handleWrite(channelContext);
286+
verify(eventHandler).postHandling(channelContext);
270287
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
271288
}
272289

@@ -277,10 +294,13 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws
277294
CancelledKeyException cancelledKeyException = new CancelledKeyException();
278295

279296
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
297+
when(channelContext.readyForFlush()).thenReturn(false);
280298
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
281-
selector.queueWriteInChannelBuffer(writeOperation);
299+
selector.writeToChannel(writeOperation);
282300

283301
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
302+
verify(eventHandler, times(0)).handleWrite(channelContext);
303+
verify(eventHandler, times(0)).postHandling(channelContext);
284304
verify(listener).accept(null, cancelledKeyException);
285305
}
286306

libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() {
170170
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
171171
context.sendMessage(buffers, listener);
172172

173-
verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture());
173+
verify(selector).writeToChannel(writeOpCaptor.capture());
174174
WriteOperation writeOp = writeOpCaptor.getValue();
175175

176176
assertSame(writeOperation, writeOp);

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void closeChannel() {
145145
selector.queueWrite(writeOperation);
146146
return;
147147
}
148-
selector.queueWriteInChannelBuffer(writeOperation);
148+
selector.writeToChannel(writeOperation);
149149
}
150150
}
151151

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ public void testInitiateCloseFromSameThreadSchedulesCloseNotify() {
345345
context.closeChannel();
346346

347347
ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
348-
verify(selector).queueWriteInChannelBuffer(captor.capture());
348+
verify(selector).writeToChannel(captor.capture());
349349

350350
context.queueWriteOperation(captor.getValue());
351351
verify(sslDriver).initiateClose();

0 commit comments

Comments
 (0)