Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
d7265e6
Add post indexing hooks
Tim-Brooks Apr 13, 2020
0a782cd
Add post indexing hooks
Tim-Brooks Apr 13, 2020
84279ee
Changes
Tim-Brooks Apr 13, 2020
9ba9a1a
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Apr 16, 2020
c9ad59c
Changes
Tim-Brooks Apr 16, 2020
d0e1081
Add tests
Tim-Brooks Apr 17, 2020
8a0a560
Changes
Tim-Brooks Apr 17, 2020
08810d6
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Apr 29, 2020
b0a6568
Changes
Tim-Brooks Apr 29, 2020
dcfb545
Changes
Tim-Brooks Apr 30, 2020
bdf3bd6
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks May 12, 2020
755a6cb
Move
Tim-Brooks May 12, 2020
a6bf50e
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks May 12, 2020
c54ba33
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks May 13, 2020
2ccb5b6
Changes
Tim-Brooks May 13, 2020
524b965
Remove
Tim-Brooks May 13, 2020
de3bed7
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks May 19, 2020
a9afc77
WIP
Tim-Brooks May 19, 2020
832dec0
Changes
Tim-Brooks May 19, 2020
275df87
WIP
Tim-Brooks May 19, 2020
87af556
Changes
Tim-Brooks May 19, 2020
a15fcec
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks May 26, 2020
185c24c
Changes
Tim-Brooks May 26, 2020
f9723be
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks May 27, 2020
7160c1e
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks May 29, 2020
eaa6eaa
Change
Tim-Brooks May 29, 2020
718e343
Changes
Tim-Brooks May 29, 2020
668715e
WIP
Tim-Brooks May 29, 2020
fa52955
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 1, 2020
bc76b9a
Changes
Tim-Brooks Jun 3, 2020
9615713
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 3, 2020
2b581b5
Changes
Tim-Brooks Jun 3, 2020
d5aaf0e
Delete
Tim-Brooks Jun 3, 2020
777569a
Move
Tim-Brooks Jun 3, 2020
31459dc
Change
Tim-Brooks Jun 3, 2020
903ed49
Change
Tim-Brooks Jun 3, 2020
18a62cb
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 3, 2020
dfe5e8e
Changes
Tim-Brooks Jun 3, 2020
7708977
Tests
Tim-Brooks Jun 3, 2020
26dcf33
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 3, 2020
5295df9
Changes
Tim-Brooks Jun 3, 2020
2227059
Mute tests
Tim-Brooks Jun 3, 2020
1e421f0
Changes
Tim-Brooks Jun 4, 2020
4455203
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 8, 2020
d461c36
WIP
Tim-Brooks Jun 9, 2020
0cbd78a
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 9, 2020
f069856
Changes
Tim-Brooks Jun 10, 2020
da15409
Changes
Tim-Brooks Jun 11, 2020
90eee72
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 11, 2020
ded75bb
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 15, 2020
bda0bc6
Changes
Tim-Brooks Jun 16, 2020
3733dfc
Compile
Tim-Brooks Jun 16, 2020
e147024
Runbefore
Tim-Brooks Jun 16, 2020
05d2f1f
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 16, 2020
7267ba9
Changes
Tim-Brooks Jun 16, 2020
36d7cc0
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 16, 2020
f34aac6
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 17, 2020
eb8fd42
Changes
Tim-Brooks Jun 18, 2020
ab2f9c2
NPE
Tim-Brooks Jun 18, 2020
6fd1cfa
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 18, 2020
7f6c033
Changes
Tim-Brooks Jun 18, 2020
7dc01ea
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 23, 2020
6294455
Merge remote-tracking branch 'upstream/master' into indexing_memory_q…
Tim-Brooks Jun 24, 2020
f2a6c77
Changes
Tim-Brooks Jun 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;

