Skip to content

Commit dc9e364

Browse files
authored
Count coordinating and primary bytes as write bytes (#58984)
This is a follow-up to #57573. This commit combines coordinating and primary bytes under the same "write" bucket. Double accounting is prevented by accounting for the bytes only once, at either the reroute phase or the primary phase. TransportBulkAction calls execute directly, so the operations handler is skipped and the bytes are not double-counted.
1 parent 2c43421 commit dc9e364

File tree

9 files changed

+180
-70
lines changed

9 files changed

+180
-70
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2424
import org.elasticsearch.action.index.IndexRequest;
2525
import org.elasticsearch.cluster.metadata.IndexMetadata;
26+
import org.elasticsearch.cluster.node.DiscoveryNodes;
2627
import org.elasticsearch.cluster.routing.ShardRouting;
2728
import org.elasticsearch.common.UUIDs;
2829
import org.elasticsearch.common.settings.Settings;
@@ -42,9 +43,11 @@
4243
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4344
import static org.hamcrest.Matchers.greaterThan;
4445

45-
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
46+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
4647
public class WriteMemoryLimitsIT extends ESIntegTestCase {
4748

49+
public static final String INDEX_NAME = "test";
50+
4851
@Override
4952
protected Settings nodeSettings(int nodeOrdinal) {
5053
return Settings.builder()
@@ -69,15 +72,13 @@ protected int numberOfShards() {
6972
return 1;
7073
}
7174

72-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/58983")
7375
public void testWriteBytesAreIncremented() throws Exception {
74-
final String index = "test";
75-
assertAcked(prepareCreate(index, Settings.builder()
76+
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
7677
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
7778
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
78-
ensureGreen(index);
79+
ensureGreen(INDEX_NAME);
7980

80-
IndicesStatsResponse response = client().admin().indices().prepareStats(index).get();
81+
IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get();
8182
String primaryId = Stream.of(response.getShards())
8283
.map(ShardStats::getShardRouting)
8384
.filter(ShardRouting::primary)
@@ -90,8 +91,10 @@ public void testWriteBytesAreIncremented() throws Exception {
9091
.findAny()
9192
.get()
9293
.currentNodeId();
93-
String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName();
94-
String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName();
94+
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
95+
String primaryName = nodes.get(primaryId).getName();
96+
String replicaName = nodes.get(replicaId).getName();
97+
String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName();
9598

9699
final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
97100
final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
@@ -118,7 +121,7 @@ public void testWriteBytesAreIncremented() throws Exception {
118121
final BulkRequest bulkRequest = new BulkRequest();
119122
int totalRequestSize = 0;
120123
for (int i = 0; i < 80; ++i) {
121-
IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
124+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
122125
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
123126
totalRequestSize += request.ramBytesUsed();
124127
assertTrue(request.ramBytesUsed() > request.source().length());
@@ -129,18 +132,19 @@ public void testWriteBytesAreIncremented() throws Exception {
129132
final long bulkShardRequestSize = totalRequestSize;
130133

131134
try {
132-
final ActionFuture<BulkResponse> successFuture = client(replicaName).bulk(bulkRequest);
135+
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
133136
replicationSendPointReached.await();
134137

135138
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
136139
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
140+
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
137141

138-
assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
139-
assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
140-
assertEquals(0, primaryWriteLimits.getReplicaBytes());
141-
assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
142-
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
143-
assertEquals(0, replicaWriteLimits.getReplicaBytes());
142+
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
143+
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
144+
assertEquals(0, replicaWriteLimits.getWriteBytes());
145+
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
146+
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
147+
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
144148

145149
ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
146150
// Block the replica Write thread pool
@@ -163,31 +167,45 @@ public void testWriteBytesAreIncremented() throws Exception {
163167
newActionsSendPointReached.await();
164168
latchBlockingReplicationSend.countDown();
165169

166-
IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
170+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
167171
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
168172
final BulkRequest secondBulkRequest = new BulkRequest();
169173
secondBulkRequest.add(request);
170174

171-
ActionFuture<BulkResponse> secondFuture = client(replicaName).bulk(secondBulkRequest);
175+
// Use the primary or the replica data node as the coordinating node this time
176+
boolean usePrimaryAsCoordinatingNode = randomBoolean();
177+
final ActionFuture<BulkResponse> secondFuture;
178+
if (usePrimaryAsCoordinatingNode) {
179+
secondFuture = client(primaryName).bulk(secondBulkRequest);
180+
} else {
181+
secondFuture = client(replicaName).bulk(secondBulkRequest);
182+
}
172183

173184
final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
174185
final long secondBulkShardRequestSize = request.ramBytesUsed();
175186

176-
assertBusy(() -> assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes()));
177-
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
187+
if (usePrimaryAsCoordinatingNode) {
188+
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize));
189+
assertEquals(0, replicaWriteLimits.getWriteBytes());
190+
} else {
191+
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
192+
assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
193+
}
194+
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
195+
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
178196
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
179197

180198
latchBlockingReplication.countDown();
181199

182200
successFuture.actionGet();
183201
secondFuture.actionGet();
184202

185-
assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
186-
assertEquals(0, primaryWriteLimits.getPrimaryBytes());
187-
assertEquals(0, primaryWriteLimits.getReplicaBytes());
188-
assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
189-
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
190-
assertEquals(0, replicaWriteLimits.getReplicaBytes());
203+
assertEquals(0, primaryWriteLimits.getWriteBytes());
204+
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
205+
assertEquals(0, replicaWriteLimits.getWriteBytes());
206+
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
207+
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
208+
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
191209
} finally {
192210
if (replicationSendPointReached.getCount() > 0) {
193211
replicationSendPointReached.countDown();

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
166166
@Override
167167
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
168168
long indexingBytes = bulkRequest.ramBytesUsed();
169-
final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes);
169+
final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
170170
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
171171
try {
172172
doInternalExecute(task, bulkRequest, releasingListener);

server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,34 +25,24 @@
2525

2626
public class WriteMemoryLimits {
2727

28-
private final AtomicLong coordinatingBytes = new AtomicLong(0);
29-
private final AtomicLong primaryBytes = new AtomicLong(0);
30-
private final AtomicLong replicaBytes = new AtomicLong(0);
28+
private final AtomicLong writeBytes = new AtomicLong(0);
29+
private final AtomicLong replicaWriteBytes = new AtomicLong(0);
3130

32-
public Releasable markCoordinatingOperationStarted(long bytes) {
33-
coordinatingBytes.addAndGet(bytes);
34-
return () -> coordinatingBytes.getAndAdd(-bytes);
31+
public Releasable markWriteOperationStarted(long bytes) {
32+
writeBytes.addAndGet(bytes);
33+
return () -> writeBytes.getAndAdd(-bytes);
3534
}
3635

37-
public long getCoordinatingBytes() {
38-
return coordinatingBytes.get();
36+
public long getWriteBytes() {
37+
return writeBytes.get();
3938
}
4039

41-
public Releasable markPrimaryOperationStarted(long bytes) {
42-
primaryBytes.addAndGet(bytes);
43-
return () -> primaryBytes.getAndAdd(-bytes);
40+
public Releasable markReplicaWriteStarted(long bytes) {
41+
replicaWriteBytes.getAndAdd(bytes);
42+
return () -> replicaWriteBytes.getAndAdd(-bytes);
4443
}
4544

46-
public long getPrimaryBytes() {
47-
return primaryBytes.get();
48-
}
49-
50-
public Releasable markReplicaOperationStarted(long bytes) {
51-
replicaBytes.getAndAdd(bytes);
52-
return () -> replicaBytes.getAndAdd(-bytes);
53-
}
54-
55-
public long getReplicaBytes() {
56-
return replicaBytes.get();
45+
public long getReplicaWriteBytes() {
46+
return replicaWriteBytes.get();
5747
}
5848
}

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
6464
writeMemoryLimits);
6565
}
6666

67+
@Override
68+
protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener) {
69+
assert false : "use TransportResyncReplicationAction#sync";
70+
}
71+
6772
@Override
6873
protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException {
6974
return new ResyncReplicationResponse(in);

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ protected Releasable checkOperationLimits(final Request request) {
286286
}
287287

288288
protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
289-
Releasable releasable = checkPrimaryLimits(request.getRequest());
289+
Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute());
290290
ActionListener<Response> listener =
291291
ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);
292292

@@ -297,7 +297,7 @@ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request,
297297
}
298298
}
299299

300-
protected Releasable checkPrimaryLimits(final Request request) {
300+
protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) {
301301
return () -> {};
302302
}
303303

@@ -372,8 +372,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
372372
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
373373
transportService.sendRequest(relocatingNode, transportPrimaryAction,
374374
new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
375-
primaryRequest.getPrimaryTerm()),
376-
transportOptions,
375+
primaryRequest.getPrimaryTerm()), transportOptions,
377376
new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
378377
@Override
379378
public void handleResponse(Response response) {
@@ -585,7 +584,7 @@ public void onResponse(Releasable releasable) {
585584
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
586585
AsyncReplicaAction.this.onFailure(e);
587586
}));
588-
// TODO: Evaludate if we still need to catch this exception
587+
// TODO: Evaluate if we still need to catch this exception
589588
} catch (Exception e) {
590589
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
591590
AsyncReplicaAction.this.onFailure(e);
@@ -751,7 +750,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov
751750
transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
752751
}
753752
performAction(node, transportPrimaryAction, true,
754-
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id())));
753+
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true));
755754
}
756755

