2626import org .elasticsearch .client .Client ;
2727import org .elasticsearch .common .Nullable ;
2828import org .elasticsearch .common .bytes .BytesReference ;
29+ import org .elasticsearch .common .collect .Tuple ;
2930import org .elasticsearch .common .settings .Settings ;
3031import org .elasticsearch .common .unit .ByteSizeUnit ;
3132import org .elasticsearch .common .unit .ByteSizeValue ;
3940import java .util .concurrent .ScheduledThreadPoolExecutor ;
4041import java .util .concurrent .TimeUnit ;
4142import java .util .concurrent .atomic .AtomicLong ;
43+ import java .util .concurrent .locks .ReentrantLock ;
4244import java .util .function .BiConsumer ;
4345import java .util .function .Supplier ;
4446
@@ -225,6 +227,7 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr
225227 private final Runnable onClose ;
226228
227229 private volatile boolean closed = false ;
230+ private final ReentrantLock lock = new ReentrantLock ();
228231
229232 BulkProcessor (BiConsumer <BulkRequest , ActionListener <BulkResponse >> consumer , BackoffPolicy backoffPolicy , Listener listener ,
230233 int concurrentRequests , int bulkActions , ByteSizeValue bulkSize , @ Nullable TimeValue flushInterval ,
@@ -264,21 +267,26 @@ public void close() {
264267 * completed
265268 * @throws InterruptedException If the current thread is interrupted
266269 */
267- public synchronized boolean awaitClose (long timeout , TimeUnit unit ) throws InterruptedException {
268- if (closed ) {
269- return true ;
270- }
271- closed = true ;
270+ public boolean awaitClose (long timeout , TimeUnit unit ) throws InterruptedException {
271+ lock .lock ();
272+ try {
273+ if (closed ) {
274+ return true ;
275+ }
276+ closed = true ;
272277
273- this .cancellableFlushTask .cancel ();
278+ this .cancellableFlushTask .cancel ();
274279
275- if (bulkRequest .numberOfActions () > 0 ) {
276- execute ();
277- }
278- try {
279- return this .bulkRequestHandler .awaitClose (timeout , unit );
280+ if (bulkRequest .numberOfActions () > 0 ) {
281+ execute ();
282+ }
283+ try {
284+ return this .bulkRequestHandler .awaitClose (timeout , unit );
285+ } finally {
286+ onClose .run ();
287+ }
280288 } finally {
281- onClose . run ();
289+ lock . unlock ();
282290 }
283291 }
284292
@@ -315,10 +323,22 @@ protected void ensureOpen() {
315323 }
316324 }
317325
318- private synchronized void internalAdd (DocWriteRequest <?> request ) {
319- ensureOpen ();
320- bulkRequest .add (request );
321- executeIfNeeded ();
326+ private void internalAdd (DocWriteRequest <?> request ) {
327+ //bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
328+ //once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler.
329+ Tuple <BulkRequest , Long > bulkRequestToExecute = null ;
330+ lock .lock ();
331+ try {
332+ ensureOpen ();
333+ bulkRequest .add (request );
334+ bulkRequestToExecute = newBulkRequestIfNeeded ();
335+ } finally {
336+ lock .unlock ();
337+ }
338+ //execute sending the local reference outside the lock to allow handler to control the concurrency via it's configuration.
339+ if (bulkRequestToExecute != null ) {
340+ execute (bulkRequestToExecute .v1 (), bulkRequestToExecute .v2 ());
341+ }
322342 }
323343
324344 /**
@@ -332,11 +352,23 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nu
332352 /**
333353 * Adds the data from the bytes to be processed by the bulk processor
334354 */
335- public synchronized BulkProcessor add (BytesReference data , @ Nullable String defaultIndex , @ Nullable String defaultType ,
355+ public BulkProcessor add (BytesReference data , @ Nullable String defaultIndex , @ Nullable String defaultType ,
336356 @ Nullable String defaultPipeline ,
337357 XContentType xContentType ) throws Exception {
338- bulkRequest .add (data , defaultIndex , defaultType , null , null , defaultPipeline , true , xContentType );
339- executeIfNeeded ();
358+ Tuple <BulkRequest , Long > bulkRequestToExecute = null ;
359+ lock .lock ();
360+ try {
361+ ensureOpen ();
362+ bulkRequest .add (data , defaultIndex , defaultType , null , null , defaultPipeline ,
363+ true , xContentType );
364+ bulkRequestToExecute = newBulkRequestIfNeeded ();
365+ } finally {
366+ lock .unlock ();
367+ }
368+
369+ if (bulkRequestToExecute != null ) {
370+ execute (bulkRequestToExecute .v1 (), bulkRequestToExecute .v2 ());
371+ }
340372 return this ;
341373 }
342374
@@ -358,23 +390,32 @@ public boolean isCancelled() {
358390 return scheduler .scheduleWithFixedDelay (flushRunnable , flushInterval , ThreadPool .Names .GENERIC );
359391 }
360392
361- private void executeIfNeeded () {
393+ // needs to be executed under a lock
394+ private Tuple <BulkRequest ,Long > newBulkRequestIfNeeded (){
362395 ensureOpen ();
363396 if (!isOverTheLimit ()) {
364- return ;
397+ return null ;
365398 }
366- execute ();
399+ final BulkRequest bulkRequest = this .bulkRequest ;
400+ this .bulkRequest = bulkRequestSupplier .get ();
401+ return new Tuple <>(bulkRequest ,executionIdGen .incrementAndGet ()) ;
402+ }
403+
404+ // may be executed without a lock
405+ private void execute (BulkRequest bulkRequest , long executionId ){
406+ this .bulkRequestHandler .execute (bulkRequest , executionId );
367407 }
368408
369- // (currently) needs to be executed under a lock
409+ // needs to be executed under a lock
370410 private void execute () {
371411 final BulkRequest bulkRequest = this .bulkRequest ;
372412 final long executionId = executionIdGen .incrementAndGet ();
373413
374414 this .bulkRequest = bulkRequestSupplier .get ();
375- this . bulkRequestHandler . execute (bulkRequest , executionId );
415+ execute (bulkRequest , executionId );
376416 }
377417
418+ // needs to be executed under a lock
378419 private boolean isOverTheLimit () {
379420 if (bulkActions != -1 && bulkRequest .numberOfActions () >= bulkActions ) {
380421 return true ;
@@ -388,25 +429,32 @@ private boolean isOverTheLimit() {
388429 /**
389430 * Flush pending delete or index requests.
390431 */
391- public synchronized void flush () {
392- ensureOpen ();
393- if (bulkRequest .numberOfActions () > 0 ) {
394- execute ();
432+ public void flush () {
433+ lock .lock ();
434+ try {
435+ ensureOpen ();
436+ if (bulkRequest .numberOfActions () > 0 ) {
437+ execute ();
438+ }
439+ } finally {
440+ lock .unlock ();
395441 }
396442 }
397443
398444 class Flush implements Runnable {
399-
400445 @ Override
401446 public void run () {
402- synchronized (BulkProcessor .this ) {
447+ lock .lock ();
448+ try {
403449 if (closed ) {
404450 return ;
405451 }
406452 if (bulkRequest .numberOfActions () == 0 ) {
407453 return ;
408454 }
409455 execute ();
456+ } finally {
457+ lock .unlock ();
410458 }
411459 }
412460 }
0 commit comments