Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ public BitsetFilterCache(IndexSettings indexSettings, Listener listener) {
this.listener = listener;
}

public static BitSet bitsetFromQuery(Query query, LeafReaderContext context) throws IOException {
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer s = weight.scorer(context);
if (s == null) {
return null;
} else {
return BitSet.of(s.iterator(), context.reader().maxDoc());
}
}

public IndexWarmer.Listener createListener(ThreadPool threadPool) {
return new BitSetProducerWarmer(threadPool);
}
Expand All @@ -115,7 +128,7 @@ public void clear(String reason) {
loadedFilters.invalidateAll();
}

private BitSet getAndLoadIfNotPresent(final Query query, final LeafReaderContext context) throws IOException, ExecutionException {
private BitSet getAndLoadIfNotPresent(final Query query, final LeafReaderContext context) throws ExecutionException {
final IndexReader.CacheHelper cacheHelper = context.reader().getCoreCacheHelper();
if (cacheHelper == null) {
throw new IllegalArgumentException("Reader " + context.reader() + " does not support caching");
Expand All @@ -133,18 +146,7 @@ private BitSet getAndLoadIfNotPresent(final Query query, final LeafReaderContext
});

return filterToFbs.computeIfAbsent(query, key -> {
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer s = weight.scorer(context);
final BitSet bitSet;
if (s == null) {
bitSet = null;
} else {
bitSet = BitSet.of(s.iterator(), context.reader().maxDoc());
}

final BitSet bitSet = bitsetFromQuery(query, context);
Value value = new Value(bitSet, shardId);
listener.onCache(shardId, value.bitset);
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,19 @@
package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.IndexSettings;

public abstract class AbstractIndexShardComponent implements IndexShardComponent {

protected final Logger logger;
protected final DeprecationLogger deprecationLogger;
protected final ShardId shardId;
protected final IndexSettings indexSettings;

protected AbstractIndexShardComponent(ShardId shardId, IndexSettings indexSettings) {
this.shardId = shardId;
this.indexSettings = indexSettings;
this.logger = Loggers.getLogger(getClass(), shardId);
this.deprecationLogger = new DeprecationLogger(logger);
}

@Override
Expand All @@ -47,8 +44,4 @@ public ShardId shardId() {
public IndexSettings indexSettings() {
return indexSettings;
}

public String nodeName() {
return indexSettings.getNodeName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
*/
public final class ElasticsearchMergePolicy extends FilterMergePolicy {

private static Logger logger = LogManager.getLogger(ElasticsearchMergePolicy.class);
private static final Logger logger = LogManager.getLogger(ElasticsearchMergePolicy.class);

// True if the next merge request should do segment upgrades:
private volatile boolean upgradeInProgress;
Expand Down
49 changes: 20 additions & 29 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
Expand Down Expand Up @@ -326,17 +325,15 @@ public IndexShard(
this.pendingPrimaryTerm = primaryTerm;
this.globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
final ReplicationTracker replicationTracker =
new ReplicationTracker(
shardId,
aId,
indexSettings,
primaryTerm,
UNASSIGNED_SEQ_NO,
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis,
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener));
this.replicationTracker = replicationTracker;
this.replicationTracker = new ReplicationTracker(
shardId,
aId,
indexSettings,
primaryTerm,
UNASSIGNED_SEQ_NO,
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis,
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener));

// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
Expand Down Expand Up @@ -443,16 +440,17 @@ public void updateShardState(final ShardRouting newRouting,
final ShardRouting currentRouting;
synchronized (mutex) {
currentRouting = this.shardRouting;
assert currentRouting != null;

if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId " +
newRouting.shardId() + " on a shard with shardId " + shardId());
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
if (newRouting.isSameAllocation(currentRouting) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " +
currentRouting + ", new " + newRouting);
}
if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) {
if (currentRouting.primary() && newRouting.primary() == false) {
throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current "
+ currentRouting + ", new " + newRouting);
}
Expand Down Expand Up @@ -586,7 +584,7 @@ public void onFailure(Exception e) {
: "a started primary with non-pending operation term must be in primary mode " + this.shardRouting;
shardStateUpdated.countDown();
}
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
if (currentRouting.active() == false && newRouting.active()) {
indexEventListener.afterIndexShardStarted(this);
}
if (newRouting.equals(currentRouting) == false) {
Expand Down Expand Up @@ -631,8 +629,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
public void relocated(final String targetAllocationId, final Consumer<ReplicationTracker.PrimaryContext> consumer)
throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
try {
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();
// no shard operation permits are being held here, move state from started to relocated
Expand Down Expand Up @@ -665,8 +662,6 @@ public void relocated(final String targetAllocationId, final Consumer<Replicatio
// Fail primary relocation source and target shards.
failShard("timed out waiting for relocation hand-off to complete", null);
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
} finally {
forceRefreshes.close();
}
}

Expand Down Expand Up @@ -745,7 +740,7 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o
sourceWithResolvedType = new SourceToParse(sourceToParse.index(), resolvedType, sourceToParse.id(),
sourceToParse.source(), sourceToParse.getXContentType(), sourceToParse.routing());
}
operation = prepareIndex(docMapper(resolvedType), indexSettings.getIndexVersionCreated(), sourceWithResolvedType,
operation = prepareIndex(docMapper(resolvedType), sourceWithResolvedType,
seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand All @@ -763,7 +758,7 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o
return index(engine, operation);
}

public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo,
public static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long seqNo,
long primaryTerm, long version, VersionType versionType, Engine.Operation.Origin origin,
long autoGeneratedIdTimestamp, boolean isRetry,
long ifSeqNo, long ifPrimaryTerm) {
Expand Down Expand Up @@ -1529,7 +1524,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
assertSequenceNumbersInCommit();
assert assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}

Expand All @@ -1546,7 +1541,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
return true;
}

protected void onNewEngine(Engine newEngine) {
private void onNewEngine(Engine newEngine) {
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
}

Expand Down Expand Up @@ -1858,10 +1853,6 @@ public List<Segment> segments(boolean verbose) {
return getEngine().segments(verbose);
}

public void flushAndCloseEngine() throws IOException {
getEngine().flushAndClose();
}

public String getHistoryUUID() {
return getEngine().getHistoryUUID();
}
Expand Down Expand Up @@ -2876,7 +2867,7 @@ protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candida
}
}
};
};
}

/**
* Syncs the given location with the underlying storage unless already synced. This method might return immediately without
Expand Down Expand Up @@ -2988,7 +2979,7 @@ private RefreshListeners buildRefreshListeners() {
return new RefreshListeners(
indexSettings::getMaxRefreshListeners,
() -> refresh("too_many_listeners"),
threadPool.executor(ThreadPool.Names.LISTENER)::execute,
threadPool.executor(ThreadPool.Names.LISTENER),
logger, threadPool.getThreadContext(),
externalRefreshMetric);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ public IndexShardClosedException(ShardId shardId) {
super(shardId, IndexShardState.CLOSED, "Closed");
}

public IndexShardClosedException(ShardId shardId, Throwable t) {
super(shardId, IndexShardState.CLOSED, "Closed", t);
}

public IndexShardClosedException(ShardId shardId, String message) {
super(shardId, IndexShardState.CLOSED, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,5 @@ IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
}

void clear() {
indexMetric.clear();
deleteMetric.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void onFailure(Exception e) {
}
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
new SnapshotSender(syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
Expand Down Expand Up @@ -200,12 +200,12 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
private final AtomicInteger totalSentOps = new AtomicInteger();
private final AtomicInteger totalSkippedOps = new AtomicInteger();
private AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();

SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
SnapshotSender(SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
this.logger = logger;
this.logger = PrimaryReplicaSyncer.logger;
this.syncAction = syncAction;
this.task = task;
this.shardId = shardId;
Expand All @@ -232,7 +232,7 @@ public void onFailure(Exception e) {
}
}

private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];
private static final Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];

@Override
protected void doRun() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
*/
public class RemoveCorruptedLuceneSegmentsAction {

public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus(ShardPath shardPath,
Directory indexDirectory,
public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus(Directory indexDirectory,
Lock writeLock,
PrintStream printStream,
boolean verbose) throws IOException {
Expand Down Expand Up @@ -62,7 +61,6 @@ public Tuple<RemoveCorruptedShardDataCommand.CleanStatus, String> getCleanStatus
}

public void execute(Terminal terminal,
ShardPath shardPath,
Directory indexDirectory,
Lock writeLock,
PrintStream printStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void write(int b) {
terminal.println("Opening Lucene index at " + indexPath);
terminal.println("");
try {
indexCleanStatus = removeCorruptedLuceneSegmentsAction.getCleanStatus(shardPath, indexDir,
indexCleanStatus = removeCorruptedLuceneSegmentsAction.getCleanStatus(indexDir,
writeIndexLock, printStream, verbose);
} catch (Exception e) {
terminal.println(e.getMessage());
Expand Down Expand Up @@ -355,7 +355,7 @@ public void write(int b) {
confirm("Continue and remove corrupted data from the shard ?", terminal);

if (indexStatus != CleanStatus.CLEAN) {
removeCorruptedLuceneSegmentsAction.execute(terminal, shardPath, indexDir,
removeCorruptedLuceneSegmentsAction.execute(terminal, indexDir,
writeIndexLock, printStream, verbose);
}

Expand All @@ -373,7 +373,7 @@ public void write(int b) {

// newHistoryCommit obtains its own lock
addNewHistoryCommit(indexDir, terminal, translogStatus != CleanStatus.CLEAN);
newAllocationId(environment, shardPath, terminal);
newAllocationId(shardPath, terminal);
if (indexStatus != CleanStatus.CLEAN) {
dropCorruptMarkerFiles(terminal, indexPath, indexDir, indexStatus == CleanStatus.CLEAN_WITH_CORRUPTED_MARKER);
}
Expand Down Expand Up @@ -425,7 +425,7 @@ protected void addNewHistoryCommit(Directory indexDirectory, Terminal terminal,
}
}

protected void newAllocationId(Environment environment, ShardPath shardPath, Terminal terminal) throws IOException {
private void newAllocationId(ShardPath shardPath, Terminal terminal) throws IOException {
final Path shardStatePath = shardPath.getShardStatePath();
final ShardStateMetaData shardStateMetaData =
ShardStateMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardStatePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public final class ShardPath {
public static final String INDEX_FOLDER_NAME = "index";
Expand Down Expand Up @@ -283,10 +284,10 @@ public boolean equals(Object o) {
return false;
}
final ShardPath shardPath = (ShardPath) o;
if (shardId != null ? !shardId.equals(shardPath.shardId) : shardPath.shardId != null) {
if (Objects.equals(shardId, shardPath.shardId) == false) {
return false;
}
if (path != null ? !path.equals(shardPath.path) : shardPath.path != null) {
if (Objects.equals(path, shardPath.path) == false) {
return false;
}

Expand Down
Loading