Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -42,9 +43,11 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1)
public class WriteMemoryLimitsIT extends ESIntegTestCase {

public static final String INDEX_NAME = "test";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand All @@ -70,13 +73,12 @@ protected int numberOfShards() {
}

public void testWriteBytesAreIncremented() throws Exception {
final String index = "test";
assertAcked(prepareCreate(index, Settings.builder()
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
ensureGreen(index);
ensureGreen(INDEX_NAME);

IndicesStatsResponse response = client().admin().indices().prepareStats(index).get();
IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get();
String primaryId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(ShardRouting::primary)
Expand All @@ -89,8 +91,10 @@ public void testWriteBytesAreIncremented() throws Exception {
.findAny()
.get()
.currentNodeId();
String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName();
String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName();
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
String primaryName = nodes.get(primaryId).getName();
String replicaName = nodes.get(replicaId).getName();
String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName();

final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
Expand All @@ -117,7 +121,7 @@ public void testWriteBytesAreIncremented() throws Exception {
final BulkRequest bulkRequest = new BulkRequest();
int totalRequestSize = 0;
for (int i = 0; i < 80; ++i) {
IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
totalRequestSize += request.ramBytesUsed();
assertTrue(request.ramBytesUsed() > request.source().length());
Expand All @@ -128,18 +132,19 @@ public void testWriteBytesAreIncremented() throws Exception {
final long bulkShardRequestSize = totalRequestSize;

try {
final ActionFuture<BulkResponse> successFuture = client(replicaName).bulk(bulkRequest);
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
replicationSendPointReached.await();

WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);

assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
assertEquals(0, replicaWriteLimits.getWriteBytes());
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());

ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
// Block the replica Write thread pool
Expand All @@ -162,31 +167,45 @@ public void testWriteBytesAreIncremented() throws Exception {
newActionsSendPointReached.await();
latchBlockingReplicationSend.countDown();

IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
final BulkRequest secondBulkRequest = new BulkRequest();
secondBulkRequest.add(request);

ActionFuture<BulkResponse> secondFuture = client(replicaName).bulk(secondBulkRequest);
// Use the primary or the replica data node as the coordinating node this time
boolean usePrimaryAsCoordinatingNode = randomBoolean();
final ActionFuture<BulkResponse> secondFuture;
if (usePrimaryAsCoordinatingNode) {
secondFuture = client(primaryName).bulk(secondBulkRequest);
} else {
secondFuture = client(replicaName).bulk(secondBulkRequest);
}

final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
final long secondBulkShardRequestSize = request.ramBytesUsed();

assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
if (usePrimaryAsCoordinatingNode) {
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize));
assertEquals(0, replicaWriteLimits.getWriteBytes());
} else {
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
}
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));

latchBlockingReplication.countDown();

successFuture.actionGet();
secondFuture.actionGet();

assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
assertEquals(0, primaryWriteLimits.getPrimaryBytes());
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
assertEquals(0, primaryWriteLimits.getWriteBytes());
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
assertEquals(0, replicaWriteLimits.getWriteBytes());
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
} finally {
if (replicationSendPointReached.getCount() > 0) {
replicationSendPointReached.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long indexingBytes = bulkRequest.ramBytesUsed();
final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes);
final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
try {
doInternalExecute(task, bulkRequest, releasingListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,24 @@

/**
 * Keeps running totals of the bytes held by in-flight write operations using {@link AtomicLong} counters.
 * Each {@code mark*Started} method adds the given byte count to its counter and returns a
 * {@link Releasable} whose action subtracts that same byte count again.
 */
public class WriteMemoryLimits {

private final AtomicLong coordinatingBytes = new AtomicLong(0);
private final AtomicLong primaryBytes = new AtomicLong(0);
private final AtomicLong replicaBytes = new AtomicLong(0);
// Bytes currently registered through markWriteOperationStarted and not yet released.
private final AtomicLong writeBytes = new AtomicLong(0);
// Bytes currently registered through markReplicaWriteStarted and not yet released.
private final AtomicLong replicaWriteBytes = new AtomicLong(0);

public Releasable markCoordinatingOperationStarted(long bytes) {
coordinatingBytes.addAndGet(bytes);
return () -> coordinatingBytes.getAndAdd(-bytes);
public Releasable markWriteOperationStarted(long bytes) {
writeBytes.addAndGet(bytes);
return () -> writeBytes.getAndAdd(-bytes);
}

public long getCoordinatingBytes() {
return coordinatingBytes.get();
public long getWriteBytes() {
return writeBytes.get();
}

public Releasable markPrimaryOperationStarted(long bytes) {
primaryBytes.addAndGet(bytes);
return () -> primaryBytes.getAndAdd(-bytes);
public Releasable markReplicaWriteStarted(long bytes) {
replicaWriteBytes.getAndAdd(bytes);
return () -> replicaWriteBytes.getAndAdd(-bytes);
}

public long getPrimaryBytes() {
return primaryBytes.get();
}

public Releasable markReplicaOperationStarted(long bytes) {
replicaBytes.getAndAdd(bytes);
return () -> replicaBytes.getAndAdd(-bytes);
}

public long getReplicaBytes() {
return replicaBytes.get();
public long getReplicaWriteBytes() {
return replicaWriteBytes.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
writeMemoryLimits);
}

@Override
protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener) {
assert false : "use TransportResyncReplicationAction#sync";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this the default implementation for doExecute in TransportWriteAction when rerouteBypassed() is true? Maybe we can then also rename that method to supportsRerouteAction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. This is a little tricky because I have to expose reroute in TransportReplicationAction. But I did that and we can discuss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh. Actually, I am holding off on this for the moment because it gets kind of messy. We can talk more about this. I did rename the method.

}

@Override
protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ResyncReplicationResponse(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ protected Releasable checkOperationLimits(final Request request) {
}

protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
Releasable releasable = checkPrimaryLimits(request.getRequest());
Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute());
ActionListener<Response> listener =
ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);

Expand All @@ -296,7 +296,7 @@ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request,
}
}

protected Releasable checkPrimaryLimits(final Request request) {
/**
 * Hook invoked by {@code handlePrimaryRequest} before a primary request is processed.
 * This base implementation tracks nothing and returns a no-op {@link Releasable}.
 *
 * @param request         the request about to be executed on the primary
 * @param rerouteWasLocal the value of {@code ConcreteShardRequest#sentFromLocalReroute()} for this request
 * @return a releasable that is closed before the response listener is notified
 */
protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) {
return () -> {};
}

Expand Down Expand Up @@ -371,8 +371,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
primaryRequest.getPrimaryTerm()),
transportOptions,
primaryRequest.getPrimaryTerm()), transportOptions,
new ActionListenerResponseHandler<>(onCompletionListener, reader) {
@Override
public void handleResponse(Response response) {
Expand Down Expand Up @@ -584,7 +583,7 @@ public void onResponse(Releasable releasable) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(e);
}));
// TODO: Evaludate if we still need to catch this exception
// TODO: Evaluate if we still need to catch this exception
} catch (Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(e);
Expand Down Expand Up @@ -750,7 +749,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov
transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
}
performAction(node, transportPrimaryAction, true,
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id())));
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true));
}