757756
private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
@@ -1103,19 +1102,27 @@ public static class ConcreteShardRequest<R extends TransportRequest> extends Tra
11031102
private final String targetAllocationID;
11041103
private final long primaryTerm;
11051104
private final R request;
1105+
// Indicates whether this primary shard request originated from a reroute on this local node.
1106+
private final boolean sentFromLocalReroute;
11061107

11071108
public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
11081109
targetAllocationID = in.readString();
11091110
primaryTerm = in.readVLong();
1111+
sentFromLocalReroute = false;
11101112
request = requestReader.read(in);
11111113
}
11121114

11131115
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {
1116+
this(request, targetAllocationID, primaryTerm, false);
1117+
}
1118+
1119+
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) {
11141120
Objects.requireNonNull(request);
11151121
Objects.requireNonNull(targetAllocationID);
11161122
this.request = request;
11171123
this.targetAllocationID = targetAllocationID;
11181124
this.primaryTerm = primaryTerm;
1125+
this.sentFromLocalReroute = sentFromLocalReroute;
11191126
}
11201127

11211128
@Override
@@ -1144,11 +1151,19 @@ public String getDescription() {
11441151

11451152
@Override
11461153
public void writeTo(StreamOutput out) throws IOException {
1154+
// If sentFromLocalReroute is marked true, then this request should just be looped back through
1155+
// the local transport. It should never be serialized to be sent over the wire. If it is sent over
1156+
// the wire, then it was NOT sent from a local reroute.
1157+
assert sentFromLocalReroute == false;
11471158
out.writeString(targetAllocationID);
11481159
out.writeVLong(primaryTerm);
11491160
request.writeTo(out);
11501161
}
11511162

1163+
public boolean sentFromLocalReroute() {
1164+
return sentFromLocalReroute;
1165+
}
1166+
11521167
public R getRequest() {
11531168
return request;
11541169
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,18 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
8080

8181
@Override
8282
protected Releasable checkOperationLimits(Request request) {
83-
return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request));
83+
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
8484
}
8585

