Skip to content

Commit a9d44d9

Browse files
Add segment sorter for data streams
It is beneficial to sort the segments within a data stream's index in descending order of their maximum timestamp field value, so that the most recent segments (in terms of timestamp) come first. This speeds up queries sorted on the @timestamp field in descending order, which is the most common type of query for data streams, as we are mostly concerned with recent data. This patch addresses this for writable indices. A segment sorter is different from index sorting: an index sort by itself is only concerned with the order of docs within an individual segment (and not with how the segments are organized), while the segment sorter is used only during search and lets doc collection start with the "right" segment, so that the collection can terminate faster. This PR adds a property `isDataStreamIndex` to IndexShard that indicates whether a shard is part of a data stream.
1 parent 97b9f72 commit a9d44d9

File tree

20 files changed

+124
-31
lines changed

20 files changed

+124
-31
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,8 @@ public static final IndexShard newIndexShard(
650650
() -> {},
651651
RetentionLeaseSyncer.EMPTY,
652652
cbs,
653-
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
653+
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
654+
false);
654655
}
655656

656657
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
5757
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
5858
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
5959
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
60-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
60+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
6161
}
6262

6363
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
*/
88
package org.elasticsearch.cluster.metadata;
99

10+
import org.apache.lucene.document.LongPoint;
11+
import org.apache.lucene.index.LeafReader;
12+
import org.apache.lucene.index.PointValues;
1013
import org.elasticsearch.cluster.AbstractDiffable;
1114
import org.elasticsearch.cluster.Diff;
1215
import org.elasticsearch.core.Nullable;
@@ -25,6 +28,7 @@
2528
import java.util.ArrayList;
2629
import java.util.Collection;
2730
import java.util.Collections;
31+
import java.util.Comparator;
2832
import java.util.HashMap;
2933
import java.util.List;
3034
import java.util.Locale;
@@ -36,6 +40,23 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
3640

3741
public static final String BACKING_INDEX_PREFIX = ".ds-";
3842
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
43+
// Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
44+
public static Comparator<LeafReader> DATASTREAM_LEAF_READERS_SORTER =
45+
Comparator.comparingLong(
46+
(LeafReader r) -> {
47+
try {
48+
PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
49+
if (points != null) {
50+
byte[] sortValue = points.getMaxPackedValue();
51+
return LongPoint.decodeDimension(sortValue, 0);
52+
}
53+
} catch (IOException e) {
54+
assert false : "Datastream index segment doesn't contain an expected " +
55+
DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + " field!";
56+
}
57+
// this should not happen, as all data stream segments must contain @timestamp field
58+
return Long.MAX_VALUE; })
59+
.reversed();
3960