private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
Expand Down Expand Up @@ -1102,19 +1101,27 @@ public static class ConcreteShardRequest<R extends TransportRequest> extends Tra
private final String targetAllocationID;
private final long primaryTerm;
private final R request;
// Indicates if this primary shard request originated by a reroute on this local node.
private final boolean sentFromLocalReroute;

/**
 * Reads a request from the given stream: the target allocation id, the primary term, and then the
 * wrapped request via {@code requestReader}. The read order must mirror {@code writeTo}.
 */
public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
targetAllocationID = in.readString();
primaryTerm = in.readVLong();
// The flag is not written by writeTo, so a request read from a stream is never marked as a local reroute.
sentFromLocalReroute = false;
request = requestReader.read(in);
}

/**
 * Creates a request that is not marked as sent from a local reroute
 * (delegates with {@code sentFromLocalReroute = false}).
 */
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {
this(request, targetAllocationID, primaryTerm, false);
}

/**
 * Creates a request for a concrete shard copy.
 *
 * @param request              the wrapped request; must not be null
 * @param targetAllocationID   allocation id of the targeted shard copy; must not be null
 * @param primaryTerm          the primary term to send with the request
 * @param sentFromLocalReroute whether this primary shard request originated from a reroute on this local node
 */
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) {
    // requireNonNull returns its argument, so each null check is folded into the assignment.
    this.request = Objects.requireNonNull(request);
    this.targetAllocationID = Objects.requireNonNull(targetAllocationID);
    this.primaryTerm = primaryTerm;
    this.sentFromLocalReroute = sentFromLocalReroute;
}

@Override
Expand Down Expand Up @@ -1143,11 +1150,19 @@ public String getDescription() {

/**
 * Serializes the target allocation id, the primary term, and then the wrapped request, in that order.
 * The {@code sentFromLocalReroute} flag is deliberately not written.
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
// If sentFromLocalReroute is marked true, then this request should just be looped back through
// the local transport. It should never be serialized to be sent over the wire. If it is sent over
// the wire, then it was NOT sent from a local reroute.
assert sentFromLocalReroute == false;
out.writeString(targetAllocationID);
out.writeVLong(primaryTerm);
request.writeTo(out);
}

/**
 * Returns whether this primary shard request originated from a reroute on this local node.
 * Always {@code false} for requests constructed from a stream.
 */
public boolean sentFromLocalReroute() {
return sentFromLocalReroute;
}

/** Returns the wrapped request. */
public R getRequest() {
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,18 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe

@Override
protected Releasable checkOperationLimits(Request request) {
return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request));
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
}

@Override
protected Releasable checkPrimaryLimits(Request request) {
return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request));
protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) {
    // Only a primary request that did not come from a reroute on this node still needs its bytes tracked.
    if (rerouteWasLocal == false) {
        return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
    }
    // A reroute performed on this local node has already accounted for the bytes, so there is
    // nothing to add here and nothing to release later.
    return () -> {};
}

protected long primaryOperationSize(Request request) {
Expand All @@ -94,7 +100,7 @@ protected long primaryOperationSize(Request request) {

@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request));
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
}

protected long replicaOperationSize(ReplicaRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,19 +1164,14 @@ private void assertAllPendingWriteLimitsReleased() throws Exception {
assertBusy(() -> {
for (NodeAndClient nodeAndClient : nodes.values()) {
WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name);
final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes();
if (coordinatingBytes > 0) {
throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node ["
final long writeBytes = writeMemoryLimits.getWriteBytes();
if (writeBytes > 0) {
throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
final long primaryBytes = writeMemoryLimits.getPrimaryBytes();
if (primaryBytes > 0) {
throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
final long replicaBytes = writeMemoryLimits.getReplicaBytes();
if (replicaBytes > 0) {
throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node ["
final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes();
if (replicaWriteBytes > 0) {
    // Report the replica counter that failed the check, not the general write counter checked above.
    throw new AssertionError("pending replica write bytes [" + replicaWriteBytes + "] bytes on node ["
        + nodeAndClient.name + "].");
}
}
Expand Down
Loading