Skip to content

Commit 64bbbae

Browse files
authored
Do not block Translog add on file write (#63374)
Currently a TranslogWriter add operation is synchronized. This operation adds the bytes to the file output stream buffer and issues a write system call if the buffer is filled. This happens every 8KB, which means that we routinely block other add calls on system writes. This commit modifies the add operation to simply place the operation in an array list. The array list is flushed when the sync call occurs or when 1MB is buffered.
1 parent f2ba62b commit 64bbbae

File tree

7 files changed

+460
-122
lines changed

7 files changed

+460
-122
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.io;
21+
22+
import org.elasticsearch.common.unit.ByteSizeValue;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
25+
import java.nio.ByteBuffer;
26+
import java.util.Arrays;
27+
28+
public class DiskIoBufferPool {
29+
30+
public static final int BUFFER_SIZE = StrictMath.toIntExact(ByteSizeValue.parseBytesSizeValue(
31+
System.getProperty("es.disk_io.direct.buffer.size", "64KB"), "es.disk_io.direct.buffer.size").getBytes());
32+
public static final int HEAP_BUFFER_SIZE = 8 * 1024;
33+
34+
private static final ThreadLocal<ByteBuffer> ioBufferPool = ThreadLocal.withInitial(() -> {
35+
if (isWriteOrFlushThread()) {
36+
return ByteBuffer.allocateDirect(BUFFER_SIZE);
37+
} else {
38+
return ByteBuffer.allocate(HEAP_BUFFER_SIZE);
39+
}
40+
});
41+
42+
public static ByteBuffer getIoBuffer() {
43+
ByteBuffer ioBuffer = ioBufferPool.get();
44+
ioBuffer.clear();
45+
return ioBuffer;
46+
}
47+
48+
private static boolean isWriteOrFlushThread() {
49+
String threadName = Thread.currentThread().getName();
50+
for (String s : Arrays.asList(
51+
"[" + ThreadPool.Names.WRITE + "]",
52+
"[" + ThreadPool.Names.FLUSH + "]",
53+
"[" + ThreadPool.Names.SYSTEM_WRITE + "]")) {
54+
if (threadName.contains(s)) {
55+
return true;
56+
}
57+
}
58+
return false;
59+
}
60+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,19 @@ public ReleasableLock acquire() throws EngineException {
5858
return this;
5959
}
6060

61+
/**
62+
* Try acquiring lock, returning null if unable.
63+
*/
64+
public ReleasableLock tryAcquire() {
65+
boolean locked = lock.tryLock();
66+
if (locked) {
67+
assert addCurrentThread();
68+
return this;
69+
} else {
70+
return null;
71+
}
72+
}
73+
6174
/**
6275
* Try acquiring lock, returning null if unable to acquire lock within timeout.
6376
*/

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import org.elasticsearch.common.UUIDs;
2929
import org.elasticsearch.common.bytes.BytesArray;
3030
import org.elasticsearch.common.bytes.BytesReference;
31+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3132
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
3233
import org.elasticsearch.common.io.stream.StreamInput;
3334
import org.elasticsearch.common.io.stream.StreamOutput;
3435
import org.elasticsearch.common.lease.Releasable;
3536
import org.elasticsearch.common.lease.Releasables;
3637
import org.elasticsearch.common.lucene.uid.Versions;
37-
import org.elasticsearch.common.unit.ByteSizeValue;
3838
import org.elasticsearch.common.util.BigArrays;
3939
import org.elasticsearch.common.util.concurrent.ReleasableLock;
4040
import org.elasticsearch.core.internal.io.IOUtils;
@@ -521,6 +521,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon
521521
*/
522522
public Location add(final Operation operation) throws IOException {
523523
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
524+
boolean successfullySerialized = false;
524525
try {
525526
final long start = out.position();
526527
out.skip(Integer.BYTES);
@@ -530,8 +531,9 @@ public Location add(final Operation operation) throws IOException {
530531
out.seek(start);
531532
out.writeInt(operationSize);
532533
out.seek(end);
533-
final BytesReference bytes = out.bytes();
534-
try (ReleasableLock ignored = readLock.acquire()) {
534+
successfullySerialized = true;
535+
try (ReleasableBytesReference bytes = new ReleasableBytesReference(out.bytes(), out);
536+
ReleasableLock ignored = readLock.acquire()) {
535537
ensureOpen();
536538
if (operation.primaryTerm() > current.getPrimaryTerm()) {
537539
assert false :
@@ -549,7 +551,9 @@ public Location add(final Operation operation) throws IOException {
549551
closeOnTragicEvent(ex);
550552
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex);
551553
} finally {
552-
Releasables.close(out);
554+
if (successfullySerialized == false) {
555+
Releasables.close(out);
556+
}
553557
}
554558
}
555559

@@ -1907,7 +1911,7 @@ public static String createEmptyTranslog(final Path location,
19071911
Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
19081912
IOUtils.fsync(checkpointFile, false);
19091913
final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
1910-
new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint,
1914+
TranslogConfig.DEFAULT_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint,
19111915
() -> {
19121916
throw new UnsupportedOperationException();
19131917
}, () -> {

server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
*/
3535
public final class TranslogConfig {
3636

37-
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
37+
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
3838
private final BigArrays bigArrays;
3939
private final IndexSettings indexSettings;
4040
private final ShardId shardId;

0 commit comments

Comments
 (0)