4747import io .fabric8 .kubernetes .client .dsl .NonNamespaceOperation ;
4848import io .fabric8 .kubernetes .client .dsl .Replaceable ;
4949import io .fabric8 .kubernetes .client .dsl .Resource ;
50- import io .fabric8 .kubernetes .client .dsl .Watchable ;
5150import io .fabric8 .kubernetes .client .dsl .base .WaitForConditionWatcher .WatchException ;
5251import io .fabric8 .kubernetes .client .dsl .internal .DefaultOperationInfo ;
5352import io .fabric8 .kubernetes .client .dsl .internal .WatchConnectionManager ;
8079import okhttp3 .HttpUrl ;
8180import okhttp3 .Request ;
8281
82+ import static java .util .concurrent .TimeUnit .NANOSECONDS ;
83+
8384public class BaseOperation <T extends HasMetadata , L extends KubernetesResourceList <T >, D extends Doneable <T >, R extends Resource <T , D >>
8485 extends OperationSupport
8586 implements
@@ -420,13 +421,13 @@ public final T createOrReplace(T... items) {
420421 }
421422
422423 @ Override
423- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withLabels (Map <String , String > labels ) {
424+ public FilterWatchListDeletable <T , L , Boolean , Watch > withLabels (Map <String , String > labels ) {
424425 this .labels .putAll (labels );
425426 return this ;
426427 }
427428
428429 @ Override
429- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withLabelSelector (LabelSelector selector ) {
430+ public FilterWatchListDeletable <T , L , Boolean , Watch > withLabelSelector (LabelSelector selector ) {
430431 Map <String , String > matchLabels = selector .getMatchLabels ();
431432 if (matchLabels != null ) {
432433 this .labels .putAll (matchLabels );
@@ -465,37 +466,37 @@ public FilterWatchListDeletable<T, L, Boolean, Watch, Watcher<T>> withLabelSelec
465466 */
466467 @ Override
467468 @ Deprecated
468- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withoutLabels (Map <String , String > labels ) {
469+ public FilterWatchListDeletable <T , L , Boolean , Watch > withoutLabels (Map <String , String > labels ) {
469470 // Re-use "withoutLabel" to convert values from String to String[]
470471 labels .forEach (this ::withoutLabel );
471472 return this ;
472473 }
473474
474475 @ Override
475- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withLabelIn (String key , String ... values ) {
476+ public FilterWatchListDeletable <T , L , Boolean , Watch > withLabelIn (String key , String ... values ) {
476477 labelsIn .put (key , values );
477478 return this ;
478479 }
479480
480481 @ Override
481- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withLabelNotIn (String key , String ... values ) {
482+ public FilterWatchListDeletable <T , L , Boolean , Watch > withLabelNotIn (String key , String ... values ) {
482483 labelsNotIn .put (key , values );
483484 return this ;
484485 }
485486
486487 @ Override
487- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withLabel (String key , String value ) {
488+ public FilterWatchListDeletable <T , L , Boolean , Watch > withLabel (String key , String value ) {
488489 labels .put (key , value );
489490 return this ;
490491 }
491492
492493 @ Override
493- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withLabel (String key ) {
494+ public FilterWatchListDeletable <T , L , Boolean , Watch > withLabel (String key ) {
494495 return withLabel (key , null );
495496 }
496497
497498 @ Override
498- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withoutLabel (String key , String value ) {
499+ public FilterWatchListDeletable <T , L , Boolean , Watch > withoutLabel (String key , String value ) {
499500 labelsNot .merge (key , new String []{value }, (oldList , newList ) -> {
500501 final String [] concatList = (String []) Array .newInstance (String .class , oldList .length + newList .length );
501502 System .arraycopy (oldList , 0 , concatList , 0 , oldList .length );
@@ -506,24 +507,24 @@ public FilterWatchListDeletable<T, L, Boolean, Watch, Watcher<T>> withoutLabel(S
506507 }
507508
508509 @ Override
509- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withoutLabel (String key ) {
510+ public FilterWatchListDeletable <T , L , Boolean , Watch > withoutLabel (String key ) {
510511 return withoutLabel (key , null );
511512 }
512513
513514 @ Override
514- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withFields (Map <String , String > fields ) {
515+ public FilterWatchListDeletable <T , L , Boolean , Watch > withFields (Map <String , String > fields ) {
515516 this .fields .putAll (fields );
516517 return this ;
517518 }
518519
519520 @ Override
520- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withField (String key , String value ) {
521+ public FilterWatchListDeletable <T , L , Boolean , Watch > withField (String key , String value ) {
521522 fields .put (key , value );
522523 return this ;
523524 }
524525
525526 @ Override
526- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withInvolvedObject (ObjectReference objectReference ) {
527+ public FilterWatchListDeletable <T , L , Boolean , Watch > withInvolvedObject (ObjectReference objectReference ) {
527528 if (objectReference != null ) {
528529 if (objectReference .getName () != null ) {
529530 fields .put (INVOLVED_OBJECT_NAME , objectReference .getName ());
@@ -561,14 +562,14 @@ public FilterWatchListDeletable<T, L, Boolean, Watch, Watcher<T>> withInvolvedOb
561562 */
562563 @ Override
563564 @ Deprecated
564- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withoutFields (Map <String , String > fields ) {
565+ public FilterWatchListDeletable <T , L , Boolean , Watch > withoutFields (Map <String , String > fields ) {
565566 // Re-use "withoutField" to convert values from String to String[]
566567 labels .forEach (this ::withoutField );
567568 return this ;
568569 }
569570
570571 @ Override
571- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withoutField (String key , String value ) {
572+ public FilterWatchListDeletable <T , L , Boolean , Watch > withoutField (String key , String value ) {
572573 fieldsNot .merge (key , new String []{value }, (oldList , newList ) -> {
573574 if (Utils .isNotNullOrEmpty (newList [0 ])) { // Only add new values when not null
574575 final String [] concatList = (String []) Array .newInstance (String .class , oldList .length + newList .length );
@@ -782,8 +783,8 @@ void deleteList() {
782783 }
783784
784785 @ Override
785- public Watchable < Watch , Watcher < T >> withResourceVersion (String resourceVersion ) {
786- return newInstance (context .withResourceVersion (resourceVersion ));
786+ public R withResourceVersion (String resourceVersion ) {
787+ return ( R ) newInstance (context .withResourceVersion (resourceVersion ));
787788 }
788789
789790 public Watch watch (final Watcher <T > watcher ) {
@@ -803,9 +804,9 @@ public Watch watch(String resourceVersion, Watcher<T> watcher) {
803804 public Watch watch (ListOptions options , final Watcher <T > watcher ) {
804805 WatcherToggle <T > watcherToggle = new WatcherToggle <>(watcher , true );
805806 options .setWatch (Boolean .TRUE );
806- WatchConnectionManager watch = null ;
807+ WatchConnectionManager < T , L > watch = null ;
807808 try {
808- watch = new WatchConnectionManager (
809+ watch = new WatchConnectionManager <> (
809810 client ,
810811 this ,
811812 options ,
@@ -839,7 +840,7 @@ public Watch watch(ListOptions options, final Watcher<T> watcher) {
839840 // HTTP GET. This is meant to handle cases like kubectl local proxy which does not support
840841 // websockets. Issue: https://github.com/kubernetes/kubernetes/issues/25126
841842 try {
842- return new WatchHTTPManager (
843+ return new WatchHTTPManager <> (
843844 client ,
844845 this ,
845846 options ,
@@ -1034,7 +1035,7 @@ public OperationInfo forOperationType(String type) {
10341035 }
10351036
10361037 @ Override
1037- public FilterWatchListDeletable <T , L , Boolean , Watch , Watcher < T > > withGracePeriod (long gracePeriodSeconds ) {
1038+ public FilterWatchListDeletable <T , L , Boolean , Watch > withGracePeriod (long gracePeriodSeconds ) {
10381039 return newInstance (context .withGracePeriodSeconds (gracePeriodSeconds ));
10391040 }
10401041
@@ -1114,40 +1115,54 @@ public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUn
11141115
11151116 private T waitUntilConditionWithRetries (Predicate <T > condition , long timeoutNanos , long backoffMillis )
11161117 throws InterruptedException {
1118+ ListOptions options = null ;
11171119
1118- T item = fromServer ().get ();
1119- if (condition .test (item )) {
1120- return item ;
1120+ if (resourceVersion != null ) {
1121+ options = createListOptions (resourceVersion );
11211122 }
11221123
1123- long end = System .nanoTime () + timeoutNanos ;
1124+ long currentBackOff = backoffMillis ;
1125+ long remainingNanosToWait = timeoutNanos ;
1126+ while (remainingNanosToWait > 0 ) {
11241127
1125- WaitForConditionWatcher < T > watcher = new WaitForConditionWatcher <>( condition );
1126- Watch watch = item == null
1127- ? watch ( new ListOptionsBuilder ()
1128- . withResourceVersion ( null )
1129- . build (), watcher )
1130- : watch ( item . getMetadata (). getResourceVersion (), watcher );
1128+ T item = fromServer (). get ( );
1129+ if ( condition . test ( item )) {
1130+ return item ;
1131+ } else if ( options == null ) {
1132+ options = createListOptions ( getResourceVersion ( item ));
1133+ }
11311134
1132- try {
1133- return watcher .getFuture ()
1134- .get (timeoutNanos , TimeUnit .NANOSECONDS );
1135- } catch (ExecutionException e ) {
1136- if (e .getCause () instanceof WatchException && ((WatchException ) e .getCause ()).isShouldRetry ()) {
1137- watch .close ();
1138- LOG .debug ("retryable watch exception encountered, retrying after {} millis" , backoffMillis , e .getCause ());
1139- Thread .sleep (backoffMillis );
1140- long newTimeout = end - System .nanoTime ();
1141- long newBackoff = (long ) (backoffMillis * watchRetryBackoffMultiplier );
1142- return waitUntilConditionWithRetries (condition , newTimeout , newBackoff );
1135+ final WaitForConditionWatcher <T > watcher = new WaitForConditionWatcher <>(condition );
1136+ final long startTime = System .nanoTime ();
1137+ try (Watch ignored = watch (options , watcher )) {
1138+ return watcher .getFuture ().get (remainingNanosToWait , NANOSECONDS );
1139+ } catch (ExecutionException e ) {
1140+ Throwable cause = e .getCause ();
1141+ if (cause instanceof WatchException && ((WatchException ) cause ).isShouldRetry ()) {
1142+ LOG .debug ("retryable watch exception encountered, retrying after {} millis" , currentBackOff , cause );
1143+ Thread .sleep (currentBackOff );
1144+ currentBackOff *= watchRetryBackoffMultiplier ;
1145+ remainingNanosToWait -= (System .nanoTime () - startTime );
1146+ } else {
1147+ throw KubernetesClientException .launderThrowable (cause );
1148+ }
1149+ } catch (TimeoutException e ) {
1150+ break ;
11431151 }
1144- throw KubernetesClientException .launderThrowable (e .getCause ());
1145- } catch (TimeoutException e ) {
1146- LOG .debug ("ran out of time waiting for watcher, wait condition not met" );
1147- throw new IllegalArgumentException (type .getSimpleName () + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!" );
1148- } finally {
1149- watch .close ();
11501152 }
1153+
1154+ LOG .debug ("ran out of time waiting for watcher, wait condition not met" );
1155+ throw new IllegalArgumentException (type .getSimpleName () + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!" );
1156+ }
1157+
1158+ private static String getResourceVersion (HasMetadata item ) {
1159+ return (item == null ) ? null : item .getMetadata ().getResourceVersion ();
1160+ }
1161+
1162+ private static ListOptions createListOptions (String resourceVersion ) {
1163+ return new ListOptionsBuilder ()
1164+ .withResourceVersion (resourceVersion )
1165+ .build ();
11511166 }
11521167
11531168 public void setType (Class <T > type ) {
0 commit comments