/**
 * Integration test verifying that the in-flight write byte accounting tracked by
 * {@link WriteMemoryLimits} is incremented while bulk requests are in flight (coordinating,
 * primary, and replica stages) and drops back to zero once the requests complete.
 * Uses a two-data-node cluster so a one-shard / one-replica index has a distinct
 * primary node and replica node.
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
public class WriteMemoryLimitsIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// Need at least two threads because we are going to block one
.put("thread_pool.write.size", 2)
.build();
}

// MockTransportService is needed so the test can intercept (and pause) the
// primary->replica shard-bulk replication send below.
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
}

@Override
protected int numberOfReplicas() {
return 1;
}

@Override
protected int numberOfShards() {
return 1;
}

public void testWriteBytesAreIncremented() throws Exception {
final String index = "test";
assertAcked(prepareCreate(index, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
ensureGreen(index);

// Resolve which node hosts the primary shard and which hosts the replica.
IndicesStatsResponse response = client().admin().indices().prepareStats(index).get();
String primaryId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(ShardRouting::primary)
.findAny()
.get()
.currentNodeId();
String replicaId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(sr -> sr.primary() == false)
.findAny()
.get()
.currentNodeId();
String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName();
String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName();

// Latch choreography:
// - replicationSendPointReached: primary has reached the replication send and is paused there
// - latchBlockingReplicationSend: released by the test to let the replication request proceed
// - newActionsSendPointReached: both blocker tasks occupy the replica's two write threads
// - latchBlockingReplication: released by the test to free the replica's write threads
final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
final CountDownLatch newActionsSendPointReached = new CountDownLatch(2);
final CountDownLatch latchBlockingReplication = new CountDownLatch(1);

TransportService primaryService = internalCluster().getInstance(TransportService.class, primaryName);
final MockTransportService primaryTransportService = (MockTransportService) primaryService;
TransportService replicaService = internalCluster().getInstance(TransportService.class, replicaName);
final MockTransportService replicaTransportService = (MockTransportService) replicaService;

// Pause the primary just before it sends the shard-bulk replica action ("[r]") to the
// replica, so byte accounting can be inspected while the operation is still in flight.
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(TransportShardBulkAction.ACTION_NAME + "[r]")) {
try {
replicationSendPointReached.countDown();
latchBlockingReplicationSend.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
connection.sendRequest(requestId, action, request, options);
});

// Build a bulk request large enough that its accounted size is clearly measurable.
final BulkRequest bulkRequest = new BulkRequest();
int totalRequestSize = 0;
for (int i = 0; i < 80; ++i) {
IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
totalRequestSize += request.ramBytesUsed();
// Sanity check: the accounted size includes overhead beyond the raw source bytes.
assertTrue(request.ramBytesUsed() > request.source().length());
bulkRequest.add(request);
}

final long bulkRequestSize = bulkRequest.ramBytesUsed();
final long bulkShardRequestSize = totalRequestSize;

try {
// Send the bulk through the replica node, so it is the coordinating node while the
// other node performs the primary-shard write.
final ActionFuture<BulkResponse> successFuture = client(replicaName).bulk(bulkRequest);
replicationSendPointReached.await();

WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);

// Primary node: coordinating and primary byte counters both reflect the in-flight shard
// bulk (NOTE(review): the forwarded shard request is evidently accounted as coordinating
// on the primary node too — confirm against WriteMemoryLimits); no replica work yet.
assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaBytes());
// Replica node: only the coordinating bytes of the original client bulk request.
assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());

ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
// Block the replica Write thread pool
replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
try {
newActionsSendPointReached.countDown();
latchBlockingReplication.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
try {
newActionsSendPointReached.countDown();
latchBlockingReplication.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
// Both replica write threads are now occupied (pool size is 2, see nodeSettings);
// release the primary's replication send so replica-side work queues up while blocked.
newActionsSendPointReached.await();
latchBlockingReplicationSend.countDown();

IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
final BulkRequest secondBulkRequest = new BulkRequest();
secondBulkRequest.add(request);

ActionFuture<BulkResponse> secondFuture = client(replicaName).bulk(secondBulkRequest);

final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
final long secondBulkShardRequestSize = request.ramBytesUsed();

// Coordinating bytes on the replica node now cover both outstanding bulk requests.
assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
// Replica-stage bytes appear asynchronously once the replication requests arrive.
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));

latchBlockingReplication.countDown();

successFuture.actionGet();
secondFuture.actionGet();

// Once both bulks complete, every accounted byte must be released on both nodes.
assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
assertEquals(0, primaryWriteLimits.getPrimaryBytes());
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
} finally {
// Ensure no latch stays closed and no transport rule leaks if an assertion above failed,
// which would otherwise hang the blocked threads beyond this test.
if (replicationSendPointReached.getCount() > 0) {
replicationSendPointReached.countDown();
}
while (newActionsSendPointReached.getCount() > 0) {
newActionsSendPointReached.countDown();
}
if (latchBlockingReplicationSend.getCount() > 0) {
latchBlockingReplicationSend.countDown();
}
if (latchBlockingReplication.getCount() > 0) {
latchBlockingReplication.countDown();
}
primaryTransportService.clearAllRules();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
}

@Override
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) {
return new ReplicaResult();
// No-op replica operation for this test action: completes the listener immediately with an
// empty ReplicaResult via the asynchronous (listener-based) replica API.
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
listener.onResponse(new ReplicaResult());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.action;

import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -40,7 +41,7 @@
* Generic interface to group ActionRequest, which perform writes to a single document
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
*/
public interface DocWriteRequest<T> extends IndicesRequest {
public interface DocWriteRequest<T> extends IndicesRequest, Accountable {

/**
* Set the index for this request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ protected void shardOperationOnPrimary(final ShardRequest shardRequest, final In
}

@Override
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException {
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
// Replica-side execution: runs executeShardOperation on the replica shard and completes the
// listener with an empty ReplicaResult. ActionListener.completeWith routes any exception
// thrown by the supplier (e.g. an IOException from executeShardOperation) to listener.onFailure.
protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
});
}

private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ protected void shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShar
}

@Override
protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request, IndexShard replica) {
replica.flush(request.getRequest());
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
// Replica-side flush: flushes the replica shard and completes the listener with an empty
// ReplicaResult; completeWith routes any thrown exception to listener.onFailure.
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
replica.flush(request.getRequest());
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
});
}

// TODO: Remove this transition in 9.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ protected void shardOperationOnPrimary(BasicReplicationRequest shardRequest, Ind
}

@Override
protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica) {
replica.refresh("api");
logger.trace("{} refresh request executed on replica", replica.shardId());
return new ReplicaResult();
// Replica-side refresh: refreshes the replica shard (source tag "api") and completes the
// listener with an empty ReplicaResult; completeWith routes any thrown exception to
// listener.onFailure.
protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica,
ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
replica.refresh("api");
logger.trace("{} refresh request executed on replica", replica.shardId());
return new ReplicaResult();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.action.bulk;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand All @@ -30,7 +32,9 @@
import java.io.IOException;
import java.util.Objects;

public class BulkItemRequest implements Writeable {
public class BulkItemRequest implements Writeable, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class);

private int id;
private DocWriteRequest<?> request;
Expand Down Expand Up @@ -115,4 +119,9 @@ public void writeThin(StreamOutput out) throws IOException {
DocWriteRequest.writeDocumentRequestThin(out, request);
out.writeOptionalWriteable(primaryResponse == null ? null : primaryResponse::writeThin);
}

@Override
public long ramBytesUsed() {
// Shallow size of this wrapper plus the accounted size of the wrapped DocWriteRequest.
// Note: deliberately excludes primaryResponse — only the request payload is accounted.
return SHALLOW_SIZE + request.ramBytesUsed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.action.bulk;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
Expand Down Expand Up @@ -55,7 +57,9 @@
* Note that we only support refresh on the bulk request not per item.
* @see org.elasticsearch.client.Client#bulk(BulkRequest)
*/
public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest> {
public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest>, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class);

private static final int REQUEST_OVERHEAD = 50;

Expand Down Expand Up @@ -373,4 +377,9 @@ private static String valueOrDefault(String value, String globalDefault) {
}
return value;
}

@Override
public long ramBytesUsed() {
// Shallow size of this BulkRequest plus the accounted size of every contained sub-request
// (each DocWriteRequest is Accountable).
return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.action.bulk;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -32,12 +34,14 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> implements Accountable {

public static final Version COMPACT_SHARD_ID_VERSION = Version.V_7_9_0;
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class);

private BulkItemRequest[] items;
private final BulkItemRequest[] items;

public BulkShardRequest(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -164,4 +168,9 @@ public void onRetry() {
}
}
}

@Override
public long ramBytesUsed() {
    // Shallow size of this shard request plus the accounted size of every bulk item it carries.
    long totalBytes = SHALLOW_SIZE;
    for (BulkItemRequest item : items) {
        totalBytes += item.ramBytesUsed();
    }
    return totalBytes;
}
}
Loading