Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ pid = /tmp/stunnel.pid
[redis]
accept = 127.0.0.1:6390
connect = 127.0.0.1:6379
[redis_3]
accept = 127.0.0.1:16381
connect = 127.0.0.1:6381
[redis_4]
accept = 127.0.0.1:16382
connect = 127.0.0.1:6382
[redis_cluster_1]
accept = 127.0.0.1:8379
connect = 127.0.0.1:7379
Expand All @@ -269,6 +275,18 @@ connect = 127.0.0.1:7382
[redis_cluster_5]
accept = 127.0.0.1:8383
connect = 127.0.0.1:7383
[redis_sentinel_1]
accept = 127.0.0.1:36379
connect = 127.0.0.1:26379
[redis_sentinel_2]
accept = 127.0.0.1:36380
connect = 127.0.0.1:26380
[redis_sentinel_3]
accept = 127.0.0.1:36381
connect = 127.0.0.1:26381
[redis_sentinel_4]
accept = 127.0.0.1:36382
connect = 127.0.0.1:26382
endef

export REDIS1_CONF
Expand Down
27 changes: 17 additions & 10 deletions src/main/java/redis/clients/jedis/JedisFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class JedisFactory implements PooledObjectFactory<Jedis> {

private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<>();

private final JedisClientConfig config;
private final JedisClientConfig clientConfig;

protected JedisFactory(final String host, final int port, final int connectionTimeout,
final int soTimeout, final String password, final int database, final String clientName) {
Expand Down Expand Up @@ -67,15 +67,15 @@ protected JedisFactory(final String host, final int port, final int connectionTi

protected JedisFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this.hostAndPort.set(hostAndPort);
this.config = DefaultJedisClientConfig.copyConfig(clientConfig);
this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
}

protected JedisFactory(final String host, final int port, final int connectionTimeout, final int soTimeout,
final int infiniteSoTimeout, final String user, final String password, final int database,
final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory,
final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) {
this.hostAndPort.set(new HostAndPort(host, port));
this.config = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
this.clientConfig = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
.socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout).user(user)
.password(password).databse(database).clientName(clientName)
.ssl(ssl).sslSocketFactory(sslSocketFactory)
Expand All @@ -88,11 +88,18 @@ protected JedisFactory(final String host, final int port, final int connectionTi
protected JedisFactory(final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout,
final String user, final String password, final int database, final String clientName, final boolean ssl,
final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) {
this.config = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
this(DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
.socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout).user(user)
.password(password).databse(database).clientName(clientName)
.ssl(ssl).sslSocketFactory(sslSocketFactory)
.sslParameters(sslParameters).hostnameVerifier(hostnameVerifier).build();
.sslParameters(sslParameters).hostnameVerifier(hostnameVerifier).build());
}

/**
* {@link #setHostAndPort(redis.clients.jedis.HostAndPort) setHostAndPort} must be called later.
*/
protected JedisFactory(final JedisClientConfig clientConfig) {
this.clientConfig = clientConfig;
}

protected JedisFactory(final URI uri, final int connectionTimeout, final int soTimeout,
Expand All @@ -114,7 +121,7 @@ protected JedisFactory(final URI uri, final int connectionTimeout, final int soT
"Cannot open Redis connection due invalid URI. %s", uri.toString()));
}
this.hostAndPort.set(new HostAndPort(uri.getHost(), uri.getPort()));
this.config = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
this.clientConfig = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
.socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout)
.user(JedisURIHelper.getUser(uri)).password(JedisURIHelper.getPassword(uri))
.databse(JedisURIHelper.getDBIndex(uri)).clientName(clientName)
Expand All @@ -127,14 +134,14 @@ public void setHostAndPort(final HostAndPort hostAndPort) {
}

public void setPassword(final String password) {
this.config.updatePassword(password);
this.clientConfig.updatePassword(password);
}

@Override
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.getDB() != config.getDatabase()) {
jedis.select(config.getDatabase());
if (jedis.getDB() != clientConfig.getDatabase()) {
jedis.select(clientConfig.getDatabase());
}
}

