@@ -235,9 +235,14 @@ public TransportAction(Settings settings,
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());

final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
// The following shard generates the request based on the global checkpoint which may not be synced to all leading copies.
@jasontedor (Member), May 31, 2018:
Would you amend this comment to say that the primary copy on the follower generates the request based on its knowledge of the global checkpoint on the primary copy on the leader?
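A possible rendering of that amendment as a sketch (wording adapted from the suggestion above, not the PR's final text):

// The primary copy on the follower generates this request based on its
// knowledge of the global checkpoint on the primary copy on the leader,
// which may lag behind the leader's actual global checkpoint.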

// However, this guarantees that the requested range is always at or below the local checkpoint of any leading copy.
final long localCheckpoint = indexShard.getLocalCheckpoint();
@martijnvg (Member), May 31, 2018:
+1 to using the local checkpoint instead of the global checkpoint here. It should be just as safe as using the global checkpoint.

Contributor:

I'm sorry, but I don't think we should use the local checkpoint. I understand that it's safe because of the follower semantics, and I also understand that the request is not supposed to be below the local checkpoint (this can be an assertion, as Jason noted), but I don't think we should rely on it. It's too subtle and difficult to understand. If there's no good reason to use the local checkpoint here (please share if there is, I can't see it), can we please go back to using the global checkpoint?

We also don't really need to fail the request here but rather return what we have, if we have it (as before).

PS - can you also add a comment that this is all best effort and that the true check is done when creating the snapshot? (Merge policies etc. can change the availability of operations.)
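A minimal sketch of the "return what we have" alternative, in the terms of the surrounding diff (EMPTY_OPERATIONS is a hypothetical constant; the other names match the code above):

// Trim the upper bound to the local checkpoint instead of failing the request.
final long localCheckpoint = indexShard.getLocalCheckpoint();
final long toSeqNo = Math.min(request.maxSeqNo, localCheckpoint);
if (toSeqNo < request.minSeqNo) {
    // Nothing in the requested range is available yet; return an empty response.
    return new Response(indexMetaDataVersion, EMPTY_OPERATIONS);
}
final Translog.Operation[] operations =
    getOperationsBetween(indexShard, request.minSeqNo, toSeqNo, request.maxTranslogsBytes);
return new Response(indexMetaDataVersion, operations);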

Member:

> We also don't really need to fail the request here but rather return what we have, if we have it (as before).

This will make the logic a bit more complex in the shard follow task. I prefer to fail here, knowing that the primary copy will have the requested range.

Alternatively, the shard follow task can maybe use the global checkpoint of the shard copy with the lowest global checkpoint (like was discussed in the es-ccr channel last night). Then this problem shouldn't occur either.
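A sketch of that alternative on the follower side; leaderShardStats is a hypothetical collection of per-copy stats (ShardStats and SeqNoStats are real Elasticsearch types, but their use here is illustrative):

// Use the lowest global checkpoint across all copies of the leader shard so
// that any copy serving the request is guaranteed to cover the requested range.
long minGlobalCheckpoint = Long.MAX_VALUE;
for (ShardStats copy : leaderShardStats) {
    minGlobalCheckpoint = Math.min(minGlobalCheckpoint, copy.getSeqNoStats().getGlobalCheckpoint());
}
request.maxSeqNo = Math.min(request.maxSeqNo, minGlobalCheckpoint);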

Contributor:

> This will make the logic a bit more complex in the shard follow task.

Why is that? We already account for partial results due to byte-size limits.
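For context, a sketch of how a follower can cope with a partial response (operations() and the continuation logic here are illustrative assumptions, not the actual shard follow task API):

// A response may stop early, e.g. when the byte-size limit is hit; the
// follower then issues the next request starting just after the last op seen.
final Translog.Operation[] ops = response.operations();
final long nextFromSeqNo = ops.length == 0
    ? request.minSeqNo                 // nothing received; retry the same range
    : ops[ops.length - 1].seqNo() + 1; // continue after the last received op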

Member:

That is true. I guess I reacted too quickly. We already handle this correctly; the current assumption is that the byte-size limit has been reached.

Member (author):

I made this an assertion in 2a9a200#diff-64da2b915e53a36fdc911178059a02e5R242. The only purpose of this assertion is to make sure that the follower never requests a wrong range. However, we cannot use the global checkpoint here, so I "loosened" the condition by using the local checkpoint, as a best effort. I am okay with removing this assertion. WDYT?

@jasontedor (Member), Jun 1, 2018:

@bleskes I do not agree that using the local checkpoint is too subtle and difficult to understand; we are relying on fundamental relationships between the local and global checkpoints here. The problem is that the global checkpoint on the replica is not the global checkpoint; it's only local knowledge (say "local global checkpoint" three times fast) that could be out of date. But we know:

local checkpoint on replica >= actual global checkpoint >= global checkpoint on request

and that's why we can have an assertion here.
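A sketch of that invariant turned into the assertion, in the terms of the diff above:

// localCheckpoint (on this copy) >= actual global checkpoint >= the global
// checkpoint the requester observed >= request.maxSeqNo, so a well-behaved
// requester can never exceed the local checkpoint of an in-sync copy.
assert localCheckpoint >= request.maxSeqNo :
    "to_seqno=[" + request.maxSeqNo + "] above local_checkpoint=[" + localCheckpoint + "]";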

Member:

And I also don't think we should use the global checkpoint and return partial results here.

Contributor:

> I do not agree that using the local checkpoint is too subtle and difficult to understand;

Let me unpack a bit what I meant by subtle. The current approach relies on a well-behaved client that follows the pattern of formulating its requests based on some knowledge of the global checkpoint. That behavior is not clear when you look at ShardChangesAction. It is a part of the shard following task, which is not trivial to follow. That's what I meant by subtle. The system is complex, and the more we can understand by reading a single file, the better.

> we are relying on fundamental relationships between local and global checkpoints here?

It is true that if you sample the global checkpoint from somewhere, all local checkpoints of in-sync shards are above it, and therefore it is safe to trim any request that uses a global checkpoint as an upper bound by the local checkpoint of an in-sync shard. It is also true that search requests should never be routed to not-in-sync shards, if we could manage it. Sadly, we can't guarantee that, and I'm not sure how to achieve it without other drawbacks that are worse, or some schemes that are complicated and will take time to bake.

Search requests are routed based on a cluster state they sample, which may be stale. They use a list of shard copies and prefer to go to active shards, but if those fail they will go and try initializing shards. We don't know at what phase of recovery those are. We also don't know what their local checkpoint means. It is highly likely it will be lower than the local checkpoint of the primary and thus safe (based on the behavior of the client), but maybe it's not? Maybe it was constructed by a primary that has since failed and had transferred operations that weren't safe and haven't been rolled back yet? I'm not saying that's necessarily broken. I am saying that this gets complicated very quickly, and I'm not sure it's right.

Using the local knowledge of the global checkpoint is always safe and is simple to understand. The complicated part is how the global checkpoint is maintained, but you don't need to know that.

PS I want to go back to the notion of a well-behaved client, from a different angle than complexity. It's true that we are currently building CCR and not the changes API, but we do plan to build infrastructure that will power the Changes API (which CCR would be based on, if we had it). With that in mind, I would rather avoid adding assumptions to the code that rely on some correctness aspects of the request. The logic can hopefully stay simple: you can ask for anything you want, but we're not exposing unsafe ops. Also, this is why the original API was designed to say "give me X operations starting at this point and up" (X being a count or a size) rather than the current API of "give me this range, please". To be clear - I'm OK with the range change (for now - we'll see how the changes API develops), but I want to be conscious of the Changes API and potential implications for it.
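For contrast, the two request shapes mentioned above, sketched side by side (field names are illustrative only):

// Range-based (current): "give me this range, please"
//   long fromSeqNo; long toSeqNo;
// Count/size-based (original Changes API design): "give me up to X from here"
//   long fromSeqNo; int maxOperationCount; // or a byte-size limit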

if (localCheckpoint < request.minSeqNo || localCheckpoint < request.maxSeqNo) {
Member:

I think all we need here is that the local history at least covers the requested range, so the opposite of localCheckpoint >= request.maxSeqNo should be sufficient, as we already validate on the request that minSeqNo < maxSeqNo. Therefore, I think this condition can be indexShard.getLocalCheckpoint() < request.maxSeqNo.

throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], " +
Member:

I think that we should add an assertion here. This should never happen in production because the global checkpoint on the primary is, by definition, not more than the local checkpoint of any in-sync shard copy. This shard copy must be in-sync or it would not be receiving this request, and therefore I think we should treat this as a fatal condition. I am not sure we are being harsh enough here.
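A sketch combining both suggestions: trip an assertion (fatal in tests) and still fail the request in production, reusing the error message from the code below:

assert localCheckpoint >= request.maxSeqNo :
    "in-sync copy behind requested range: to_seqno=[" + request.maxSeqNo +
        "] > local_checkpoint=[" + localCheckpoint + "]";
if (localCheckpoint < request.maxSeqNo) {
    throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], " +
        "to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "]");
}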

"to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "], shardId=[" + shardId + "]");
}
final Translog.Operation[] operations =
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
return new Response(indexMetaDataVersion, operations);