Skip to content

Commit ff495af

Browse files
committed
NoSQL: simplify node allocation
1 parent 202c66c commit ff495af

File tree

4 files changed

+22
-66
lines changed

4 files changed

+22
-66
lines changed

persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodes/impl/NodeManagementImpl.java

Lines changed: 17 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.lang.Integer.bitCount;
2424

2525
import com.google.common.annotations.VisibleForTesting;
26+
import com.google.common.primitives.Ints;
2627
import jakarta.annotation.Nonnull;
2728
import jakarta.annotation.PreDestroy;
2829
import jakarta.enterprise.context.ApplicationScoped;
@@ -35,15 +36,14 @@
3536
import java.time.temporal.ChronoUnit;
3637
import java.util.ArrayList;
3738
import java.util.Arrays;
38-
import java.util.BitSet;
3939
import java.util.Objects;
4040
import java.util.Optional;
4141
import java.util.Set;
4242
import java.util.concurrent.ConcurrentHashMap;
4343
import java.util.concurrent.ThreadLocalRandom;
4444
import java.util.concurrent.locks.Lock;
4545
import java.util.concurrent.locks.ReentrantLock;
46-
import java.util.function.IntSupplier;
46+
import java.util.stream.IntStream;
4747
import org.apache.polaris.ids.api.IdGenerator;
4848
import org.apache.polaris.ids.api.IdGeneratorSpec;
4949
import org.apache.polaris.ids.api.ImmutableIdGeneratorSpec;
@@ -255,35 +255,10 @@ private LeaseParams leaseInternal() {
255255

256256
var now = clock.currentInstant();
257257

258-
var checkedIds = new BitSet(numNodeIds); // 128 bytes for 1024 nodes - not too much
259-
var rand = ThreadLocalRandom.current();
260-
261-
var generateNodeId =
262-
(IntSupplier)
263-
() -> {
264-
if (checkedIds.cardinality() == numNodeIds) {
265-
return -1;
266-
}
267-
while (true) {
268-
// The implementation _could_ use BitSet.nextClearBit(), but this is intentionally
269-
// _not_ done here to let two concurrently started nodes not generate the same set
270-
// of "fallback" node IDs to use (after checking with a hash over the network
271-
// interface).
272-
var nodeId = rand.nextInt(numNodeIds);
273-
if (!checkedIds.get(nodeId)) {
274-
checkedIds.set(nodeId);
275-
return nodeId;
276-
}
277-
}
278-
};
279-
280-
var ids = new Integer[CHECK_BATCH_SIZE];
281-
282-
// First, try with a hash of the local network address...
258+
// First, try with a hash of the local network address.
283259
// The node ID in this attempt is somewhat deterministic for the local machine.
284-
// This is not really a necessity, but it can reduce the overall number of ever allocated node
285-
// IDs. It is rather a cosmetic thing, so that a LIST NODES command in the Persistence-REPL
286-
// doesn't flood users after a couple of restarts.
260+
// This is not really necessary, but it can reduce the overall number of ever-allocated node
261+
// IDs.
287262
try {
288263
var hashOverNetworkInterfaces =
289264
NetworkInterface.networkInterfaces()
@@ -309,39 +284,22 @@ private LeaseParams leaseInternal() {
309284

310285
var nodeId = hashOverNetworkInterfaces & (numNodeIds - 1);
311286

312-
var leased = tryLeaseFromCandidates(new Integer[] {nodeId}, now);
287+
var leased = tryLeaseFromCandidates(new int[] {nodeId}, now);
313288
if (leased != null) {
314289
return leased;
315290
}
316291
} catch (SocketException e) {
317292
// ignore
318293
}
319294

320-
// Next, try with randomly picked node IDs and attempt to lease...
321-
for (int i = 0; i < 3 * numNodeIds / CHECK_BATCH_SIZE; i++) {
322-
Arrays.fill(ids, null);
323-
for (int j = 0; j < ids.length; j++) {
324-
var nodeId = generateNodeId.getAsInt();
325-
if (nodeId == -1) {
326-
break;
327-
}
328-
ids[j] = nodeId;
329-
}
295+
// If the lease-attempt using the hash over network-interfaced did not succeed, try with
296+
// randomly picked node IDs.
330297

331-
var leased = tryLeaseFromCandidates(ids, now);
332-
if (leased != null) {
333-
return leased;
334-
}
335-
}
298+
var nodeIdsToCheck = IntStream.range(0, numNodeIds).toArray();
299+
Ints.rotate(nodeIdsToCheck, ThreadLocalRandom.current().nextInt(numNodeIds));
336300

337-
// Next, try again, but using sequential nodeIds starting at a random offset...
338-
var offset = rand.nextInt(numNodeIds);
339301
for (int i = 0; i < numNodeIds; i += CHECK_BATCH_SIZE) {
340-
Arrays.fill(ids, null);
341-
for (int j = 0; j < ids.length; j++) {
342-
var nodeId = (offset + i + j) % numNodeIds;
343-
ids[j] = nodeId;
344-
}
302+
var ids = Arrays.copyOfRange(nodeIdsToCheck, i, i + CHECK_BATCH_SIZE);
345303
var leased = tryLeaseFromCandidates(ids, now);
346304
if (leased != null) {
347305
return leased;
@@ -376,19 +334,16 @@ public void close() {
376334
}
377335
}
378336

379-
private LeaseParams tryLeaseFromCandidates(Integer[] nodeIds, Instant now) {
337+
private LeaseParams tryLeaseFromCandidates(int[] nodeIds, Instant now) {
380338
var nodesFetched = nodeStore.fetchMany(nodeIds);
381339
for (int i = 0; i < nodeIds.length; i++) {
382340
// NodeStore.fetchMany MUST return as many elements as the input array.
383341
var nodeId = nodeIds[i];
384-
if (nodeId != null) {
385-
var id = (int) nodeId;
386-
var node = nodesFetched[i];
387-
if (canLease(node, now)) {
388-
var leased = tryLease(id, node);
389-
if (leased != null) {
390-
return leased;
391-
}
342+
var node = nodesFetched[i];
343+
if (canLease(node, now)) {
344+
var leased = tryLease(nodeId, node);
345+
if (leased != null) {
346+
return leased;
392347
}
393348
}
394349
}

persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodes/impl/TestNodeManagementImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public void simple() {
154154
.build())
155155
.build();
156156
try (var mgmt = new NodeManagementImpl(config, clock, new MockNodeStoreFactory(), scheduler)) {
157+
soft.assertThat(mgmt.maxNumberOfNodes()).isEqualTo(config.numNodes());
157158
var lease = mgmt.lease();
158159
soft.assertThat(lease).isNotNull();
159160
soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);

persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodes/impl/MockNodeStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class MockNodeStore implements NodeStore {
3333
@Override
3434
public NodeState persist(
3535
int nodeId, Optional<NodeState> expectedNodeState, @Nonnull NodeState newState) {
36-
if (expectedNodeState.isPresent()) {
36+
if (nodeId >= 0 && expectedNodeState.isPresent()) {
3737
var result =
3838
nodeStates.computeIfPresent(
3939
nodeId,
@@ -56,11 +56,11 @@ public NodeState persist(
5656

5757
@Nonnull
5858
@Override
59-
public NodeState[] fetchMany(@Nonnull Integer... nodeIds) {
59+
public NodeState[] fetchMany(@Nonnull int... nodeIds) {
6060
var r = new NodeState[nodeIds.length];
6161
for (int i = 0; i < nodeIds.length; i++) {
6262
var id = nodeIds[i];
63-
if (id != null) {
63+
if (id >= 0) {
6464
r[i] = nodeStates.get(id);
6565
}
6666
}

persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodes/spi/NodeStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public interface NodeStore {
2727
Optional<NodeState> fetch(int nodeId);
2828

2929
@Nonnull
30-
NodeState[] fetchMany(@Nonnull Integer... nodeIds);
30+
NodeState[] fetchMany(@Nonnull int... nodeIds);
3131

3232
@Nullable
3333
NodeState persist(int nodeId, Optional<NodeState> expectedNodeState, @Nonnull NodeState newState);

0 commit comments

Comments
 (0)