Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
// Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ShardFollowTask.NAME,
ShardFollowTask::new),
ShardFollowTask::readFrom),

// Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTaskStatus.STATUS_PARSER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ private void followLeaderIndex(String autoFollowPattenName,
request.getParameters().setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
request.getParameters().setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.getParameters().setMaxRetryDelay(pattern.getMaxRetryDelay());
request.getParameters().setReadPollTimeout(pattern.getPollTimeout());
request.getParameters().setReadPollTimeout(pattern.getReadPollTimeout());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ccr.action.ImmutableFollowParameters;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -28,18 +28,7 @@
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_READ_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_READ_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_OUTSTANDING_READ_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_WRITE_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_WRITE_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_OUTSTANDING_WRITE_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_WRITE_BUFFER_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_WRITE_BUFFER_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.MAX_RETRY_DELAY;
import static org.elasticsearch.xpack.core.ccr.action.FollowParameters.READ_POLL_TIMEOUT;

public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public class ShardFollowTask extends ImmutableFollowParameters implements XPackPlugin.XPackPersistentTaskParams {

public static final String NAME = "xpack/ccr/shard_follow_task";

Expand All @@ -60,8 +49,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask((String) a[0],
new ShardId((String) a[1], (String) a[2], (int) a[3]), new ShardId((String) a[4], (String) a[5], (int) a[6]),
(int) a[7], (ByteSizeValue) a[8], (int) a[9], (int) a[10], (ByteSizeValue) a[11], (int) a[12],
(int) a[13], (ByteSizeValue) a[14], (TimeValue) a[15], (TimeValue) a[16], (Map<String, String>) a[17]));
(Integer) a[7], (Integer) a[8], (Integer) a[9], (Integer) a[10], (ByteSizeValue) a[11], (ByteSizeValue) a[12],
(Integer) a[13], (ByteSizeValue) a[14], (TimeValue) a[15], (TimeValue) a[16], (Map<String, String>) a[17]));

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REMOTE_CLUSTER_FIELD);
Expand All @@ -71,95 +60,47 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()),
MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_REQUEST_OPERATION_COUNT);
PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()),
MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_OUTSTANDING_WRITE_REQUESTS);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()),
READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING);
ImmutableFollowParameters.initParser(PARSER);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
}

private final String remoteCluster;
private final ShardId followShardId;
private final ShardId leaderShardId;
private final int maxReadRequestOperationCount;
private final ByteSizeValue maxReadRequestSize;
private final int maxOutstandingReadRequests;
private final int maxWriteRequestOperationCount;
private final ByteSizeValue maxWriteRequestSize;
private final int maxOutstandingWriteRequests;
private final int maxWriteBufferCount;
private final ByteSizeValue maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue readPollTimeout;
private final Map<String, String> headers;

ShardFollowTask(
final String remoteCluster,
final ShardId followShardId,
final ShardId leaderShardId,
final int maxReadRequestOperationCount,
final ByteSizeValue maxReadRequestSize,
final int maxOutstandingReadRequests,
final int maxWriteRequestOperationCount,
final ByteSizeValue maxWriteRequestSize,
final int maxOutstandingReadRequests,
final int maxOutstandingWriteRequests,
final ByteSizeValue maxReadRequestSize,
final ByteSizeValue maxWriteRequestSize,
final int maxWriteBufferCount,
final ByteSizeValue maxWriteBufferSize,
final TimeValue maxRetryDelay,
final TimeValue readPollTimeout,
final Map<String, String> headers) {
super(maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests,
maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout);
this.remoteCluster = remoteCluster;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.maxReadRequestOperationCount = maxReadRequestOperationCount;
this.maxReadRequestSize = maxReadRequestSize;
this.maxOutstandingReadRequests = maxOutstandingReadRequests;
this.maxWriteRequestOperationCount = maxWriteRequestOperationCount;
this.maxWriteRequestSize = maxWriteRequestSize;
this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
this.maxWriteBufferCount = maxWriteBufferCount;
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.readPollTimeout = readPollTimeout;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}

public ShardFollowTask(StreamInput in) throws IOException {
this.remoteCluster = in.readString();
this.followShardId = ShardId.readShardId(in);
this.leaderShardId = ShardId.readShardId(in);
this.maxReadRequestOperationCount = in.readVInt();
this.maxReadRequestSize = new ByteSizeValue(in);
this.maxOutstandingReadRequests = in.readVInt();
this.maxWriteRequestOperationCount = in.readVInt();
this.maxWriteRequestSize = new ByteSizeValue(in);
this.maxOutstandingWriteRequests = in.readVInt();
this.maxWriteBufferCount = in.readVInt();
this.maxWriteBufferSize = new ByteSizeValue(in);
this.maxRetryDelay = in.readTimeValue();
this.readPollTimeout = in.readTimeValue();
public static ShardFollowTask readFrom(StreamInput in) throws IOException {
return new ShardFollowTask(in.readString(), ShardId.readShardId(in), ShardId.readShardId(in), in);
}

private ShardFollowTask(String remoteCluster, ShardId followShardId, ShardId leaderShardId, StreamInput in) throws IOException {
super(in);
this.remoteCluster = remoteCluster;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}

Expand All @@ -175,50 +116,6 @@ public ShardId getLeaderShardId() {
return leaderShardId;
}

public int getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
}

