2121
2222import org .apache .logging .log4j .Logger ;
2323import org .apache .logging .log4j .message .ParameterizedMessage ;
24+ import org .elasticsearch .common .collect .Tuple ;
2425import org .elasticsearch .common .unit .TimeValue ;
2526import org .elasticsearch .common .util .concurrent .FutureUtils ;
2627
2728import java .io .Closeable ;
2829import java .io .IOException ;
30+ import java .util .HashMap ;
2931import java .util .LinkedHashMap ;
3032import java .util .Map ;
3133import java .util .Objects ;
3436import java .util .concurrent .ScheduledFuture ;
3537import java .util .concurrent .TimeUnit ;
3638import java .util .concurrent .TimeoutException ;
39+ import java .util .stream .Collectors ;
3740
3841import static org .elasticsearch .index .seqno .SequenceNumbers .NO_OPS_PERFORMED ;
3942import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
@@ -63,7 +66,7 @@ public interface GlobalCheckpointListener {
6366
6467 // guarded by this
6568 private boolean closed ;
66- private Map <GlobalCheckpointListener , ScheduledFuture <?>> listeners ;
69+ private final Map <GlobalCheckpointListener , Tuple < Long , ScheduledFuture <?>>> listeners = new LinkedHashMap <>() ;
6770 private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO ;
6871
6972 private final ShardId shardId ;
@@ -91,62 +94,56 @@ public interface GlobalCheckpointListener {
9194 }
9295
9396 /**
94- * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
95- * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
96- * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
97- * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
98- * is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
99- * notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
100- * the timeout means no timeout will be associated to the listener.
97+ * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
98+ * then the listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners.
99+ * If the shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
100+ * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated above the
101+ * global checkpoint the listener is waiting for, or the shard is closed. A listener must re-register after one of these events to
102+ * receive subsequent events. Callers may add a timeout to be notified after if the timeout elapses. In this case, the listener will be
103+ * notified with a {@link TimeoutException}. Passing null fo the timeout means no timeout will be associated to the listener.
101104 *
102- * @param currentGlobalCheckpoint the current global checkpoint known to the listener
103- * @param listener the listener
104- * @param timeout the listener timeout, or null if no timeout
105+ * @param waitingForGlobalCheckpoint the current global checkpoint known to the listener
106+ * @param listener the listener
107+ * @param timeout the listener timeout, or null if no timeout
105108 */
106- synchronized void add (final long currentGlobalCheckpoint , final GlobalCheckpointListener listener , final TimeValue timeout ) {
109+ synchronized void add (final long waitingForGlobalCheckpoint , final GlobalCheckpointListener listener , final TimeValue timeout ) {
107110 if (closed ) {
108111 executor .execute (() -> notifyListener (listener , UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId )));
109112 return ;
110113 }
111- if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint ) {
114+ if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint ) {
112115 // notify directly
113116 executor .execute (() -> notifyListener (listener , lastKnownGlobalCheckpoint , null ));
114117 } else {
115- if (listeners == null ) {
116- listeners = new LinkedHashMap <>();
117- }
118118 if (timeout == null ) {
119- listeners .put (listener , null );
119+ listeners .put (listener , Tuple . tuple ( waitingForGlobalCheckpoint , null ) );
120120 } else {
121121 listeners .put (
122122 listener ,
123- scheduler .schedule (
124- () -> {
125- final boolean removed ;
126- synchronized (this ) {
127- /*
128- * Note that the listeners map can be null if a notification nulled out the map reference when
129- * notifying listeners, and then our scheduled execution occurred before we could be cancelled by
130- * the notification. In this case, we would have blocked waiting for access to this critical
131- * section.
132- *
133- * What is more, we know that this listener has a timeout associated with it (otherwise we would
134- * not be here) so the return value from remove being null is an indication that we are not in the
135- * map. This can happen if a notification nulled out the listeners, and then our scheduled execution
136- * occurred before we could be cancelled by the notification, and then another thread added a
137- * listener causing the listeners map reference to be non-null again. In this case, our listener
138- * here would not be in the map and we should not fire the timeout logic.
139- */
140- removed = listeners != null && listeners .remove (listener ) != null ;
141- }
142- if (removed ) {
143- final TimeoutException e = new TimeoutException (timeout .getStringRep ());
144- logger .trace ("global checkpoint listener timed out" , e );
145- executor .execute (() -> notifyListener (listener , UNASSIGNED_SEQ_NO , e ));
146- }
147- },
148- timeout .nanos (),
149- TimeUnit .NANOSECONDS ));
123+ Tuple .tuple (
124+ waitingForGlobalCheckpoint ,
125+ scheduler .schedule (
126+ () -> {
127+ final boolean removed ;
128+ synchronized (this ) {
129+ /*
130+ * We know that this listener has a timeout associated with it (otherwise we would not be
131+ * here) so the future component of the return value from remove being null is an indication
132+ * that we are not in the map. This can happen if a notification collected us into listeners
133+ * to be notified and removed us from the map, and then our scheduled execution occurred
134+ * before we could be cancelled by the notification. In this case, our listener here would
135+ * not be in the map and we should not fire the timeout logic.
136+ */
137+ removed = listeners .remove (listener ).v2 () != null ;
138+ }
139+ if (removed ) {
140+ final TimeoutException e = new TimeoutException (timeout .getStringRep ());
141+ logger .trace ("global checkpoint listener timed out" , e );
142+ executor .execute (() -> notifyListener (listener , UNASSIGNED_SEQ_NO , e ));
143+ }
144+ },
145+ timeout .nanos (),
146+ TimeUnit .NANOSECONDS )));
150147 }
151148 }
152149 }
@@ -163,7 +160,7 @@ public synchronized void close() throws IOException {
163160 * @return the number of listeners pending notification
164161 */
165162 synchronized int pendingListeners () {
166- return listeners == null ? 0 : listeners .size ();
163+ return listeners .size ();
167164 }
168165
169166 /**
@@ -173,7 +170,7 @@ synchronized int pendingListeners() {
173170 * @return a scheduled future representing the timeout future for the listener, otherwise null
174171 */
175172 synchronized ScheduledFuture <?> getTimeoutFuture (final GlobalCheckpointListener listener ) {
176- return listeners .get (listener );
173+ return listeners .get (listener ). v2 () ;
177174 }
178175
179176 /**
@@ -193,22 +190,31 @@ synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
193190 private void notifyListeners (final long globalCheckpoint , final IndexShardClosedException e ) {
194191 assert Thread .holdsLock (this );
195192 assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null ) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null );
196- if (listeners != null ) {
197- // capture the current listeners
198- final Map <GlobalCheckpointListener , ScheduledFuture <?>> currentListeners = listeners ;
199- listeners = null ;
200- if (currentListeners != null ) {
201- executor .execute (() -> {
202- for (final Map .Entry <GlobalCheckpointListener , ScheduledFuture <?>> listener : currentListeners .entrySet ()) {
203- /*
204- * We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and
205- * not trigger the timeout.
206- */
207- FutureUtils .cancel (listener .getValue ());
208- notifyListener (listener .getKey (), globalCheckpoint , e );
209- }
210- });
211- }
193+
194+ final Map <GlobalCheckpointListener , Tuple <Long , ScheduledFuture <?>>> listenersToNotify ;
195+ if (globalCheckpoint != UNASSIGNED_SEQ_NO ) {
196+ listenersToNotify =
197+ listeners
198+ .entrySet ()
199+ .stream ()
200+ .filter (entry -> entry .getValue ().v1 () <= globalCheckpoint )
201+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
202+ listenersToNotify .keySet ().forEach (listeners ::remove );
203+ } else {
204+ listenersToNotify = new HashMap <>(listeners );
205+ listeners .clear ();
206+ }
207+ if (listenersToNotify .isEmpty () == false ) {
208+ executor .execute (() ->
209+ listenersToNotify
210+ .forEach ((listener , t ) -> {
211+ /*
212+ * We do not want to interrupt any timeouts that fired, these will detect that the listener has been
213+ * notified and not trigger the timeout.
214+ */
215+ FutureUtils .cancel (t .v2 ());
216+ notifyListener (listener , globalCheckpoint , e );
217+ }));
212218 }
213219 }
214220
0 commit comments