Conversation

ywelsch (Contributor) commented on May 23, 2017

Adds a task that streams all operations from the primary's global checkpoint to all shards.

Relates to #10708
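
For readers following along, the mechanism is roughly: take a snapshot of the primary's translog, skip everything at or below the global checkpoint (those operations are already on all in-sync copies), and replay the rest to the replicas in size-bounded batches. Below is a minimal, self-contained sketch of that idea only; the names (Operation, SyncAction, resync) are illustrative stand-ins, not the actual PrimaryReplicaSyncer / TransportResyncReplicationAction code.

import java.util.ArrayList;
import java.util.List;

public class ResyncSketch {

    record Operation(long seqNo, int sizeInBytes) {}

    interface SyncAction {
        void sync(List<Operation> batch); // stand-in for the replication action added in this PR
    }

    static void resync(List<Operation> translogSnapshot, long globalCheckpoint,
                       int maxBatchBytes, SyncAction syncAction) {
        List<Operation> batch = new ArrayList<>();
        int batchBytes = 0;
        for (Operation op : translogSnapshot) {
            if (op.seqNo() <= globalCheckpoint) {
                continue; // already safely on all in-sync copies: skip
            }
            batch.add(op);
            batchBytes += op.sizeInBytes();
            if (batchBytes >= maxBatchBytes) {
                syncAction.sync(List.copyOf(batch)); // ship one chunk to the replicas
                batch.clear();
                batchBytes = 0;
            }
        }
        if (batch.isEmpty() == false) {
            syncAction.sync(List.copyOf(batch)); // flush the final partial batch
        }
    }
}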

ywelsch added the :Distributed Indexing/Recovery, >enhancement, and v6.0.0 labels on May 23, 2017
ywelsch requested a review from bleskes on May 23, 2017 at 12:10
bleskes (Contributor) commented on May 29, 2017

Yannick and I discussed the PR. Yannick is working on some preparatory infrastructure changes before picking this up again.

ywelsch force-pushed the enhance/primary-replica-resync branch from eb4dc61 to ac164e5 on June 19, 2017 at 18:41
bleskes (Contributor) left a review comment:

This looks great. I left a bunch of minor comments and suggestions for improvement. There is one thing I want to discuss and I'll reach out via Slack.

    location = syncOperationResultOrThrow(operationResult, location);
} catch (Exception e) {
    // if it's not a failure to be ignored, let it bubble up
    if (!TransportActions.isShardNotAvailableException(e)) {
bleskes:

I think it's better to let the higher-up components (i.e., ReplicationOperation) deal with this?

ywelsch (author):

I copied it over from TransportShardBulkAction to ensure that it is consistent with that one. I think it makes sense to let it bubble up. If we change the behavior for TransportShardBulkAction though, it should be a separate follow-up PR.

@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
    // We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
    return new ReplicasProxy();
bleskes:

I think we do want to fail the shard if it was assigned, no?

ywelsch (author):

We agreed to handle this in a follow-up. Not failing the shard is only problematic when index.number_of_replicas is > 1, the resync fails, but subsequent index operations are replicated just fine.
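
To make the "best-effort for now" behavior concrete, here is a small illustrative sketch (not the actual ReplicasProxy API) contrasting the two behaviors: an ordinary write fails an unresponsive copy and marks it stale, while the resync merged in this PR only logs and carries on. The interfaces and names below are assumptions for illustration.

import java.util.function.BiConsumer;
import java.util.logging.Logger;

public class BestEffortReplicasSketch {

    interface ReplicaFailureHandler {
        void onReplicaFailure(String allocationId, Exception failure);
    }

    // ordinary writes: escalate so the copy is failed and removed from the in-sync set
    static ReplicaFailureHandler failingProxy(BiConsumer<String, Exception> failShardAndMarkStale) {
        return failShardAndMarkStale::accept;
    }

    // resync (as merged here): best-effort, the copy stays allocated and in-sync
    static ReplicaFailureHandler bestEffortProxy(Logger logger) {
        return (allocationId, failure) ->
            logger.warning("resync to [" + allocationId + "] failed (ignored): " + failure);
    }
}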

@Override
protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
        ResyncReplicationRequest request, IndexShard primary) throws Exception {
    final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
bleskes:

why did you add this extra empty method? can we clean it up?

ywelsch (author):

It's used in the tests. This ensures that the ResyncAction in the tests stays in sync with TransportResyncReplicationAction (e.g. if we were to add more code there).

bleskes:

sure. I missed that usage.
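
The testing point above in a nutshell: because both the production action and the test double delegate to the same performOnPrimary method, the test cannot silently drift from the real primary-side logic. A tiny illustrative sketch (the names are stand-ins, not the actual test code):

public class PerformOnPrimarySketch {

    record Request(String payload) {}

    // the single place where the primary-side resync work lives
    static Request performOnPrimary(Request request) {
        return request; // the resync forwards the same operations on to the replicas
    }

    // the production action delegates here ...
    static Request shardOperationOnPrimary(Request request) {
        return performOnPrimary(request);
    }

    // ... and so does the ResyncAction used in the tests
    static Request testResyncActionOnPrimary(Request request) {
        return performOnPrimary(request);
    }
}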


@Override
public void handleException(TransportException exp) {
    listener.onFailure(exp);
bleskes:

should we capture shard not available exceptions here, to deal with the primary failing/shutting down?

ywelsch (author):

I handled it at the top level in IndexShard, where the sync is triggered, since the primary failing / shutting down can also happen in the PrimaryReplicaSyncer. I think it's nicer not to rely on that though, so I've added code here now to handle this.
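
An illustrative sketch of special-casing such failures in the response handler; this is an assumption about one possible shape, not the exact code that was added. The point is that a primary that is failing or shutting down should not surface as a resync error to the caller.

public class ResyncExceptionHandlingSketch {

    static class ShardNotAvailableException extends RuntimeException {}

    interface Listener {
        void onResponse();
        void onFailure(Exception e);
    }

    // stand-in for handleException(TransportException exp)
    static void handleException(Exception exp, Listener listener) {
        if (exp instanceof ShardNotAvailableException) {
            // primary failed or is shutting down: treat the resync as done rather than failed
            listener.onResponse();
        } else {
            listener.onFailure(exp);
        }
    }
}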

}

-class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
+public class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
bleskes:

wondering if we should do this like this:

protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {

    public PrimaryOperationTransportHandler() {
    }

up to you


private final ShardId shardId;

public ResyncRequest(ShardId shardId) {
bleskes:

can we add the allocation id in question to the description so it will be visible?

ywelsch (author):

sure
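
A hypothetical sketch of what including the allocation id in the description could look like, so the resync task is identifiable in the task management APIs; the field and method names are illustrative, not the exact ResyncRequest that was merged.

public class ResyncRequestSketch {

    private final String shardId;             // stand-in for org.elasticsearch.index.shard.ShardId
    private final String primaryAllocationId; // the allocation id the reviewer asked to surface

    public ResyncRequestSketch(String shardId, String primaryAllocationId) {
        this.shardId = shardId;
        this.primaryAllocationId = primaryAllocationId;
    }

    public String getDescription() {
        return "resync of " + shardId + " on primary allocation [" + primaryAllocationId + "]";
    }
}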

ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
logger.trace("sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", operations.size(), new ByteSizeValue(size),
totalSentOps.get(), totalSkippedOps.get());
syncAction.sync(request, task, primaryAllocationId, this);
bleskes:

do we want to set the task phase to something like "ops_sent"?

ywelsch (author):

that would just indicate that the first ops have gone out, which can also be tracked with the detailed task stats. I don't see the benefit.


long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;

if (numDocs > 0) {
bleskes:

why do we need this check for >0?

ywelsch (author):

remnant from an earlier iteration


import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
bleskes:

I think we need a BWC layer here, à la GlobalCheckpointSyncAction#sendReplicaRequest.

bleskes:

Also, a BWC test would be nice...

ywelsch (author):

BWC test... sigh. A rolling upgrade should cover resyncs (e.g. the existing rolling-upgrade test that we have in qa/rolling-upgrade). For the resync to actually send out a request to a 5.6 node, the test would require 3 nodes (and an index with 2 replicas), which would make the already slow rolling-upgrade test even slower. I'm not sure that's a good idea.

ywelsch (author):

I've opened #25336 for discussion.
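
A heavily hedged sketch of the kind of BWC shim discussed above (à la GlobalCheckpointSyncAction#sendReplicaRequest): if the target node runs a version that does not understand the resync request, do not send it and report a synthetic success instead. The version cut-off and all names below are assumptions for illustration only.

public class ResyncBwcSketch {

    record NodeVersion(int major, int minor) {
        boolean onOrAfter(NodeVersion other) {
            return major > other.major || (major == other.major && minor >= other.minor);
        }
    }

    interface ReplicaChannel {
        void send(Object resyncRequest);  // normal path: ship the request to the replica
        void respondWithEmptySuccess();   // BWC path: pretend the replica acknowledged it
    }

    static final NodeVersion RESYNC_INTRODUCED = new NodeVersion(6, 0); // assumption

    static void sendReplicaRequest(Object resyncRequest, NodeVersion targetNode, ReplicaChannel channel) {
        if (targetNode.onOrAfter(RESYNC_INTRODUCED)) {
            channel.send(resyncRequest);
        } else {
            // a pre-6.0 replica cannot process the resync, so treat it as a no-op success
            channel.respondWithEmptySuccess();
        }
    }
}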


public class PrimaryReplicaSyncerTests extends IndexShardTestCase {

public void testSyncerSendsOffCorrectDocuments() throws Exception {
bleskes:

do we want to add a test for the status reporting correct numbers?

ywelsch (author):

sure
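
An illustrative sketch of the kind of status assertion that such a test could make: after a sync, the task's counters should line up with the number of documents indexed and the global checkpoint. The getter names and the exact counting below are assumptions, not the actual ResyncTask API.

public class ResyncStatusSketch {

    record Status(int totalOperations, int sentOperations, int skippedOperations) {}

    static void assertStatusConsistent(Status status, int numOps, long globalCheckpoint) {
        // operations at or below the global checkpoint are skipped, the rest are sent
        int expectedSkipped = (int) Math.min(globalCheckpoint + 1, numOps);
        if (status.totalOperations() != numOps
                || status.skippedOperations() != expectedSkipped
                || status.sentOperations() != numOps - expectedSkipped) {
            throw new AssertionError("unexpected resync status: " + status);
        }
    }
}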

bleskes (Contributor) left a review comment:

Thanks @ywelsch. For future reference and for people reading along, we plan to do the following follow-ups (order TBD):

  1. Replace the primaryReplicaResyncInProgress boolean with two new IndexShardStates (something like PROMOTING, PROMOTED). This will allow us to follow the life cycle of the primary.
  2. Currently there is a race condition between primary promotion and primary relocation which may result in indexShardOperationPermits throwing an IllegalStateException. This happens if the relocation is so fast that it ends while the async block-operations call made while updating the primary term is still in effect. To address this, we plan to (sketched below):
    1. add an expected primary term to the recovery request
    2. delay recoveries if the term is not right and the primary hasn't reached the new "PROMOTED" state (i.e., the sync is still in progress). The latter can also, temporarily, be achieved by exposing the primaryReplicaResyncInProgress boolean.
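
A rough, heavily hedged sketch of follow-up item 2 above, as an assumption about the eventual shape of the check rather than merged code: a recovery request carrying a stale primary term is delayed rather than rejected with an IllegalStateException while the newly promoted primary is still resyncing.

public class DelayedRecoverySketch {

    enum ShardState { STARTED, PROMOTING, PROMOTED }

    // returns true if the recovery should be retried later instead of failing outright
    static boolean shouldDelayRecovery(long requestPrimaryTerm, long currentPrimaryTerm, ShardState state) {
        // the peer still expects the old primary term and promotion (incl. resync) hasn't finished
        return requestPrimaryTerm < currentPrimaryTerm && state != ShardState.PROMOTED;
    }
}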

ywelsch merged commit e41eae9 into elastic:master on Jun 22, 2017
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Jun 22, 2017
* master: (56 commits)
  Initialize max unsafe auto ID timestamp on shrink
  Enable a long translog retention policy by default (elastic#25294)
  Remove `index.mapping.single_type=false` from core/tests (elastic#25331)
  test: single type defaults to true since alpha1 and not alpha3
  Get short path name for native controllers
  Live primary-replica resync (no rollback) (elastic#24841)
  Upgrade to lucene-7.0.0-snapshot-ad2cb77. (elastic#25349)
  percolator: Deprecate `document_type` parameter.
  [DOCS] Fixed typo.
  [rest-api-spec/indices.refresh] Remove old params
  Remove redundant and broken MD5 checksum from repository-s3 (elastic#25270)
  Initialize sequence numbers on a shrunken index
  Port most snapshot/restore static bwc tests to qa:full-cluster-restart (elastic#25296)
  Javadoc: ThreadPool doesn't reject while shutdown (elastic#23678)
  test: verify `size_to_upgrade_in_bytes` in assertBusy(...)
  Docs: Removed duplicated line in mapping docs
  Add backward compatibility indices for 5.4.2
  Update MockTransportService to the age of Transport.Connection (elastic#25320)
  Add version v5.4.2 after release
  IndexMetaData: Add internal format index setting (elastic#25292)
  ...
ywelsch added a commit that referenced this pull request Jul 27, 2017
The translog view was being closed too early, possibly causing a failed resync. Note: The bug only affects unreleased code.

Relates to #24841
dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Dec 27, 2017
This commit adds a new index shard state: promoting. This state indicates that a replica is being promoted to primary and a primary-replica resync is in progress.

Relates elastic#24841
dnhatn added a commit that referenced this pull request Feb 3, 2018
Currently, when failing a shard we also mark it as stale (e.g. remove its allocationId from the in-sync set). However, in some cases we need to be able to fail shards but keep them in the in-sync set. This commit adds such a capability. This is a preparatory change to make the primary-replica resync less lenient.

Relates #24841
dnhatn added a commit that referenced this pull request Mar 9, 2018
Today, failures from the primary-replica resync are ignored as a best effort not to mark shards as stale during a cluster restart. However, this can be problematic if replicas fail to execute the resync operations but do just fine on subsequent write operations. When this happens, the replica will miss some operations from the new primary. There are several implications if the local checkpoint on the replica can't advance because of the missing operations:

1. The global checkpoint won't advance, which causes both the primary and the replicas to keep many index commits.

2. The engine on the replica won't flush periodically because uncommitted stats are calculated based on the local checkpoint.

3. The replica can use a large number of bitsets to keep track of operation seqnos.

However, we can prevent this issue while still preserving the best-effort behavior by failing replicas that fail to execute resync operations, but not marking them as stale (sketched below). We prepared the required infrastructure for this change in #28049 and #28054.

Relates #24841
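
A sketch of the behavior this commit describes, for illustration only (the names are stand-ins, not the merged code): a replica that cannot apply resync operations is failed, so it recovers and obtains the missing operations, but it is deliberately not removed from the in-sync set.

public class FailButKeepInSyncSketch {

    interface ShardFailer {
        void failShard(String allocationId, String reason, boolean markAsStale);
    }

    static void onResyncReplicaFailure(String allocationId, Exception failure, ShardFailer failer) {
        // markAsStale = false: the copy stays in the in-sync set while it recovers,
        // unlike an ordinary write failure which would also mark it stale
        failer.failShard(allocationId, "failed to execute resync operations: " + failure, false);
    }
}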