Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";

protected final ShardId shardId;
protected final String allocationId;
protected final Logger logger;
protected final EngineConfig engineConfig;
protected final Store store;
Expand Down Expand Up @@ -126,6 +127,7 @@ protected Engine(EngineConfig engineConfig) {

this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
this.allocationId = engineConfig.getAllocationId();
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
*/
public final class EngineConfig {
private final ShardId shardId;
private final String allocationId;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private volatile boolean enableGcDeletes = true;
Expand Down Expand Up @@ -109,7 +110,7 @@ public final class EngineConfig {
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
Expand All @@ -120,6 +121,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
throw new IllegalArgumentException("openMode must not be null");
}
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
this.warmer = warmer == null ? (a) -> {} : warmer;
Expand Down Expand Up @@ -240,6 +242,15 @@ public IndexSettings getIndexSettings() {
*/
public ShardId getShardId() { return shardId; }

/**
* Returns the allocation ID for the shard.
*
* @return the allocation ID
*/
public String getAllocationId() {
return allocationId;
}

/**
* Returns the analyzer as the default analyzer in the engines {@link org.apache.lucene.index.IndexWriter}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
Expand Down Expand Up @@ -283,10 +283,12 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {

private static SequenceNumbersService sequenceNumberService(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
shardId,
allocationId,
indexSettings,
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {

private final String allocationId;

/**
* The global checkpoint tracker can operate in two modes:
* - primary: this shard is in charge of collecting local checkpoint information from all shard copies and computing the global
Expand Down Expand Up @@ -245,12 +247,18 @@ private boolean invariant() {
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
GlobalCheckpointTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.allocationId = allocationId;
this.primaryMode = false;
this.handoffInProgress = false;
this.appliedClusterStateVersion = -1L;
Expand Down Expand Up @@ -310,7 +318,7 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp
/**
* Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion.
*/
public synchronized void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
public synchronized void activatePrimaryMode(final long localCheckpoint) {
assert invariant();
assert primaryMode == false;
assert localCheckpoints.get(allocationId) != null && localCheckpoints.get(allocationId).inSync &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*/
public SequenceNumbersService(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long maxSeqNo,
final long localCheckpoint,
final long globalCheckpoint) {
super(shardId, indexSettings);
localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, indexSettings, globalCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint);
}

/**
Expand Down Expand Up @@ -201,7 +202,7 @@ public synchronized long getTrackedLocalCheckpointForShard(final String allocati
* Called on primary activation or promotion.
*/
public void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
globalCheckpointTracker.activatePrimaryMode(allocationId, localCheckpoint);
globalCheckpointTracker.activatePrimaryMode(localCheckpoint);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2074,7 +2074,7 @@ private DocumentMapperForType docMapper(String type) {

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -198,6 +200,7 @@
public class InternalEngineTests extends ESTestCase {

protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0);
protected final AllocationId allocationId = AllocationId.newInitializing();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just store the string here, i.e.
protected final String allocationId = AllocationId.newInitializing().getId();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual allocation ID instance is needed in one place. 😐

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok 🆗

private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);

protected ThreadPool threadPool;
Expand Down Expand Up @@ -264,11 +267,11 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
}

public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(),
config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), config.getTranslogRecoveryRunner());
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(),
config.getTranslogRecoveryRunner());
}

@Override
Expand Down Expand Up @@ -447,7 +450,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
indexSettings.getSettings()));
final List<ReferenceManager.RefreshListener> refreshListenerList =
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
Expand Down Expand Up @@ -728,6 +731,7 @@ public void testCommitStats() throws IOException {
Store store = createStore();
InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
maxSeqNo.get(),
localCheckpoint.get(),
Expand Down Expand Up @@ -901,6 +905,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
initialEngine = createEngine(store, createTempDir(), (config) ->
new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down Expand Up @@ -2028,7 +2033,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

try {
initialEngine = engine;
final ShardRouting primary = TestShardRouting.newShardRouting(shardId, "node1", true, ShardRoutingState.STARTED);
final ShardRouting primary = TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true, ShardRoutingState.STARTED, allocationId);
final ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED);
initialEngine.seqNoService().updateAllocationIdsFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(),
replica.allocationId().getId())),
Expand Down Expand Up @@ -2788,12 +2793,11 @@ public void testRecoverFromForeignTranslog() throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(),
BigArrays.NON_RECYCLING_INSTANCE);

EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool,
config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null,
config.getTranslogRecoveryRunner());
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(),
null, config.getTranslogRecoveryRunner());

try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
Expand Down Expand Up @@ -3628,6 +3632,7 @@ private SequenceNumbersService getStallingSeqNoService(
final AtomicLong expectedLocalCheckpoint) {
return new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down Expand Up @@ -3839,7 +3844,7 @@ public void testNoOps() throws IOException {
final int globalCheckpoint = randomIntBetween(0, localCheckpoint);
try {
final SequenceNumbersService seqNoService =
new SequenceNumbersService(shardId, defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
@Override
public long generateSeqNo() {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -3986,6 +3991,7 @@ public void testRestoreLocalCheckpointFromTranslog() throws IOException {
final SequenceNumbersService seqNoService =
new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down
Loading