public int getMaxOutstandingReadRequests() {
return maxOutstandingReadRequests;
}

public int getMaxWriteRequestOperationCount() {
return maxWriteRequestOperationCount;
}

public ByteSizeValue getMaxWriteRequestSize() {
return maxWriteRequestSize;
}

public int getMaxOutstandingWriteRequests() {
return maxOutstandingWriteRequests;
}

public int getMaxWriteBufferCount() {
return maxWriteBufferCount;
}

public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}

public ByteSizeValue getMaxReadRequestSize() {
return maxReadRequestSize;
}

public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}

public TimeValue getReadPollTimeout() {
return readPollTimeout;
}

public String getTaskId() {
return followShardId.getIndex().getUUID() + "-" + followShardId.getId();
}

public Map<String, String> getHeaders() {
return headers;
}
Expand All @@ -233,16 +130,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(remoteCluster);
followShardId.writeTo(out);
leaderShardId.writeTo(out);
out.writeVLong(maxReadRequestOperationCount);
maxReadRequestSize.writeTo(out);
out.writeVInt(maxOutstandingReadRequests);
out.writeVLong(maxWriteRequestOperationCount);
maxWriteRequestSize.writeTo(out);
out.writeVInt(maxOutstandingWriteRequests);
out.writeVInt(maxWriteBufferCount);
maxWriteBufferSize.writeTo(out);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(readPollTimeout);
super.writeTo(out);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}

Expand All @@ -260,16 +148,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName());
builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID());
builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id());
builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount);
builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep());
builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests);
builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount);
builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep());
builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests);
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep());
toXContentFragment(builder);
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
}
Expand All @@ -278,39 +157,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ShardFollowTask that = (ShardFollowTask) o;
return Objects.equals(remoteCluster, that.remoteCluster) &&
Objects.equals(followShardId, that.followShardId) &&
Objects.equals(leaderShardId, that.leaderShardId) &&
maxReadRequestOperationCount == that.maxReadRequestOperationCount &&
maxReadRequestSize.equals(that.maxReadRequestSize) &&
maxOutstandingReadRequests == that.maxOutstandingReadRequests &&
maxWriteRequestOperationCount == that.maxWriteRequestOperationCount &&
maxWriteRequestSize.equals(that.maxWriteRequestSize) &&
maxOutstandingWriteRequests == that.maxOutstandingWriteRequests &&
maxWriteBufferCount == that.maxWriteBufferCount &&
maxWriteBufferSize.equals(that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(readPollTimeout, that.readPollTimeout) &&
Objects.equals(headers, that.headers);
}

@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
remoteCluster,
followShardId,
leaderShardId,
maxReadRequestOperationCount,
maxReadRequestSize,
maxOutstandingReadRequests,
maxWriteRequestOperationCount,
maxWriteRequestSize,
maxOutstandingWriteRequests,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout,
headers
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxReadRequestSize(),
request.getParameters().getMaxOutstandingReadRequests(),
request.getParameters().getMaxWriteRequestOperationCount(),
request.getParameters().getMaxWriteRequestSize(),
request.getParameters().getMaxOutstandingReadRequests(),
request.getParameters().getMaxOutstandingWriteRequests(),
request.getParameters().getMaxReadRequestSize(),
request.getParameters().getMaxWriteRequestSize(),
request.getParameters().getMaxWriteBufferCount(),
request.getParameters().getMaxWriteBufferSize(),
request.getParameters().getMaxRetryDelay(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ private static ShardFollowTask createShardFollowTask(
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
maxReadRequestOperationCount,
maxReadRequestSize,
maxOutstandingReadRequests,
maxWriteRequestOperationCount,
maxWriteRequestSize,
maxOutstandingReadRequests,
maxOutstandingWriteRequests,
maxReadRequestSize,
maxWriteRequestSize,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ protected AutoFollowMetadata createTestInstance() {
leaderPatterns,
randomAlphaOfLength(4),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
TimeValue.timeValueMillis(500),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ protected GetAutoFollowPatternAction.Response createTestInstance() {
Collections.singletonList(randomAlphaOfLength(4)),
randomAlphaOfLength(4),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
TimeValue.timeValueMillis(500),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR
new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0),
testRun.maxOperationCount,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
concurrency,
testRun.maxOperationCount,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
concurrency,
concurrency,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
10240,
new ByteSizeValue(512, ByteSizeUnit.MB),
TimeValue.timeValueMillis(10),
Expand Down
Loading