8686
@Override
87-
protected Releasable checkPrimaryLimits(Request request) {
88-
return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request));
87+
protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) {
88+
// If this primary request was submitted by a reroute performed on this local node, we have already
89+
// accounted the bytes.
90+
if (rerouteWasLocal) {
91+
return () -> {};
92+
} else {
93+
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
94+
}
8995
}
9096

9197
protected long primaryOperationSize(Request request) {
@@ -94,7 +100,7 @@ protected long primaryOperationSize(Request request) {
94100

95101
@Override
96102
protected Releasable checkReplicaLimits(ReplicaRequest request) {
97-
return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request));
103+
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
98104
}
99105

100106
protected long replicaOperationSize(ReplicaRequest request) {

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,19 +1349,14 @@ private void assertAllPendingWriteLimitsReleased() throws Exception {
13491349
assertBusy(() -> {
13501350
for (NodeAndClient nodeAndClient : nodes.values()) {
13511351
WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name);
1352-
final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes();
1353-
if (coordinatingBytes > 0) {
1354-
throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node ["
1352+
final long writeBytes = writeMemoryLimits.getWriteBytes();
1353+
if (writeBytes > 0) {
1354+
throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
13551355
+ nodeAndClient.name + "].");
13561356
}
1357-
final long primaryBytes = writeMemoryLimits.getPrimaryBytes();
1358-
if (primaryBytes > 0) {
1359-
throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node ["
1360-
+ nodeAndClient.name + "].");
1361-
}
1362-
final long replicaBytes = writeMemoryLimits.getReplicaBytes();
1363-
if (replicaBytes > 0) {
1364-
throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node ["
1357+
final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes();
1358+
if (replicaWriteBytes > 0) {
1359+
// Report the replica counter that tripped this check, not the coordinating/primary write counter.
throw new AssertionError("pending replica write bytes [" + replicaWriteBytes + "] bytes on node ["
13651360
+ nodeAndClient.name + "].");
13661361
}
13671362
}

0 commit comments

Comments
 (0)