From f6c88143f2ce5e001a98fda08361767d57cc2667 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 17 Feb 2021 19:01:02 +0000 Subject: [PATCH 1/2] Apply GJF + minor polishing in RSocketPool Signed-off-by: Rossen Stoyanchev --- .../io/rsocket/loadbalance/RSocketPool.java | 44 +++++++------------ 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java index 514a5d3f4..24ed2933c 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,22 +39,18 @@ class RSocketPool extends ResolvingOperator implements CoreSubscriber> { - final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this); - final RSocketConnector connector; - final LoadbalanceStrategy loadbalanceStrategy; - - volatile PooledRSocket[] activeSockets; - static final AtomicReferenceFieldUpdater ACTIVE_SOCKETS = AtomicReferenceFieldUpdater.newUpdater( RSocketPool.class, PooledRSocket[].class, "activeSockets"); - static final PooledRSocket[] EMPTY = new PooledRSocket[0]; static final PooledRSocket[] TERMINATED = new PooledRSocket[0]; - - volatile Subscription s; static final AtomicReferenceFieldUpdater S = AtomicReferenceFieldUpdater.newUpdater(RSocketPool.class, Subscription.class, "s"); + final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this); + final RSocketConnector connector; + final LoadbalanceStrategy loadbalanceStrategy; + volatile PooledRSocket[] activeSockets; + volatile Subscription s; public RSocketPool( RSocketConnector connector, @@ -85,31 +81,27 @@ public void onSubscribe(Subscription s) { } } - /** - * This operation should happen rarely relatively compares the number of the {@link #select()} - * method invocations, therefore it is acceptable to have it algorithmically inefficient. The - * algorithmic complexity of this method is - * - * @param targets set which represents RSocket targets to balance on - */ @Override public void onNext(List targets) { if (isDisposed()) { return; } + // This operation should happen less frequently than calls to select() (which are per request) + // and therefore it is acceptable somewhat less efficient. + PooledRSocket[] previouslyActiveSockets; - PooledRSocket[] activeSockets; PooledRSocket[] inactiveSockets; + PooledRSocket[] socketsToUse; for (; ; ) { - HashMap rSocketSuppliersCopy = new HashMap<>(); + HashMap rSocketSuppliersCopy = new HashMap<>(targets.size()); int j = 0; for (LoadbalanceTarget target : targets) { rSocketSuppliersCopy.put(target, j++); } - // checking intersection of active RSocket with the newly received set + // Intersect current and new list of targets and find the ones to keep vs dispose previouslyActiveSockets = this.activeSockets; inactiveSockets = new PooledRSocket[previouslyActiveSockets.length]; PooledRSocket[] nextActiveSockets = @@ -141,20 +133,18 @@ public void onNext(List targets) { } } - // going though brightly new rsocket + // The remainder are the brand new targets for (LoadbalanceTarget target : rSocketSuppliersCopy.keySet()) { nextActiveSockets[activeSocketsPosition++] = new PooledRSocket(this, this.connector.connect(target.getTransport()), target); } - // shrank to actual length if (activeSocketsPosition == 0) { - activeSockets = EMPTY; + socketsToUse = EMPTY; } else { - activeSockets = Arrays.copyOf(nextActiveSockets, activeSocketsPosition); + socketsToUse = Arrays.copyOf(nextActiveSockets, activeSocketsPosition); } - - if (ACTIVE_SOCKETS.compareAndSet(this, previouslyActiveSockets, activeSockets)) { + if (ACTIVE_SOCKETS.compareAndSet(this, previouslyActiveSockets, socketsToUse)) { break; } } @@ -169,7 +159,7 @@ public void onNext(List targets) { if (isPending()) { // notifies that upstream is resolved - if (activeSockets != EMPTY) { + if (socketsToUse != EMPTY) { //noinspection ConstantConditions complete(this); } From 7c4305a4c6942f08eea5ad24530e3cb1cb26b4ba Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 17 Feb 2021 19:05:20 +0000 Subject: [PATCH 2/2] Clean up failed loadbalance target Closes gh-958 Signed-off-by: Rossen Stoyanchev --- .../java/io/rsocket/loadbalance/PooledRSocket.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java b/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java index 44a9334d3..1e7f09ec4 100644 --- a/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,8 +85,8 @@ public void onError(Throwable t) { } this.doFinally(); - // terminate upstream which means retryBackoff has exhausted - this.terminate(t); + // terminate upstream (retryBackoff has exhausted) and remove from the parent target list + this.doCleanup(t); } @Override @@ -108,15 +108,15 @@ protected void doSubscribe() { @Override protected void doOnValueResolved(RSocket value) { - value.onClose().subscribe(null, t -> this.doCleanup(), this::doCleanup); + value.onClose().subscribe(null, this::doCleanup, () -> doCleanup(ON_DISPOSE)); } - void doCleanup() { + void doCleanup(Throwable t) { if (isDisposed()) { return; } - this.dispose(); + this.terminate(t); final RSocketPool parent = this.parent; for (; ; ) {