Merged

Commits (24)
adba978
WIP: work on modifying bulk processor
Tim-Brooks Feb 23, 2017
b3ba38a
Pass the proper args around
Tim-Brooks Feb 24, 2017
b7971de
Work on refactoring bulk process and move method reference higher up
Tim-Brooks Feb 24, 2017
171c3eb
Back out method reference change
Tim-Brooks Feb 27, 2017
4161764
Merge remote-tracking branch 'upstream/master' into bulk_processor_me…
Tim-Brooks Feb 27, 2017
4c79012
Merge remote-tracking branch 'upstream/master' into bulk_processor_me…
Tim-Brooks Feb 28, 2017
14c12ca
Merge remote-tracking branch 'upstream/master' into bulk_processor_me…
Tim-Brooks Feb 28, 2017
b636060
Work out relationships between different threadpool options
Tim-Brooks Feb 28, 2017
c133a4f
Work on adding tests for integration with hlrc
Tim-Brooks Mar 1, 2017
ff0948c
Add test for rest high level client using bulk processor
Tim-Brooks Mar 1, 2017
da9739b
Merge remote-tracking branch 'upstream/master' into bulk_processor_me…
Tim-Brooks Mar 20, 2017
8bc6715
Remove unneeded interface
Tim-Brooks Mar 22, 2017
75c18dd
Fix issue with broken tests by overloading ctor
Tim-Brooks Mar 24, 2017
76b0541
Make changes based on review
Tim-Brooks Mar 30, 2017
8c28c2a
Fix issues from review
Tim-Brooks Mar 30, 2017
6f49ef3
Merge remote-tracking branch 'upstream/master' into bulk_processor_me…
Tim-Brooks Mar 30, 2017
a230f4d
Remove settings as required from bulk processor
Tim-Brooks Mar 30, 2017
a796112
Fix issue with integration test
Tim-Brooks Mar 30, 2017
b5c242e
Require threadpool in bulkprocessor builder
Tim-Brooks Mar 31, 2017
7604d74
Move scheduling fn creation into retry
Tim-Brooks Mar 31, 2017
30800fa
Fix issue with creating and stopping threadpool
Tim-Brooks Mar 31, 2017
b70bff2
Make a few changes based on review
Tim-Brooks Apr 3, 2017
3e9aa2d
Make Retry depend on threadpool
Tim-Brooks Apr 3, 2017
80dbe04
Stop threadpool and use generic threadpool
Tim-Brooks Apr 5, 2017
@@ -26,6 +26,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@@ -38,6 +39,9 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
@@ -46,10 +50,13 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singletonMap;

@@ -577,6 +584,102 @@ public void testBulk() throws IOException {
assertTrue(bulkResponse.getTookInMillis() > 0);
assertEquals(nbItems, bulkResponse.getItems().length);

validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}

public void testBulkProcessorIntegration() throws IOException, InterruptedException {
int nbItems = randomIntBetween(10, 100);
boolean[] errors = new boolean[nbItems];

XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);

AtomicReference<BulkResponse> responseRef = new AtomicReference<>();
AtomicReference<BulkRequest> requestRef = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {

}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
responseRef.set(response);
requestRef.set(request);
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
error.set(failure);
Member:
Don't we want to keep track of the request here too?

Contributor Author:
The main point of that callback is to set an exception if one occurs, and if the exception is not null then the test fails.

Contributor:
Sounds good to me.

}
};

ThreadPool threadPool = new ThreadPool(Settings.builder().put("node.name", getClass().getName()).build());
try(BulkProcessor processor = new BulkProcessor.Builder(highLevelClient()::bulkAsync, listener, threadPool)
.setConcurrentRequests(0)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1)
Member:
Why does it matter that the bulk processor flushes all the docs at once on closing? Can't we rely on the fact that after close everything is flushed, regardless of how many bulk requests are sent? Is it because it would make the bulk response validation more complicated, since we'd have to accumulate the responses received by the listener?

.build()) {
for (int i = 0; i < nbItems; i++) {
String id = String.valueOf(i);
boolean erroneous = randomBoolean();
errors[i] = erroneous;

DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());
if (opType == DocWriteRequest.OpType.DELETE) {
if (erroneous == false) {
assertEquals(RestStatus.CREATED,
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
}
DeleteRequest deleteRequest = new DeleteRequest("index", "test", id);
processor.add(deleteRequest);

} else {
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(xContentType, "id", i);
if (erroneous) {
indexRequest.version(12L);
}
processor.add(indexRequest);

} else if (opType == DocWriteRequest.OpType.CREATE) {
IndexRequest createRequest = new IndexRequest("index", "test", id).source(xContentType, "id", i).create(true);
if (erroneous) {
assertEquals(RestStatus.CREATED, highLevelClient().index(createRequest).status());
}
processor.add(createRequest);

} else if (opType == DocWriteRequest.OpType.UPDATE) {
UpdateRequest updateRequest = new UpdateRequest("index", "test", id)
.doc(new IndexRequest().source(xContentType, "id", i));
if (erroneous == false) {
assertEquals(RestStatus.CREATED,
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
}
processor.add(updateRequest);
}
}
}
assertNull(responseRef.get());
assertNull(requestRef.get());
}


BulkResponse bulkResponse = responseRef.get();
BulkRequest bulkRequest = requestRef.get();

assertEquals(RestStatus.OK, bulkResponse.status());
assertTrue(bulkResponse.getTookInMillis() > 0);
assertEquals(nbItems, bulkResponse.getItems().length);
assertNull(error.get());

validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);

terminate(threadPool);
}
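On the review question above about setBulkActions(nbItems + 1): forcing a single flush on close keeps the validation simple, because exactly one BulkRequest/BulkResponse pair reaches the listener. Below is a minimal sketch of the alternative the reviewer describes, in which the processor may send any number of bulk requests and the listener accumulates them; the class and field names are illustrative and not part of this PR.

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: a listener that tolerates any number of flushes. Validation would then
// iterate over all collected request/response pairs instead of a single pair.
class AccumulatingBulkListener implements BulkProcessor.Listener {

    final List<BulkRequest> requests = new CopyOnWriteArrayList<>();
    final List<BulkResponse> responses = new CopyOnWriteArrayList<>();
    final AtomicReference<Throwable> error = new AtomicReference<>();

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // Collect every request/response pair instead of assuming a single flush.
        requests.add(request);
        responses.add(response);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // As discussed in the review above, recording the failure is enough to fail the test.
        error.set(failure);
    }
}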

private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse bulkResponse, BulkRequest bulkRequest) {
for (int i = 0; i < nbItems; i++) {
BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];

85 changes: 44 additions & 41 deletions core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

@@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
@@ -28,17 +29,14 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/**
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
@@ -66,7 +64,7 @@ public interface Listener {

/**
* Callback after a failed execution of bulk request.
*
* <p>
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*/
@@ -78,10 +76,10 @@
*/
public static class Builder {

private final Client client;
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final Listener listener;
private final ThreadPool threadPool;

private String name;
private int concurrentRequests = 1;
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
@@ -92,17 +90,10 @@ public static class Builder {
* Creates a builder of bulk processor with the client to use and the listener that will be used
* to be notified on the completion of bulk requests.
*/
public Builder(Client client, Listener listener) {
this.client = client;
public Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, ThreadPool threadPool) {
this.consumer = consumer;
this.listener = listener;
}

/**
* Sets an optional name to identify this bulk processor.
*/
public Builder setName(String name) {
Member:
For the record, I think it is ok to remove setName here. The name was only used to have slightly different logging for the flush daemon thread. We can obtain the same behaviour now that we use the thread pool in the same way we do everywhere else in this class, through node.name in the settings.

Contributor:
++

this.name = name;
return this;
this.threadPool = threadPool;
}

/**
@@ -164,23 +155,21 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
* Builds a new bulk processor.
*/
public BulkProcessor build() {
return new BulkProcessor(client, backoffPolicy, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, threadPool);
}
}

public static Builder builder(Client client, Listener listener) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");

return new Builder(client, listener);
return new Builder(client::bulk, listener, client.threadPool());
}
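A brief usage sketch of the two entry points after this change. This is illustrative only and not part of the diff; the Client, RestHighLevelClient, ThreadPool and Listener instances are assumed to be supplied by the caller.

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;

class BulkProcessorBuilderSketch {

    // Existing transport-client callers are unchanged: the static factory now wires in
    // client::bulk and the client's own ThreadPool behind the scenes.
    static BulkProcessor viaClient(Client client, BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client, listener)
                .setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .build();
    }

    // Callers without a Client (such as the high-level REST client test above) construct
    // the Builder directly from a bulk-executing callback plus an externally managed ThreadPool.
    static BulkProcessor viaConsumer(RestHighLevelClient restClient, BulkProcessor.Listener listener,
                                     ThreadPool threadPool) {
        return new BulkProcessor.Builder(restClient::bulkAsync, listener, threadPool)
                .setConcurrentRequests(0)
                .build();
    }
}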

private final int bulkActions;
private final long bulkSize;


private final ScheduledThreadPoolExecutor scheduler;
private final ScheduledFuture<?> scheduledFuture;
private final ThreadPool.Cancellable cancellableFlushTask;

private final AtomicLong executionIdGen = new AtomicLong();

@@ -189,22 +178,21 @@ public static Builder builder(Client client, Listener listener) {

private volatile boolean closed = false;

BulkProcessor(Client client, BackoffPolicy backoffPolicy, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
ThreadPool threadPool) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();

this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = (concurrentRequests == 0) ? BulkRequestHandler.syncHandler(client, backoffPolicy, listener) : BulkRequestHandler.asyncHandler(client, backoffPolicy, listener, concurrentRequests);

if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
Member:
I may have forgotten, but what has changed here compared to the startFlush method? We don't call daemonThreadFactory because we dropped the name and settings requirement for logging, right? I wonder if we should still call that and just provide the standard "bulk_processor" prefix.

Contributor Author:
In the past, startFlush could use a ScheduledExecutor that we might have created in the BulkProcessor. Now that we require a ThreadPool we do not need this code anymore, as we can just use the thread pool to schedule.

Member:
This sounds weird, as we were requiring a Client before, which comes with a ThreadPool implicitly. That means this change could potentially be made outside of this PR, right? (Not asking you to move it, just trying to better understand.)

Contributor Author (@Tim-Brooks, Apr 3, 2017):
Yeah, this change is not dependent on my work. The "flush" task probably could have been scheduled using the client's ThreadPool, but for some reason the BulkProcessor was creating its own executor for scheduling flush tasks.

Member:
Got it, thank you.

if (concurrentRequests == 0) {
this.bulkRequestHandler = BulkRequestHandler.syncHandler(consumer, backoffPolicy, listener, threadPool);
} else {
this.scheduler = null;
this.scheduledFuture = null;
this.bulkRequestHandler = BulkRequestHandler.asyncHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
}

// Start the periodic flushing task after everything else is set up
this.cancellableFlushTask = startFlushTask(flushInterval, threadPool);
}
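For readers following the signature change: the constructor and the Builder now take a BiConsumer of BulkRequest and ActionListener&lt;BulkResponse&gt; instead of a Client. The method references used in this PR, client::bulk on the server side and highLevelClient()::bulkAsync in the REST test, are equivalent to a lambda such as the sketch below; the helper class and method names are illustrative.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

import java.util.function.BiConsumer;

class BulkConsumerSketch {

    // Sketch: any callback that executes a BulkRequest and completes the ActionListener
    // satisfies the new constructor parameter; client::bulk is the method-reference form.
    static BiConsumer<BulkRequest, ActionListener<BulkResponse>> fromClient(Client client) {
        return (bulkRequest, bulkListener) -> client.bulk(bulkRequest, bulkListener);
    }
}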

/**
Expand All @@ -214,20 +202,20 @@ public static Builder builder(Client client, Listener listener) {
public void close() {
try {
awaitClose(0, TimeUnit.NANOSECONDS);
} catch(InterruptedException exc) {
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
}
}

/**
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
*
* <p>
* If concurrent requests are not enabled, returns {@code true} immediately.
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true},
* If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
*
* @param timeout The maximum time to wait for the bulk requests to complete
* @param unit The time unit of the {@code timeout} argument
* @param unit The time unit of the {@code timeout} argument
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests completed
* @throws InterruptedException If the current thread is interrupted
*/
@@ -236,10 +224,9 @@ public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}
closed = true;
if (this.scheduledFuture != null) {
FutureUtils.cancel(this.scheduledFuture);
this.scheduler.shutdown();
}

this.cancellableFlushTask.cancel();

if (bulkRequest.numberOfActions() > 0) {
execute();
}
@@ -301,12 +288,28 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nu
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
@Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
executeIfNeeded();
return this;
}

private ThreadPool.Cancellable startFlushTask(TimeValue flushInterval, ThreadPool threadPool) {
if (flushInterval == null) {
return new ThreadPool.Cancellable() {
@Override
public void cancel() {}

@Override
public boolean isCancelled() {
return true;
}
};
}

return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.GENERIC);
}

private void executeIfNeeded() {
ensureOpen();
if (!isOverTheLimit()) {