2121
2222import org .apache .logging .log4j .Logger ;
2323import org .apache .logging .log4j .message .ParameterizedMessage ;
24+ import org .elasticsearch .Assertions ;
2425import org .elasticsearch .common .collect .Tuple ;
2526import org .elasticsearch .common .unit .TimeValue ;
2627import org .elasticsearch .common .util .concurrent .FutureUtils ;
@@ -150,6 +151,9 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo
150151
151152 @ Override
152153 public synchronized void close () throws IOException {
154+ if (closed ) {
155+ assert listeners .isEmpty () : listeners ;
156+ }
153157 closed = true ;
154158 notifyListeners (UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId ));
155159 }
@@ -188,8 +192,8 @@ synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
188192 }
189193
190194 private void notifyListeners (final long globalCheckpoint , final IndexShardClosedException e ) {
191- assert Thread .holdsLock (this );
192- assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null ) || ( globalCheckpoint >= NO_OPS_PERFORMED && e == null );
195+ assert Thread .holdsLock (this ) : Thread . currentThread () ;
196+ assertNotification (globalCheckpoint , e );
193197
194198 final Map <GlobalCheckpointListener , Tuple <Long , ScheduledFuture <?>>> listenersToNotify ;
195199 if (globalCheckpoint != UNASSIGNED_SEQ_NO ) {
@@ -219,6 +223,8 @@ private void notifyListeners(final long globalCheckpoint, final IndexShardClosed
219223 }
220224
221225 private void notifyListener (final GlobalCheckpointListener listener , final long globalCheckpoint , final Exception e ) {
226+ assertNotification (globalCheckpoint , e );
227+
222228 try {
223229 listener .accept (globalCheckpoint , e );
224230 } catch (final Exception caught ) {
@@ -231,10 +237,21 @@ private void notifyListener(final GlobalCheckpointListener listener, final long
231237 } else if (e instanceof IndexShardClosedException ) {
232238 logger .warn ("error notifying global checkpoint listener of closed shard" , caught );
233239 } else {
234- assert e instanceof TimeoutException : e ;
235240 logger .warn ("error notifying global checkpoint listener of timeout" , caught );
236241 }
237242 }
238243 }
239244
245+ private void assertNotification (final long globalCheckpoint , final Exception e ) {
246+ if (Assertions .ENABLED ) {
247+ assert globalCheckpoint >= UNASSIGNED_SEQ_NO : globalCheckpoint ;
248+ if (globalCheckpoint != UNASSIGNED_SEQ_NO ) {
249+ assert e == null : e ;
250+ } else {
251+ assert e != null ;
252+ assert e instanceof IndexShardClosedException || e instanceof TimeoutException : e ;
253+ }
254+ }
255+ }
256+
240257}
0 commit comments