Expand Down Expand Up @@ -163,7 +170,7 @@ public PooledObject<Jedis> makeObject() throws Exception {
final HostAndPort hostPort = this.hostAndPort.get();
Jedis jedis = null;
try {
jedis = new Jedis(hostPort, config);
jedis = new Jedis(hostPort, clientConfig);
jedis.connect();
return new DefaultPooledObject<>(jedis);
} catch (JedisException je) {
Expand Down
82 changes: 45 additions & 37 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
Expand All @@ -21,7 +22,7 @@ public class JedisSentinelPool extends JedisPoolAbstract {
@Deprecated
protected static Logger log = LoggerFactory.getLogger(JedisSentinelPool.class);

protected final GenericObjectPoolConfig<Jedis> poolConfig;
@Deprecated protected final GenericObjectPoolConfig<Jedis> poolConfig;
private final JedisFactory factory;

@Deprecated protected int connectionTimeout;
Expand All @@ -39,6 +40,8 @@ public class JedisSentinelPool extends JedisPoolAbstract {
@Deprecated protected String sentinelPassword;
@Deprecated protected String sentinelClientName;

private final JedisClientConfig sentinelClientConfig;

protected final Set<MasterListener> masterListeners = new HashSet<>();

private volatile HostAndPort currentHostMaster;
Expand Down Expand Up @@ -177,14 +180,33 @@ public JedisSentinelPool(String masterName, Set<String> sentinels,

public JedisSentinelPool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig<Jedis> poolConfig, final JedisFactory factory) {
this(masterName, parseHostAndPorts(sentinels), poolConfig, factory,
DefaultJedisClientConfig.builder().build());
}

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
final GenericObjectPoolConfig<Jedis> poolConfig, final JedisClientConfig masteClientConfig,
final JedisClientConfig sentinelClientConfig) {
this(masterName, sentinels, poolConfig, new JedisFactory(masteClientConfig), sentinelClientConfig);
}

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
final GenericObjectPoolConfig<Jedis> poolConfig, final JedisFactory factory,
final JedisClientConfig sentinelClientConfig) {
super(poolConfig, factory);

this.poolConfig = poolConfig;
this.factory = factory;
this.sentinelClientConfig = sentinelClientConfig;

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
}

private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {
return strings.parallelStream().map(str -> HostAndPort.parseString(str)).collect(Collectors.toSet());
}

@Override
public void destroy() {
for (MasterListener m : masterListeners) {
Expand Down Expand Up @@ -212,51 +234,44 @@ private void initMaster(HostAndPort master) {
}
}

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String masterName) {

HostAndPort master = null;
boolean sentinelAvailable = false;

log.info("Trying to find master from available Sentinels...");

for (String sentinel : sentinels) {
final HostAndPort hap = HostAndPort.parseString(sentinel);
for (HostAndPort sentinel : sentinels) {

log.debug("Connecting to Sentinel {}", hap);
log.debug("Connecting to Sentinel {}", sentinel);

try (Jedis jedis = new Jedis(hap.getHost(), hap.getPort(), sentinelConnectionTimeout, sentinelSoTimeout)) {
if (sentinelUser != null) {
jedis.auth(sentinelUser, sentinelPassword);
} else if (sentinelPassword != null) {
jedis.auth(sentinelPassword);
}
if (sentinelClientName != null) {
jedis.clientSetname(sentinelClientName);
}
try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) {

List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

// connected to sentinel...
sentinelAvailable = true;

if (masterAddr == null || masterAddr.size() != 2) {
log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, hap);
log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel);
continue;
}

master = toHostAndPort(masterAddr);
log.debug("Found Redis master at {}", master);
break;
} catch (JedisException e) {
// resolves #1036, it should handle JedisException there's another chance of raising JedisDataException
log.warn("Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", hap, e);
// resolves #1036, it should handle JedisException there's another chance
// of raising JedisDataException
log.warn(
"Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.",
sentinel, e);
}
}

if (master == null) {
if (sentinelAvailable) {
// can connect to sentinel, but master name seems to not
// monitored
// can connect to sentinel, but master name seems to not monitored
throw new JedisException("Can connect to sentinel, but " + masterName
+ " seems to be not monitored...");
} else {
Expand All @@ -267,9 +282,9 @@ private HostAndPort initSentinels(Set<String> sentinels, final String masterName

log.info("Redis master running at {}, starting Sentinel listeners...", master);

for (String sentinel : sentinels) {
final HostAndPort hap = HostAndPort.parseString(sentinel);
MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
for (HostAndPort sentinel : sentinels) {

MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
Expand Down Expand Up @@ -357,28 +372,22 @@ public void run() {
break;
}

j = new Jedis(host, port, sentinelConnectionTimeout, sentinelSoTimeout);
if (sentinelUser != null) {
j.auth(sentinelUser, sentinelPassword);
} else if (sentinelPassword != null) {
j.auth(sentinelPassword);
}
if (sentinelClientName != null) {
j.clientSetname(sentinelClientName);
}
final HostAndPort hostPort = new HostAndPort(host, port);
j = new Jedis(hostPort, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
log.warn("Can not get master addr, master name: {}. Sentinel: {}:{}.", masterName, host, port);
log.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName,
hostPort);
} else {
initMaster(toHostAndPort(masterAddr));
}

j.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
log.debug("Sentinel {}:{} published: {}.", host, port, message);
log.debug("Sentinel {} published: {}.", hostPort, message);

String[] switchMasterMsg = message.split(" ");

Expand All @@ -393,9 +402,8 @@ public void onMessage(String channel, String message) {
}

} else {
log.error(
"Invalid message received on Sentinel {}:{} on channel +switch-master: {}", host,
port, message);
log.error("Invalid message received on Sentinel {} on channel +switch-master: {}",
hostPort, message);
}
}
}, "+switch-master");
Expand Down Expand Up @@ -427,7 +435,7 @@ public void shutdown() {
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (j != null) {
j.disconnect();
j.close();
}
} catch (Exception e) {
log.error("Caught exception while shutting down: ", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package redis.clients.jedis.tests;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;

import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.HostAndPortMapper;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

public class SSLJedisSentinelPoolTest {

private static final String MASTER_NAME = "mymaster";

private static Set<HostAndPort> sentinels = new HashSet<>();

private static final HostAndPortMapper SSL_PORT_MAPPER = (HostAndPort hap)
-> new HostAndPort(hap.getHost(), hap.getPort() + 10000);

private static final GenericObjectPoolConfig<Jedis> POOL_CONFIG = new GenericObjectPoolConfig<>();

@BeforeClass
public static void prepare() {
SSLJedisTest.setupTrustStore();

sentinels.add(HostAndPortUtil.getSentinelServers().get(1));
sentinels.add(HostAndPortUtil.getSentinelServers().get(3));
}

@Test
public void sentinelWithoutSslConnectsToRedisWithSsl() {
DefaultJedisClientConfig masterConfig = DefaultJedisClientConfig.builder()
.password("foobared").clientName("sentinel-master-client").ssl(true)
.hostAndPortMapper(SSL_PORT_MAPPER).build();
DefaultJedisClientConfig sentinelConfig = DefaultJedisClientConfig.builder()
.clientName("sentinel-client").ssl(false).build();
try (JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, POOL_CONFIG,
masterConfig, sentinelConfig)) {
pool.getResource().close();
}
}

@Test
public void sentinelWithSslConnectsToRedisWithoutSsl() {
DefaultJedisClientConfig masterConfig = DefaultJedisClientConfig.builder()
.password("foobared").clientName("sentinel-master-client").ssl(false).build();
DefaultJedisClientConfig sentinelConfig = DefaultJedisClientConfig.builder()
.clientName("sentinel-client").ssl(true).hostAndPortMapper(SSL_PORT_MAPPER).build();
try (JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, POOL_CONFIG,
masterConfig, sentinelConfig)) {
pool.getResource().close();
}
}

@Test
public void sentinelWithSslConnectsToRedisWithSsl() {
DefaultJedisClientConfig masterConfig = DefaultJedisClientConfig.builder()
.password("foobared").clientName("sentinel-master-client").ssl(true)
.hostAndPortMapper(SSL_PORT_MAPPER).build();
DefaultJedisClientConfig sentinelConfig = DefaultJedisClientConfig.builder()
.clientName("sentinel-client").ssl(true).hostAndPortMapper(SSL_PORT_MAPPER).build();
try (JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, POOL_CONFIG,
masterConfig, sentinelConfig)) {
pool.getResource().close();
}
}

}