4061
private final LongSupplier timeProvider;
4162
private final String name;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ private long getAvgShardSizeInBytes() throws IOException {
386386
public synchronized IndexShard createShard(
387387
final ShardRouting routing,
388388
final Consumer<ShardId> globalCheckpointSyncer,
389-
final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
389+
final RetentionLeaseSyncer retentionLeaseSyncer,
390+
final boolean isDataStreamIndex) throws IOException {
390391
Objects.requireNonNull(retentionLeaseSyncer);
391392
/*
392393
* TODO: we execute this in parallel but it's a synced method. Yet, we might
@@ -478,7 +479,8 @@ public synchronized IndexShard createShard(
478479
() -> globalCheckpointSyncer.accept(shardId),
479480
retentionLeaseSyncer,
480481
circuitBreakerService,
481-
snapshotCommitSupplier);
482+
snapshotCommitSupplier,
483+
isDataStreamIndex);
482484
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
483485
eventListener.afterIndexShardCreated(indexShard);
484486
shards = Maps.copyMapWithAddedEntry(shards, shardId.id(), indexShard);

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.lucene.analysis.Analyzer;
1111
import org.apache.lucene.codecs.Codec;
12+
import org.apache.lucene.index.LeafReader;
1213
import org.apache.lucene.index.MergePolicy;
1314
import org.apache.lucene.search.QueryCache;
1415
import org.apache.lucene.search.QueryCachingPolicy;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.plugins.IndexStorePlugin;
3334
import org.elasticsearch.threadpool.ThreadPool;
3435

36+
import java.util.Comparator;
3537
import java.util.List;
3638
import java.util.Objects;
3739
import java.util.function.LongSupplier;
@@ -70,6 +72,7 @@ public final class EngineConfig {
7072
private final CircuitBreakerService circuitBreakerService;
7173
private final LongSupplier globalCheckpointSupplier;
7274
private final Supplier<RetentionLeases> retentionLeasesSupplier;
75+
private final Comparator<LeafReader> leafSorter;
7376

7477
/**
7578
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -131,7 +134,8 @@ public EngineConfig(
131134
LongSupplier globalCheckpointSupplier,
132135
Supplier<RetentionLeases> retentionLeasesSupplier,
133136
LongSupplier primaryTermSupplier,
134-
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
137+
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
138+
Comparator<LeafReader> leafSorter) {
135139
this.shardId = shardId;
136140
this.indexSettings = indexSettings;
137141
this.threadPool = threadPool;
@@ -169,6 +173,7 @@ public EngineConfig(
169173
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
170174
this.primaryTermSupplier = primaryTermSupplier;
171175
this.snapshotCommitSupplier = snapshotCommitSupplier;
176+
this.leafSorter = leafSorter;
172177
}
173178

174179
/**
@@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() {
353358
public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() {
354359
return snapshotCommitSupplier;
355360
}
361+
362+
/**
363+
* Returns how segments should be sorted for reading, or {@code null} if no sorting should be applied.
364+
*/
365+
@Nullable
366+
public Comparator<LeafReader> getLeafSorter() {
367+
return leafSorter;
368+
}
356369
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2182,6 +2182,11 @@ private IndexWriterConfig getIndexWriterConfig() {
21822182
if (config().getIndexSort() != null) {
21832183
iwc.setIndexSort(config().getIndexSort());
21842184
}
2185+
// Provide a custom leaf sorter, so that index readers opened from this writer
2186+
// will have their leaves sorted according to the given leaf sorter.
2187+
if (engineConfig.getLeafSorter() != null) {
2188+
iwc.setLeafSorter(engineConfig.getLeafSorter());
2189+
}
21852190
return iwc;
21862191
}
21872192

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@
185185
import java.util.stream.Collectors;
186186
import java.util.stream.StreamSupport;
187187

188+
import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER;
188189
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
189190
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
190191

@@ -283,6 +284,7 @@ Runnable getGlobalCheckpointSyncer() {
283284
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
284285
private final RefreshPendingLocationListener refreshPendingLocationListener;
285286
private volatile boolean useRetentionLeasesInPeerRecovery;
287+
private final boolean isDataStreamIndex; // if a shard is a part of data stream
286288

287289
public IndexShard(
288290
final ShardRouting shardRouting,
@@ -304,7 +306,8 @@ public IndexShard(
304306
final Runnable globalCheckpointSyncer,
305307
final RetentionLeaseSyncer retentionLeaseSyncer,
306308
final CircuitBreakerService circuitBreakerService,
307-
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) throws IOException {
309+
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
310+
boolean isDataStreamIndex) throws IOException {
308311
super(shardRouting.shardId(), indexSettings);
309312
assert shardRouting.initializing();
310313
this.shardRouting = shardRouting;
@@ -387,6 +390,7 @@ public boolean shouldCache(Query query) {
387390
persistMetadata(path, indexSettings, shardRouting, null, logger);
388391
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
389392
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
393+
this.isDataStreamIndex = isDataStreamIndex;
390394
}
391395

392396
public ThreadPool getThreadPool() {
@@ -2912,7 +2916,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
29122916
globalCheckpointSupplier,
29132917
replicationTracker::getRetentionLeases,
29142918
this::getOperationPrimaryTerm,
2915-
snapshotCommitSupplier);
2919+
snapshotCommitSupplier,
2920+
isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null);
29162921
}
29172922

29182923
/**

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -767,13 +767,14 @@ public IndexShard createShard(
767767
final Consumer<ShardId> globalCheckpointSyncer,
768768
final RetentionLeaseSyncer retentionLeaseSyncer,
769769
final DiscoveryNode targetNode,
770-
final DiscoveryNode sourceNode) throws IOException {
770+
final DiscoveryNode sourceNode,
771+
final boolean isDataStreamIndex) throws IOException {
771772
Objects.requireNonNull(retentionLeaseSyncer);
772773
ensureChangesAllowed();
773774
IndexService indexService = indexService(shardRouting.index());
774775
assert indexService != null;
775776
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
776-
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
777+
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, isDataStreamIndex);
777778
indexShard.addShardFailureCallback(onShardFailure);
778779
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
779780
mapping -> {

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.ClusterState;
2121
import org.elasticsearch.cluster.ClusterStateApplier;
2222
import org.elasticsearch.cluster.action.shard.ShardStateAction;
23+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
2324
import org.elasticsearch.cluster.metadata.IndexMetadata;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -576,6 +577,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
576577
try {
577578
final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
578579
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
580+
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(shardRouting.getIndexName());
581+
final boolean isDataStreamIndex = (indexAbstraction != null) && (indexAbstraction.getParentDataStream() != null);
579582
indicesService.createShard(
580583
shardRouting,
581584
recoveryTargetService,
@@ -585,7 +588,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
585588
this::updateGlobalCheckpointForShard,
586589
retentionLeaseSyncer,
587590
nodes.getLocalNode(),
588-
sourceNode);
591+
sourceNode,
592+
isDataStreamIndex);
589593
} catch (Exception e) {
590594
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
591595
}
@@ -900,6 +904,7 @@ U createIndex(IndexMetadata indexMetadata,
900904
* @param retentionLeaseSyncer a callback when this shard syncs retention leases
901905
* @param targetNode the node where this shard will be recovered
902906
* @param sourceNode the source node to recover this shard from (it might be null)
907+
* @param isDataStreamIndex true if the shard belongs to an index that is part of a data stream.
903908
* @return a new shard
904909
* @throws IOException if an I/O exception occurs when creating the shard
905910
*/
@@ -912,7 +917,8 @@ T createShard(
912917
Consumer<ShardId> globalCheckpointSyncer,
913918
RetentionLeaseSyncer retentionLeaseSyncer,
914919
DiscoveryNode targetNode,
915-
@Nullable DiscoveryNode sourceNode) throws IOException;
920+
@Nullable DiscoveryNode sourceNode,
921+
boolean isDataStreamIndex) throws IOException;
916922

917923
/**
918924
* Returns shard for the specified id if it exists otherwise returns <code>null</code>.

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2946,7 +2946,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
29462946
() -> UNASSIGNED_SEQ_NO,
29472947
() -> RetentionLeases.EMPTY,
29482948
primaryTerm::get,
2949-
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
2949+
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
2950+
null);
29502951
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
29512952

29522953
engine = createEngine(store, primaryTranslogDir); // and recover again!
@@ -6020,7 +6021,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
60206021
config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(),
60216022
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
60226023
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
6023-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
6024+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
60246025
try (InternalEngine engine = createEngine(configWithWarmer)) {
60256026
assertThat(warmedUpReaders, empty());
60266027
assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(),

0 commit comments

Comments
 (0)