Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

import java.util.concurrent.Executors;
Expand Down Expand Up @@ -245,8 +246,7 @@ private void discoverClusterSlots(Connection jedis) {
}
w.lock();
try {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
resetSlots();
if (clientSideCache != null) {
clientSideCache.flush();
}
Expand Down Expand Up @@ -442,23 +442,41 @@ public List<ConnectionPool> getShuffledNodesPool() {
public void reset() {
w.lock();
try {
for (ConnectionPool pool : nodes.values()) {
try {
if (pool != null) {
pool.destroy();
}
} catch (RuntimeException e) {
// pass
}
}
nodes.clear();
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
resetNodes();
resetSlots();
} finally {
w.unlock();
}
}

private void resetSlots() {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
resetReplicaSlots();
}

private void resetReplicaSlots() {
if (replicaSlots == null) {
return;
}

Arrays.stream(replicaSlots).filter(Objects::nonNull).forEach(List::clear);
Arrays.fill(replicaSlots, null);
}

private void resetNodes() {
for (ConnectionPool pool : nodes.values()) {
try {
if (pool != null) {
pool.destroy();
}
} catch (RuntimeException e) {
// pass
}
}
nodes.clear();
}

public void close() {
reset();
if (topologyRefreshExecutor != null) {
Expand All @@ -468,13 +486,14 @@ public void close() {
}

public static String getNodeKey(HostAndPort hnp) {
//return hnp.getHost() + ":" + hnp.getPort();
return hnp.toString();
}

@SuppressWarnings("unchecked")
private List<Object> executeClusterSlots(Connection jedis) {
jedis.sendCommand(Protocol.Command.CLUSTER, "SLOTS");
return jedis.getObjectMultiBulkReply();
CommandArguments clusterSlotsCmd = new ClusterCommandArguments(Protocol.Command.CLUSTER).add(
"SLOTS");
return (List<Object>) jedis.executeCommand(clusterSlotsCmd);
}

private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
Expand Down
256 changes: 256 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package redis.clients.jedis;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.Command.CLUSTER;
import static redis.clients.jedis.util.CommandArgumentMatchers.commandWithArgs;

@Tag("unit")
@ExtendWith(MockitoExtension.class)
public class JedisClusterInfoCacheTest {

private static final HostAndPort MASTER_HOST = new HostAndPort("127.0.0.1", 7000);
private static final HostAndPort REPLICA_1_HOST = new HostAndPort("127.0.0.1", 7001);
private static final HostAndPort REPLICA_2_HOST = new HostAndPort("127.0.0.1", 7002);
private static final int TEST_SLOT = 0;

@Mock
private Connection mockConnection;

@Test
public void testReplicaNodeRemovalAndRediscovery() {
// Create client config with read-only replicas enabled
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.readOnlyForRedisClusterReplicas().build();

Set<HostAndPort> startNodes = new HashSet<>();
startNodes.add(MASTER_HOST);

JedisClusterInfoCache cache = new JedisClusterInfoCache(clientConfig, startNodes);

// Mock the cluster slots responses
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn(
masterReplicaSlotsResponse()).thenReturn(masterOnlySlotsResponse())
.thenReturn(masterReplica2SlotsResponse());

// Initial discovery with one master and one replica (replica-1)
cache.discoverClusterNodesAndSlots(mockConnection);
assertMasterNodeAvailable(cache);
assertReplicasAvailable(cache, REPLICA_1_HOST);

// Simulate rediscovery - master only
cache.discoverClusterNodesAndSlots(mockConnection);
// Master should still be available
// Replica should be cleared
assertMasterNodeAvailable(cache);
assertNoReplicasAvailable(cache);

// Simulate rediscovery - another replica (replica-2) coming back
cache.reset();
cache.discoverClusterNodesAndSlots(mockConnection);
assertReplicasAvailable(cache, REPLICA_2_HOST);
}

@Test
public void testResetWithReplicaSlots() {
// This test verifies that reset() properly clears replica slots

JedisClusterInfoCache cache = createCacheWithReplicasEnabled();

// Mock the cluster slots responses
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn(
masterReplicaSlotsResponse());

// Initial discovery
cache.discoverClusterNodesAndSlots(mockConnection);
assertReplicasAvailable(cache, REPLICA_1_HOST);

// Call reset() - this should clear and nullify replica slots
cache.reset();

assertNoReplicasAvailable(cache);

// Rediscovery should work correctly
cache.discoverClusterNodesAndSlots(mockConnection);
assertReplicasAvailable(cache, REPLICA_1_HOST);
}

private List<Object> masterReplicaSlotsResponse() {
return createClusterSlotsResponse(
new SlotRange.Builder(0, 16383).master(MASTER_HOST, "master-id-1")
.replica(REPLICA_1_HOST, "replica-id-1").build());
}

private List<Object> masterOnlySlotsResponse() {
return createClusterSlotsResponse(
new SlotRange.Builder(0, 16383).master(MASTER_HOST, "master-id-1").build());
}

private List<Object> masterReplica2SlotsResponse() {
return createClusterSlotsResponse(
new SlotRange.Builder(0, 16383).master(MASTER_HOST, "master-id-1")
.replica(REPLICA_2_HOST, "replica-id-2").build());
}

private JedisClusterInfoCache createCacheWithReplicasEnabled() {

JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.readOnlyForRedisClusterReplicas().build();

return new JedisClusterInfoCache(clientConfig,
new HashSet<>(Collections.singletonList(MASTER_HOST)));
}

private void assertNoReplicasAvailable(JedisClusterInfoCache cache) {
List<ConnectionPool> caheReplicaNodePools = cache.getSlotReplicaPools(TEST_SLOT);
assertNull(caheReplicaNodePools);
}

private void assertReplicasAvailable(JedisClusterInfoCache cache, HostAndPort... replicaNodes) {
List<ConnectionPool> caheReplicaNodePools = cache.getSlotReplicaPools(TEST_SLOT);
assertEquals(replicaNodes.length, caheReplicaNodePools.size());
for (HostAndPort expectedReplica : replicaNodes) {
ConnectionPool expectedNodePool = cache.getNode(expectedReplica);
assertThat(caheReplicaNodePools, hasItem(expectedNodePool));
}
}

private void assertMasterNodeAvailable(JedisClusterInfoCache cache) {
HostAndPort masterNode = cache.getSlotNode(TEST_SLOT);
assertNotNull(masterNode);
assertEquals(MASTER_HOST, masterNode);
}

/**
* Helper method to create a cluster slots response with master and replica nodes
*/
private List<Object> createClusterSlotsResponse(SlotRange... slotRanges) {
return Arrays.stream(slotRanges).map(this::clusterSlotRange).collect(Collectors.toList());
}

private List<Object> clusterSlotRange(SlotRange slotRange) {
List<Object> slotInfo = new ArrayList<>();
slotInfo.add((long) slotRange.start);
slotInfo.add((long) slotRange.end);
Node master = slotRange.master();
slotInfo.add(
Arrays.asList(master.getHost().getBytes(), (long) master.getPort(), master.id.getBytes()));
// Add replicas
slotRange.replicas().forEach(r -> slotInfo.add(
Arrays.asList(r.getHost().getBytes(), (long) r.getPort(), r.id.getBytes())));
return slotInfo;
}

static class SlotRange {
private final int start;
private final int end;
private final List<Node> nodes;

private SlotRange(int start, int end, List<Node> nodes) {
this.start = start;
this.end = end;
this.nodes = nodes;
}

public SlotRange.Builder builder(int start, int end) {
return new SlotRange.Builder(start, end);
}

public Node master() {
return nodes.get(0);
}

public List<Node> replicas() {
return nodes.subList(1, nodes.size());
}

static class Builder {
private final int start;
private final int end;
private final List<Node> nodes = new ArrayList<>();

public Builder(int start, int end) {
this.start = start;
this.end = end;
}

public Builder master(Node node) {
if (!nodes.isEmpty()) {
nodes.set(0, node);
} else {
nodes.add(node);
}
return this;
}

public Builder master(HostAndPort hostPort, String id) {
return master(new Node(hostPort, id));
}

public Builder replica(HostAndPort hostPort, String id) {
return replica(new Node(hostPort, id));
}

public Builder replica(Node node) {
if (nodes.isEmpty()) {
throw new IllegalStateException("Master node must be added before adding replicas");
}
nodes.add(node);
return this;
}

public SlotRange build() {
return new SlotRange(start, end, nodes);
}

}

}

static class Node {
private final HostAndPort hostPort;
private final String id;

public Node(HostAndPort hostPort, String id) {
this.hostPort = hostPort;
this.id = id;
}

public HostAndPort getHostPort() {
return hostPort;
}

public String getHost() {
return hostPort.getHost();
}

public int getPort() {
return hostPort.getPort();
}

public String getId() {
return id;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package redis.clients.jedis.util;

import org.mockito.ArgumentMatcher;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;

/**
* Utility class providing Mockito ArgumentMatchers for CommandArguments testing.
*/
public final class CommandArgumentMatchers {

private CommandArgumentMatchers() {
throw new InstantiationError("Must not instantiate this class");
}

/**
* Matcher for CommandArguments with specific ProtocolCommand
*/
public static ArgumentMatcher<CommandArguments> commandIs(ProtocolCommand command) {
return args -> {
if (args == null || !(args instanceof CommandArguments)) {
return false;
}
return command.equals(args.getCommand());
};
}

/**
* Matcher for CommandArguments containing specific arguments
*/
public static ArgumentMatcher<CommandArguments> hasArgument(String expectedArg) {
return args -> {
for (Rawable arg : args) {

if (expectedArg.equals(SafeEncoder.encode(arg.getRaw()))) {
return true;
}
}
return false;
};
}

public static ArgumentMatcher<CommandArguments> commandWithArgs(ProtocolCommand command,
String expectedArg) {
return cmd -> commandIs(command).matches(cmd) && hasArgument(expectedArg).matches(cmd);
}

}
Loading