diff --git a/StackExchange.Redis.sln b/StackExchange.Redis.sln index 23908aebb..cd4d9b060 100644 --- a/StackExchange.Redis.sln +++ b/StackExchange.Redis.sln @@ -23,6 +23,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RedisConfigs", "RedisConfig tests\RedisConfigs\cli-master.cmd = tests\RedisConfigs\cli-master.cmd tests\RedisConfigs\cli-secure.cmd = tests\RedisConfigs\cli-secure.cmd tests\RedisConfigs\cli-slave.cmd = tests\RedisConfigs\cli-slave.cmd + tests\RedisConfigs\docker-compose.yml = tests\RedisConfigs\docker-compose.yml + tests\RedisConfigs\Dockerfile = tests\RedisConfigs\Dockerfile tests\RedisConfigs\start-all.cmd = tests\RedisConfigs\start-all.cmd tests\RedisConfigs\start-all.sh = tests\RedisConfigs\start-all.sh tests\RedisConfigs\start-basic.cmd = tests\RedisConfigs\start-basic.cmd @@ -126,6 +128,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestConsoleBaseline", "toys EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = ".github", ".github\.github.csproj", "{8FB98E7D-DAE2-4465-BD9A-104000E0A2D4}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{A9F81DA3-DA82-423E-A5DD-B11C37548E06}" + ProjectSection(SolutionItems) = preProject + tests\RedisConfigs\Docker\docker-entrypoint.sh = tests\RedisConfigs\Docker\docker-entrypoint.sh + tests\RedisConfigs\Docker\supervisord.conf = tests\RedisConfigs\Docker\supervisord.conf + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -197,6 +205,7 @@ Global {3DA1EEED-E9FE-43D9-B293-E000CFCCD91A} = {E25031D3-5C64-430D-B86F-697B66816FD8} {153A10E4-E668-41AD-9E0F-6785CE7EED66} = {3AD17044-6BFF-4750-9AC2-2CA466375F2A} {D58114AE-4998-4647-AFCA-9353D20495AE} = {E25031D3-5C64-430D-B86F-697B66816FD8} + {A9F81DA3-DA82-423E-A5DD-B11C37548E06} = {96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {193AA352-6748-47C1-A5FC-C9AA6B5F000B} diff --git a/docs/Configuration.md b/docs/Configuration.md index 6440ffcaf..183253f95 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -29,6 +29,14 @@ This will connect to a single server on the local machine using the default redi var conn = ConnectionMultiplexer.Connect("redis0:6380,redis1:6380,allowAdmin=true"); ``` +If you specify a serviceName in the connection string, it will trigger sentinel mode. This example will connect to a sentinel server on the local machine +using the default sentinel port (26379), discover the current master server for the `mymaster` service and return a managed connection +pointing to that master server that will automatically be updated if the master changes: + +```csharp +var conn = ConnectionMultiplexer.Connect("localhost,serviceName=mymaster"); +``` + An overview of mapping between the `string` and `ConfigurationOptions` representation is shown below, but you can switch between them trivially: ```csharp @@ -79,7 +87,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a | proxy={proxy type} | `Proxy` | `Proxy.None` | Type of proxy in use (if any); for example "twemproxy" | | resolveDns={bool} | `ResolveDns` | `false` | Specifies that DNS resolution should be explicit and eager, rather than implicit | | responseTimeout={int} | `ResponseTimeout` | `SyncTimeout` | Time (ms) to decide whether the socket is unhealthy | -| serviceName={string} | `ServiceName` | `null` | Not currently implemented (intended for use with sentinel) | +| serviceName={string} | `ServiceName` | `null` | Used for connecting to a sentinel master service | | ssl={bool} | `Ssl` | `false` | Specifies that SSL encryption should be used | | sslHost={string} | `SslHost` | `null` | Enforces a particular SSL host identity on the server's certificate | | sslProtocols={enum} | `SslProtocols` | `null` | Ssl/Tls versions supported when using an encrypted connection. Use '\|' to provide multiple values. | diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index 1d7d32645..626d91306 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -840,10 +840,10 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re public static Task ConnectAsync(string configuration, TextWriter log = null) { SocketConnection.AssertDependencies(); - return ConnectImplAsync(configuration, log); + return ConnectAsync(PrepareConfig(configuration), log); } - private static async Task ConnectImplAsync(object configuration, TextWriter log = null) + private static async Task ConnectImplAsync(ConfigurationOptions configuration, TextWriter log = null) { IDisposable killMe = null; EventHandler connectHandler = null; @@ -862,6 +862,12 @@ private static async Task ConnectImplAsync(object configu } killMe = null; Interlocked.Increment(ref muxer._connectCompletedCount); + + if (muxer.ServerSelectionStrategy.ServerType == ServerType.Sentinel) + { + // Initialize the Sentinel handlers + muxer.InitializeSentinel(logProxy); + } return muxer; } finally @@ -880,10 +886,19 @@ private static async Task ConnectImplAsync(object configu public static Task ConnectAsync(ConfigurationOptions configuration, TextWriter log = null) { SocketConnection.AssertDependencies(); - return ConnectImplAsync(configuration, log); + + if (IsSentinel(configuration)) + return SentinelMasterConnectAsync(configuration, log); + + return ConnectImplAsync(PrepareConfig(configuration), log); } - internal static ConfigurationOptions PrepareConfig(object configuration) + private static bool IsSentinel(ConfigurationOptions configuration) + { + return !string.IsNullOrEmpty(configuration?.ServiceName); + } + + internal static ConfigurationOptions PrepareConfig(object configuration, bool sentinel = false) { if (configuration == null) throw new ArgumentNullException(nameof(configuration)); ConfigurationOptions config; @@ -900,7 +915,21 @@ internal static ConfigurationOptions PrepareConfig(object configuration) throw new ArgumentException("Invalid configuration object", nameof(configuration)); } if (config.EndPoints.Count == 0) throw new ArgumentException("No endpoints specified", nameof(configuration)); + + if (sentinel) + { + // this is required when connecting to sentinel servers + config.TieBreaker = ""; + config.CommandMap = CommandMap.Sentinel; + + // use default sentinel port + config.EndPoints.SetDefaultPorts(26379); + + return config; + } + config.SetDefaultPorts(); + return config; } @@ -953,9 +982,9 @@ public void Dispose() } } } - private static ConnectionMultiplexer CreateMultiplexer(object configuration, LogProxy log, out EventHandler connectHandler) + private static ConnectionMultiplexer CreateMultiplexer(ConfigurationOptions configuration, LogProxy log, out EventHandler connectHandler) { - var muxer = new ConnectionMultiplexer(PrepareConfig(configuration)); + var muxer = new ConnectionMultiplexer(configuration); connectHandler = null; if (log != null) { @@ -988,22 +1017,127 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration, Log /// The to log to. public static ConnectionMultiplexer Connect(string configuration, TextWriter log = null) { - SocketConnection.AssertDependencies(); - return ConnectImpl(configuration, log); + return Connect(PrepareConfig(configuration), log); } /// /// Create a new ConnectionMultiplexer instance /// - /// The configurtion options to use for this multiplexer. + /// The configuration options to use for this multiplexer. /// The to log to. public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, TextWriter log = null) { SocketConnection.AssertDependencies(); - return ConnectImpl(configuration, log); + + if (IsSentinel(configuration)) + { + return SentinelMasterConnect(configuration, log); + } + + return ConnectImpl(PrepareConfig(configuration), log); } - private static ConnectionMultiplexer ConnectImpl(object configuration, TextWriter log) + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server + /// + /// The string configuration to use for this multiplexer. + /// The to log to. + public static ConnectionMultiplexer SentinelConnect(string configuration, TextWriter log = null) + { + SocketConnection.AssertDependencies(); + return ConnectImpl(PrepareConfig(configuration, sentinel: true), log); + } + + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server + /// + /// The string configuration to use for this multiplexer. + /// The to log to. + public static Task SentinelConnectAsync(string configuration, TextWriter log = null) + { + SocketConnection.AssertDependencies(); + return ConnectImplAsync(PrepareConfig(configuration, sentinel: true), log); + } + + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server + /// + /// The configuration options to use for this multiplexer. + /// The to log to. + public static ConnectionMultiplexer SentinelConnect(ConfigurationOptions configuration, TextWriter log = null) + { + SocketConnection.AssertDependencies(); + return ConnectImpl(PrepareConfig(configuration, sentinel: true), log); + } + + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server + /// + /// The configuration options to use for this multiplexer. + /// The to log to. + public static Task SentinelConnectAsync(ConfigurationOptions configuration, TextWriter log = null) + { + SocketConnection.AssertDependencies(); + return ConnectImplAsync(PrepareConfig(configuration, sentinel: true), log); + } + + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server, discovers the current master server + /// for the specified ServiceName in the config and returns a managed connection to the current master server + /// + /// The string configuration to use for this multiplexer. + /// The to log to. + private static ConnectionMultiplexer SentinelMasterConnect(string configuration, TextWriter log = null) + { + return SentinelMasterConnect(PrepareConfig(configuration, sentinel: true), log); + } + + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server, discovers the current master server + /// for the specified ServiceName in the config and returns a managed connection to the current master server + /// + /// The configuration options to use for this multiplexer. + /// The to log to. + private static ConnectionMultiplexer SentinelMasterConnect(ConfigurationOptions configuration, TextWriter log = null) + { + var sentinelConnection = SentinelConnect(configuration, log); + + var muxer = sentinelConnection.GetSentinelMasterConnection(configuration, log); + // set reference to sentinel connection so that we can dispose it + muxer.sentinelConnection = sentinelConnection; + + return muxer; + } + + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server, discovers the current master server + /// for the specified ServiceName in the config and returns a managed connection to the current master server + /// + /// The string configuration to use for this multiplexer. + /// The to log to. + private static Task SentinelMasterConnectAsync(string configuration, TextWriter log = null) + { + return SentinelMasterConnectAsync(PrepareConfig(configuration, sentinel: true), log); + } + + /// + /// Create a new ConnectionMultiplexer instance that connects to a sentinel server, discovers the current master server + /// for the specified ServiceName in the config and returns a managed connection to the current master server + /// + /// The configuration options to use for this multiplexer. + /// The to log to. + private static async Task SentinelMasterConnectAsync(ConfigurationOptions configuration, TextWriter log = null) + { + var sentinelConnection = await SentinelConnectAsync(configuration, log).ForAwait(); + + var muxer = sentinelConnection.GetSentinelMasterConnection(configuration, log); + // set reference to sentinel connection so that we can dispose it + muxer.sentinelConnection = sentinelConnection; + + return muxer; + } + + private static ConnectionMultiplexer ConnectImpl(ConfigurationOptions configuration, TextWriter log) { IDisposable killMe = null; EventHandler connectHandler = null; @@ -1044,7 +1178,7 @@ private static ConnectionMultiplexer ConnectImpl(object configuration, TextWrite } finally { - if (connectHandler != null) muxer.ConnectionFailed -= connectHandler; + if (connectHandler != null && muxer != null) muxer.ConnectionFailed -= connectHandler; if (killMe != null) try { killMe.Dispose(); } catch { } } } @@ -1573,7 +1707,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP int iterCount = first ? 2 : 1; // this is fix for https://github.com/StackExchange/StackExchange.Redis/issues/300 - // auto discoverability of cluster nodes is made synchronous. + // auto discoverability of cluster nodes is made synchronous. // we try to connect to endpoints specified inside the user provided configuration // and when we encounter one such endpoint to which we are able to successfully connect, // we get the list of cluster nodes from this endpoint and try to proactively connect @@ -1663,7 +1797,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP if (clusterCount > 0 && !encounteredConnectedClusterServer) { - // we have encountered a connected server with clustertype for the first time. + // we have encountered a connected server with clustertype for the first time. // so we will get list of other nodes from this server using "CLUSTER NODES" command // and try to connect to these other nodes in the next iteration encounteredConnectedClusterServer = true; @@ -2143,6 +2277,7 @@ public bool IsConnecting internal Timer sentinelMasterReconnectTimer; internal Dictionary sentinelConnectionChildren; + internal ConnectionMultiplexer sentinelConnection = null; /// /// Initializes the connection as a Sentinel connection and adds @@ -2161,6 +2296,7 @@ internal void InitializeSentinel(LogProxy logProxy) // Subscribe to sentinel change events ISubscriber sub = GetSubscriber(); + if (sub.SubscribedEndpoint("+switch-master") == null) { sub.Subscribe("+switch-master", (channel, message) => @@ -2231,30 +2367,61 @@ public ConnectionMultiplexer GetSentinelMasterConnection(ConfigurationOptions co return sentinelConnectionChildren[config.ServiceName]; } - // Get an initial endpoint - try twice - EndPoint newMasterEndPoint = GetConfiguredMasterForService(config.ServiceName) - ?? GetConfiguredMasterForService(config.ServiceName); + bool success = false; + ConnectionMultiplexer connection = null; - if (newMasterEndPoint == null) + var sw = Stopwatch.StartNew(); + do { - throw new RedisConnectionException(ConnectionFailureType.UnableToConnect, - $"Sentinel: Failed connecting to configured master for service: {config.ServiceName}"); - } + // Get an initial endpoint - try twice + EndPoint newMasterEndPoint = GetConfiguredMasterForService(config.ServiceName) + ?? GetConfiguredMasterForService(config.ServiceName); - // Replace the master endpoint, if we found another one - // If not, assume the last state is the best we have and minimize the race - if (config.EndPoints.Count == 1) - { - config.EndPoints[0] = newMasterEndPoint; - } - else + if (newMasterEndPoint == null) + { + throw new RedisConnectionException(ConnectionFailureType.UnableToConnect, + $"Sentinel: Failed connecting to configured master for service: {config.ServiceName}"); + } + + EndPoint[] replicaEndPoints = GetReplicasForService(config.ServiceName) + ?? GetReplicasForService(config.ServiceName); + + // Replace the master endpoint, if we found another one + // If not, assume the last state is the best we have and minimize the race + if (config.EndPoints.Count == 1) + { + config.EndPoints[0] = newMasterEndPoint; + } + else + { + config.EndPoints.Clear(); + config.EndPoints.TryAdd(newMasterEndPoint); + } + + foreach (var replicaEndPoint in replicaEndPoints) + { + config.EndPoints.TryAdd(replicaEndPoint); + } + + connection = ConnectImpl(config, log); + + // verify role is master according to: + // https://redis.io/topics/sentinel-clients + if (connection.GetServer(newMasterEndPoint)?.Role() == RedisLiterals.master) + { + success = true; + break; + } + + Thread.Sleep(100); + } while (sw.ElapsedMilliseconds < config.ConnectTimeout); + + if (!success) { - config.EndPoints.Clear(); - config.EndPoints.Add(newMasterEndPoint); + throw new RedisConnectionException(ConnectionFailureType.UnableToConnect, + $"Sentinel: Failed connecting to configured master for service: {config.ServiceName}"); } - ConnectionMultiplexer connection = Connect(config, log); - // Attach to reconnect event to ensure proper connection to the new master connection.ConnectionRestored += OnManagedConnectionRestored; @@ -2284,7 +2451,7 @@ internal void OnManagedConnectionRestored(object sender, ConnectionFailedEventAr try { - // Run a switch to make sure we have update-to-date + // Run a switch to make sure we have update-to-date // information about which master we should connect to SwitchMaster(e.EndPoint, connection); @@ -2352,10 +2519,22 @@ internal EndPoint GetConfiguredMasterForService(string serviceName) => internal EndPoint currentSentinelMasterEndPoint; + internal EndPoint[] GetReplicasForService(string serviceName) => + GetServerSnapshot() + .ToArray() + .Where(s => s.ServerType == ServerType.Sentinel) + .AsParallel() + .Select(s => + { + try { return GetServer(s.EndPoint).SentinelGetReplicaAddresses(serviceName); } + catch { return null; } + }) + .FirstOrDefault(r => r != null); + /// /// Switches the SentinelMasterConnection over to a new master. /// - /// The endpoing responsible for the switch + /// The endpoint responsible for the switch /// The connection that should be switched over to a new master endpoint /// Log to write to, if any internal void SwitchMaster(EndPoint switchBlame, ConnectionMultiplexer connection, TextWriter log = null) @@ -2376,19 +2555,24 @@ internal void SwitchMaster(EndPoint switchBlame, ConnectionMultiplexer connectio $"Sentinel: Failed connecting to switch master for service: {serviceName}"); } - if (newMasterEndPoint != null) + connection.currentSentinelMasterEndPoint = newMasterEndPoint; + + if (!connection.servers.Contains(newMasterEndPoint)) { - connection.currentSentinelMasterEndPoint = newMasterEndPoint; + EndPoint[] replicaEndPoints = GetReplicasForService(serviceName) + ?? GetReplicasForService(serviceName); - if (!connection.servers.Contains(newMasterEndPoint)) + connection.servers.Clear(); + connection.RawConfig.EndPoints.Clear(); + connection.RawConfig.EndPoints.TryAdd(newMasterEndPoint); + foreach (var replicaEndPoint in replicaEndPoints) { - connection.RawConfig.EndPoints.Clear(); - connection.servers.Clear(); - connection.RawConfig.EndPoints.Add(newMasterEndPoint); - Trace(string.Format("Switching master to {0}", newMasterEndPoint)); - // Trigger a reconfigure - connection.ReconfigureAsync(false, false, logProxy, switchBlame, string.Format("master switch {0}", serviceName), false, CommandFlags.PreferMaster).Wait(); + connection.RawConfig.EndPoints.TryAdd(replicaEndPoint); } + Trace(string.Format("Switching master to {0}", newMasterEndPoint)); + // Trigger a reconfigure + connection.ReconfigureAsync(false, false, logProxy, switchBlame, + string.Format("master switch {0}", serviceName), false, CommandFlags.PreferMaster).Wait(); UpdateSentinelAddressList(serviceName); } @@ -2417,7 +2601,7 @@ internal void UpdateSentinelAddressList(string serviceName) foreach (EndPoint newSentinel in firstCompleteRequest.Where(x => !RawConfig.EndPoints.Contains(x))) { hasNew = true; - RawConfig.EndPoints.Add(newSentinel); + RawConfig.EndPoints.TryAdd(newSentinel); } if (hasNew) @@ -2515,6 +2699,7 @@ public void Dispose() { GC.SuppressFinalize(this); Close(!_isDisposed); + sentinelConnection?.Dispose(); } internal Task ExecuteAsyncImpl(Message message, ResultProcessor processor, object state, ServerEndPoint server) diff --git a/src/StackExchange.Redis/EndPointCollection.cs b/src/StackExchange.Redis/EndPointCollection.cs index 8a9ae28d9..2ec0b5fe0 100644 --- a/src/StackExchange.Redis/EndPointCollection.cs +++ b/src/StackExchange.Redis/EndPointCollection.cs @@ -9,7 +9,7 @@ namespace StackExchange.Redis /// /// A list of endpoints /// - public sealed class EndPointCollection : Collection, IEnumerable, IEnumerable + public sealed class EndPointCollection : Collection, IEnumerable { /// /// Create a new EndPointCollection @@ -59,6 +59,26 @@ public void Add(string hostAndPort) /// The port for to add. public void Add(IPAddress host, int port) => Add(new IPEndPoint(host, port)); + /// + /// Try adding a new endpoint to the list. + /// + /// The endpoint to add. + /// True if the endpoint was added or false if not. + public bool TryAdd(EndPoint endpoint) + { + if (endpoint == null) throw new ArgumentNullException(nameof(endpoint)); + + if (!Contains(endpoint)) + { + base.InsertItem(Count, endpoint); + return true; + } + else + { + return false; + } + } + /// /// See Collection<T>.InsertItem() /// diff --git a/src/StackExchange.Redis/Interfaces/IServer.cs b/src/StackExchange.Redis/Interfaces/IServer.cs index 3d9945177..f2da646ca 100644 --- a/src/StackExchange.Redis/Interfaces/IServer.cs +++ b/src/StackExchange.Redis/Interfaces/IServer.cs @@ -792,7 +792,7 @@ public partial interface IServer : IRedis /// for the given service name. /// /// the sentinel service name - /// + /// The command flags to use. /// a list of the sentinel ips and ports EndPoint[] SentinelGetSentinelAddresses(string serviceName, CommandFlags flags = CommandFlags.None); @@ -801,10 +801,28 @@ public partial interface IServer : IRedis /// for the given service name. /// /// the sentinel service name - /// + /// The command flags to use. /// a list of the sentinel ips and ports Task SentinelGetSentinelAddressesAsync(string serviceName, CommandFlags flags = CommandFlags.None); + /// + /// Returns the ip and port numbers of all known Sentinel replicas + /// for the given service name. + /// + /// the sentinel service name + /// The command flags to use. + /// a list of the replica ips and ports + EndPoint[] SentinelGetReplicaAddresses(string serviceName, CommandFlags flags = CommandFlags.None); + + /// + /// Returns the ip and port numbers of all known Sentinel replicas + /// for the given service name. + /// + /// the sentinel service name + /// The command flags to use. + /// a list of the replica ips and ports + Task SentinelGetReplicaAddressesAsync(string serviceName, CommandFlags flags = CommandFlags.None); + /// /// Show the state and info of the specified master. /// @@ -1024,5 +1042,14 @@ internal static class IServerExtensions /// /// The server to simulate failure on. public static void SimulateConnectionFailure(this IServer server) => (server as RedisServer)?.SimulateConnectionFailure(); + + public static string Role(this IServer server) + { + var result = (RedisResult[])server.Execute("ROLE"); + if (result != null && result.Length > 0) + return result[0].ToString(); + + return null; + } } } diff --git a/src/StackExchange.Redis/RedisLiterals.cs b/src/StackExchange.Redis/RedisLiterals.cs index 38554cf36..09feab9d9 100644 --- a/src/StackExchange.Redis/RedisLiterals.cs +++ b/src/StackExchange.Redis/RedisLiterals.cs @@ -119,12 +119,14 @@ public static readonly RedisValue // misc (config, etc) databases = "databases", + master = "master", no = "no", normal = "normal", pubsub = "pubsub", replica = "replica", replica_read_only = "replica-read-only", replication = "replication", + sentinel = "sentinel", server = "server", slave = "slave", slave_read_only = "slave-read-only", diff --git a/src/StackExchange.Redis/RedisServer.cs b/src/StackExchange.Redis/RedisServer.cs index dce6627f2..2f787c5da 100644 --- a/src/StackExchange.Redis/RedisServer.cs +++ b/src/StackExchange.Redis/RedisServer.cs @@ -830,6 +830,18 @@ public Task SentinelGetSentinelAddressesAsync(string serviceName, Co return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints); } + public EndPoint[] SentinelGetReplicaAddresses(string serviceName, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SLAVES, (RedisValue)serviceName); + return ExecuteSync(msg, ResultProcessor.SentinelAddressesEndPoints); + } + + public Task SentinelGetReplicaAddressesAsync(string serviceName, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.SLAVES, (RedisValue)serviceName); + return ExecuteAsync(msg, ResultProcessor.SentinelAddressesEndPoints); + } + public KeyValuePair[] SentinelMaster(string serviceName, CommandFlags flags = CommandFlags.None) { var msg = Message.Create(-1, flags, RedisCommand.SENTINEL, RedisLiterals.MASTER, (RedisValue)serviceName); @@ -866,6 +878,7 @@ public Task[][]> SentinelMastersAsync(CommandFlags return ExecuteAsync(msg, ResultProcessor.SentinelArrayOfArrays); } + // For previous compat only KeyValuePair[][] IServer.SentinelSlaves(string serviceName, CommandFlags flags) => SentinelReplicas(serviceName, flags); @@ -876,6 +889,7 @@ public KeyValuePair[][] SentinelReplicas(string serviceName, Com return ExecuteSync(msg, ResultProcessor.SentinelArrayOfArrays); } + // For previous compat only Task[][]> IServer.SentinelSlavesAsync(string serviceName, CommandFlags flags) => SentinelReplicasAsync(serviceName, flags); diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 925b5066f..be80a0ed1 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -126,7 +126,10 @@ public static readonly ResultProcessor SentinelMasterEndpoint = new SentinelGetMasterAddressByNameProcessor(); public static readonly ResultProcessor - SentinelAddressesEndPoints = new SentinelGetSentinelAddresses(); + SentinelAddressesEndPoints = new SentinelGetSentinelAddressesProcessor(); + + public static readonly ResultProcessor + SentinelReplicaEndPoints = new SentinelGetReplicaAddressesProcessor(); public static readonly ResultProcessor[][]> SentinelArrayOfArrays = new SentinelArrayOfArraysProcessor(); @@ -2036,7 +2039,62 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } - private sealed class SentinelGetSentinelAddresses : ResultProcessor + private sealed class SentinelGetSentinelAddressesProcessor : ResultProcessor + { + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + List endPoints = new List(); + + switch (result.Type) + { + case ResultType.MultiBulk: + foreach (RawResult item in result.GetItems()) + { + var arr = item.GetItemsAsValues(); + string ip = null; + string portStr = null; + + for (int i = 0; i < arr.Length && (ip == null || portStr == null); i += 2) + { + string name = arr[i]; + string value = arr[i + 1]; + + switch (name) + { + case "ip": + ip = value; + break; + case "port": + portStr = value; + break; + } + } + + if (ip != null && portStr != null && int.TryParse(portStr, out int port)) + { + endPoints.Add(Format.ParseEndPoint(ip, port)); + } + } + break; + + case ResultType.SimpleString: + //We don't want to blow up if the master is not found + if (result.IsNull) + return true; + break; + } + + if (endPoints.Count > 0) + { + SetResult(message, endPoints.ToArray()); + return true; + } + + return false; + } + } + + private sealed class SentinelGetReplicaAddressesProcessor : ResultProcessor { protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) { diff --git a/tests/RedisConfigs/Docker/docker-entrypoint.sh b/tests/RedisConfigs/Docker/docker-entrypoint.sh index 5ab35b2c8..6e925d375 100644 --- a/tests/RedisConfigs/Docker/docker-entrypoint.sh +++ b/tests/RedisConfigs/Docker/docker-entrypoint.sh @@ -4,7 +4,7 @@ if [ "$#" -ne 0 ]; then exec "$@" else mkdir -p /var/log/supervisor - mkdir Temp/ + mkdir -p Temp/ supervisord -c /etc/supervisord.conf sleep 3 diff --git a/tests/StackExchange.Redis.Tests/Sentinel.cs b/tests/StackExchange.Redis.Tests/Sentinel.cs index d4ea6cc01..ec32d02f2 100644 --- a/tests/StackExchange.Redis.Tests/Sentinel.cs +++ b/tests/StackExchange.Redis.Tests/Sentinel.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -28,24 +30,16 @@ public Sentinel(ITestOutputHelper output) : base(output) Skip.IfNoConfig(nameof(TestConfig.Config.SentinelServer), TestConfig.Current.SentinelServer); Skip.IfNoConfig(nameof(TestConfig.Config.SentinelSeviceName), TestConfig.Current.SentinelSeviceName); - var options = new ConfigurationOptions() - { - CommandMap = CommandMap.Sentinel, - EndPoints = { - { TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortA }, - { TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortB }, - { TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortC } - }, - AllowAdmin = true, - TieBreaker = "", - ServiceName = TestConfig.Current.SentinelSeviceName, - SyncTimeout = 5000 - }; - Conn = ConnectionMultiplexer.Connect(options, ConnectionLog); + var options = ServiceOptions.Clone(); + options.EndPoints.Add(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortA); + options.EndPoints.Add(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortB); + options.EndPoints.Add(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortC); + + Conn = ConnectionMultiplexer.SentinelConnect(options, ConnectionLog); for (var i = 0; i < 150; i++) { Thread.Sleep(20); - if (Conn.IsConnected && Conn.GetSentinelMasterConnection(ServiceOptions).IsConnected) + if (Conn.IsConnected && Conn.GetSentinelMasterConnection(options).IsConnected) { break; } @@ -54,7 +48,175 @@ public Sentinel(ITestOutputHelper output) : base(output) SentinelServerA = Conn.GetServer(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortA); SentinelServerB = Conn.GetServer(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortB); SentinelServerC = Conn.GetServer(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortC); - SentinelsServers = new IServer[] { SentinelServerA, SentinelServerB, SentinelServerC }; + SentinelsServers = new[] { SentinelServerA, SentinelServerB, SentinelServerC }; + + // wait until we are in a state of a single master and replica + WaitForReady(); + } + + [Fact] + public void MasterConnectTest() + { + var connectionString = $"{TestConfig.Current.SentinelServer}:{TestConfig.Current.SentinelPortA},serviceName={ServiceOptions.ServiceName},allowAdmin=true"; + var conn = ConnectionMultiplexer.Connect(connectionString); + + var db = conn.GetDatabase(); + db.Ping(); + + var endpoints = conn.GetEndPoints(); + Assert.Equal(2, endpoints.Length); + + var servers = endpoints.Select(e => conn.GetServer(e)).ToArray(); + Assert.Equal(2, servers.Length); + + var master = servers.FirstOrDefault(s => !s.IsReplica); + Assert.NotNull(master); + var replica = servers.FirstOrDefault(s => s.IsReplica); + Assert.NotNull(replica); + Assert.NotEqual(master.EndPoint.ToString(), replica.EndPoint.ToString()); + + var expected = DateTime.Now.Ticks.ToString(); + Log("Tick Key: " + expected); + var key = Me(); + db.KeyDelete(key, CommandFlags.FireAndForget); + db.StringSet(key, expected); + + var value = db.StringGet(key); + Assert.Equal(expected, value); + + // force read from replica, replication has some lag + WaitForReplication(servers.First()); + value = db.StringGet(key, CommandFlags.DemandReplica); + Assert.Equal(expected, value); + } + + [Fact] + public async Task MasterConnectAsyncTest() + { + var connectionString = $"{TestConfig.Current.SentinelServer}:{TestConfig.Current.SentinelPortA},serviceName={ServiceOptions.ServiceName},allowAdmin=true"; + var conn = await ConnectionMultiplexer.ConnectAsync(connectionString); + + var db = conn.GetDatabase(); + await db.PingAsync(); + + var endpoints = conn.GetEndPoints(); + Assert.Equal(2, endpoints.Length); + + var servers = endpoints.Select(e => conn.GetServer(e)).ToArray(); + Assert.Equal(2, servers.Length); + + var master = servers.FirstOrDefault(s => !s.IsReplica); + Assert.NotNull(master); + var replica = servers.FirstOrDefault(s => s.IsReplica); + Assert.NotNull(replica); + Assert.NotEqual(master.EndPoint.ToString(), replica.EndPoint.ToString()); + + var expected = DateTime.Now.Ticks.ToString(); + Log("Tick Key: " + expected); + var key = Me(); + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + await db.StringSetAsync(key, expected); + + var value = await db.StringGetAsync(key); + Assert.Equal(expected, value); + + // force read from replica, replication has some lag + WaitForReplication(servers.First()); + value = await db.StringGetAsync(key, CommandFlags.DemandReplica); + Assert.Equal(expected, value); + } + + [Fact] + public async Task ManagedMasterConnectionEndToEndWithFailoverTest() + { + var connectionString = $"{TestConfig.Current.SentinelServer}:{TestConfig.Current.SentinelPortA},serviceName={ServiceOptions.ServiceName},allowAdmin=true"; + var conn = await ConnectionMultiplexer.ConnectAsync(connectionString); + conn.ConfigurationChanged += (s, e) => { + Log($"Configuration changed: {e.EndPoint}"); + }; + + var db = conn.GetDatabase(); + await db.PingAsync(); + + var endpoints = conn.GetEndPoints(); + Assert.Equal(2, endpoints.Length); + + var servers = endpoints.Select(e => conn.GetServer(e)).ToArray(); + Assert.Equal(2, servers.Length); + + var master = servers.FirstOrDefault(s => !s.IsReplica); + Assert.NotNull(master); + var replica = servers.FirstOrDefault(s => s.IsReplica); + Assert.NotNull(replica); + Assert.NotEqual(master.EndPoint.ToString(), replica.EndPoint.ToString()); + + // set string value on current master + var expected = DateTime.Now.Ticks.ToString(); + Log("Tick Key: " + expected); + var key = Me(); + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + await db.StringSetAsync(key, expected); + + var value = await db.StringGetAsync(key); + Assert.Equal(expected, value); + + // force read from replica, replication has some lag + WaitForReplication(servers.First()); + value = await db.StringGetAsync(key, CommandFlags.DemandReplica); + Assert.Equal(expected, value); + + // forces and verifies failover + DoFailover(); + + endpoints = conn.GetEndPoints(); + Assert.Equal(2, endpoints.Length); + + servers = endpoints.Select(e => conn.GetServer(e)).ToArray(); + Assert.Equal(2, servers.Length); + + var newMaster = servers.FirstOrDefault(s => !s.IsReplica); + Assert.NotNull(newMaster); + Assert.Equal(replica.EndPoint.ToString(), newMaster.EndPoint.ToString()); + var newReplica = servers.FirstOrDefault(s => s.IsReplica); + Assert.NotNull(newReplica); + Assert.Equal(master.EndPoint.ToString(), newReplica.EndPoint.ToString()); + Assert.NotEqual(master.EndPoint.ToString(), replica.EndPoint.ToString()); + + value = await db.StringGetAsync(key); + Assert.Equal(expected, value); + + // force read from replica, replication has some lag + WaitForReplication(newMaster); + value = await db.StringGetAsync(key, CommandFlags.DemandReplica); + Assert.Equal(expected, value); + } + + [Fact] + public void SentinelConnectTest() + { + var options = ServiceOptions.Clone(); + options.EndPoints.Add(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortA); + + var conn = ConnectionMultiplexer.SentinelConnect(options); + var db = conn.GetDatabase(); + + var test = db.Ping(); + Log("ping to sentinel {0}:{1} took {2} ms", TestConfig.Current.SentinelServer, + TestConfig.Current.SentinelPortA, test.TotalMilliseconds); + } + + [Fact] + public async Task SentinelConnectAsyncTest() + { + var options = ServiceOptions.Clone(); + options.EndPoints.Add(TestConfig.Current.SentinelServer, TestConfig.Current.SentinelPortA); + + var conn = await ConnectionMultiplexer.SentinelConnectAsync(options); + var db = conn.GetDatabase(); + + var test = await db.PingAsync(); + Log("ping to sentinel {0}:{1} took {2} ms", TestConfig.Current.SentinelServer, + TestConfig.Current.SentinelPortA, test.TotalMilliseconds); } [Fact] @@ -102,6 +264,7 @@ public async Task SentinelGetMasterAddressByNameAsyncTest() Log("{0}:{1}", ipEndPoint.Address, ipEndPoint.Port); } } + [Fact] public void SentinelGetMasterAddressByNameNegativeTest() { @@ -165,7 +328,6 @@ private class IpComparer : IEqualityComparer public void SentinelSentinelsTest() { var sentinels = SentinelServerA.SentinelSentinels(ServiceName); - var Server26380Info = SentinelServerB.Info(); var expected = new List { SentinelServerB.EndPoint.ToString(), @@ -334,275 +496,153 @@ public async Task SentinelReplicasAsyncTest() } [Fact] - public async Task SentinelFailoverTest() - { - var i = 0; - foreach (var server in SentinelsServers) - { - Log("Failover: " + i++); - var master = server.SentinelGetMasterAddressByName(ServiceName); - var slaves = server.SentinelReplicas(ServiceName); - - await Task.Delay(1000).ForAwait(); - try - { - Log("Failover attempted initiated"); - server.SentinelFailover(ServiceName); - Log(" Success!"); - } - catch (RedisServerException ex) when (ex.Message.Contains("NOGOODSLAVE")) - { - // Retry once - Log(" Retry initiated"); - await Task.Delay(1000).ForAwait(); - server.SentinelFailover(ServiceName); - Log(" Retry complete"); - } - await Task.Delay(2000).ForAwait(); - - var newMaster = server.SentinelGetMasterAddressByName(ServiceName); - var newSlave = server.SentinelReplicas(ServiceName); - - Assert.Equal(slaves[0].ToDictionary()["name"], newMaster.ToString()); - Assert.Equal(master.ToString(), newSlave[0].ToDictionary()["name"]); - } - } - - [Fact] - public async Task SentinelFailoverAsyncTest() + public async Task SentinelGetSentinelAddressesTest() { - var i = 0; - foreach (var server in SentinelsServers) - { - Log("Failover: " + i++); - var master = server.SentinelGetMasterAddressByName(ServiceName); - var slaves = server.SentinelReplicas(ServiceName); - - await Task.Delay(1000).ForAwait(); - try - { - Log("Failover attempted initiated"); - await server.SentinelFailoverAsync(ServiceName).ForAwait(); - Log(" Success!"); - } - catch (RedisServerException ex) when (ex.Message.Contains("NOGOODSLAVE")) - { - // Retry once - Log(" Retry initiated"); - await Task.Delay(1000).ForAwait(); - await server.SentinelFailoverAsync(ServiceName).ForAwait(); - Log(" Retry complete"); - } - await Task.Delay(2000).ForAwait(); + var addresses = await SentinelServerA.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait(); + Assert.Contains(SentinelServerB.EndPoint, addresses); + Assert.Contains(SentinelServerC.EndPoint, addresses); - var newMaster = server.SentinelGetMasterAddressByName(ServiceName); - var newReplica = server.SentinelReplicas(ServiceName); + addresses = await SentinelServerB.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait(); + Assert.Contains(SentinelServerA.EndPoint, addresses); + Assert.Contains(SentinelServerC.EndPoint, addresses); - Assert.Equal(slaves[0].ToDictionary()["name"], newMaster.ToString()); - Assert.Equal(master.ToString(), newReplica[0].ToDictionary()["name"]); - } + addresses = await SentinelServerC.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait(); + Assert.Contains(SentinelServerA.EndPoint, addresses); + Assert.Contains(SentinelServerB.EndPoint, addresses); } -#if DEBUG [Fact] - public async Task GetSentinelMasterConnectionFailoverTest() + public async Task ReadOnlyConnectionReplicasTest() { - var conn = Conn.GetSentinelMasterConnection(ServiceOptions); - var endpoint = conn.currentSentinelMasterEndPoint.ToString(); + var replicas = SentinelServerA.SentinelGetReplicaAddresses(ServiceName); + var config = new ConfigurationOptions(); - try + foreach (var replica in replicas) { - Log("Failover attempted initiated"); - SentinelServerA.SentinelFailover(ServiceName); - Log(" Success!"); + config.EndPoints.Add(replica); } - catch (RedisServerException ex) when (ex.Message.Contains("NOGOODSLAVE")) - { - // Retry once - Log(" Retry initiated"); - await Task.Delay(1000).ForAwait(); - SentinelServerA.SentinelFailover(ServiceName); - Log(" Retry complete"); - } - await Task.Delay(2000).ForAwait(); - // Try and complete ASAP - await UntilCondition(TimeSpan.FromSeconds(10), () => { - var checkConn = Conn.GetSentinelMasterConnection(ServiceOptions); - return endpoint != checkConn.currentSentinelMasterEndPoint.ToString(); - }); + var readonlyConn = await ConnectionMultiplexer.ConnectAsync(config); - // Post-check for validity - var conn1 = Conn.GetSentinelMasterConnection(ServiceOptions); - Assert.NotEqual(endpoint, conn1.currentSentinelMasterEndPoint.ToString()); + await UntilCondition(TimeSpan.FromSeconds(2), () => readonlyConn.IsConnected); + Assert.True(readonlyConn.IsConnected); + var db = readonlyConn.GetDatabase(); + var s = db.StringGet("test"); + Assert.True(s.IsNullOrEmpty); + //var ex = Assert.Throws(() => db.StringSet("test", "try write to read only instance")); + //Assert.StartsWith("No connection is available to service this operation", ex.Message); } - [Fact] - public async Task GetSentinelMasterConnectionFailoverAsyncTest() + private void DoFailover() { - var conn = Conn.GetSentinelMasterConnection(ServiceOptions); - var endpoint = conn.currentSentinelMasterEndPoint.ToString(); + WaitForReady(); - try - { - Log("Failover attempted initiated"); - await SentinelServerA.SentinelFailoverAsync(ServiceName).ForAwait(); - Log(" Success!"); - } - catch (RedisServerException ex) when (ex.Message.Contains("NOGOODSLAVE")) - { - // Retry once - Log(" Retry initiated"); - await Task.Delay(1000).ForAwait(); - await SentinelServerA.SentinelFailoverAsync(ServiceName).ForAwait(); - Log(" Retry complete"); - } + // capture current replica + var replicas = SentinelServerA.SentinelGetReplicaAddresses(ServiceName); - // Try and complete ASAP - await UntilCondition(TimeSpan.FromSeconds(10), () => { - var checkConn = Conn.GetSentinelMasterConnection(ServiceOptions); - return endpoint != checkConn.currentSentinelMasterEndPoint.ToString(); - }); + Log("Starting failover..."); + var sw = Stopwatch.StartNew(); + SentinelServerA.SentinelFailover(ServiceName); - // Post-check for validity - var conn1 = Conn.GetSentinelMasterConnection(ServiceOptions); - Assert.NotEqual(endpoint, conn1.currentSentinelMasterEndPoint.ToString()); + // wait until the replica becomes the master + WaitForReady(expectedMaster: replicas[0]); + Log($"Time to failover: {sw.Elapsed}"); } -#endif - [Fact] - public async Task GetSentinelMasterConnectionWriteReadFailover() + private void WaitForReady(EndPoint expectedMaster = null, bool waitForReplication = false, TimeSpan? duration = null) { - Log("Conn:"); - foreach (var server in Conn.GetServerSnapshot().ToArray()) - { - Log(" Endpoint: " + server.EndPoint); - } - Log("Conn Replicas:"); - foreach (var replicas in SentinelServerA.SentinelReplicas(ServiceName)) - { - foreach(var pair in replicas) - { - Log(" {0}: {1}", pair.Key, pair.Value); - } - } + duration ??= TimeSpan.FromSeconds(30); + + var sw = Stopwatch.StartNew(); - var conn = Conn.GetSentinelMasterConnection(ServiceOptions); - var s = conn.currentSentinelMasterEndPoint.ToString(); - Log("Sentinel Master Endpoint: " + s); - foreach (var server in conn.GetServerSnapshot().ToArray()) + // wait until we have 1 master and 1 replica and have verified their roles + var master = SentinelServerA.SentinelGetMasterAddressByName(ServiceName); + if (expectedMaster != null && expectedMaster.ToString() != master.ToString()) { - Log(" Server: " + server.EndPoint); - Log(" Master Endpoint: " + server.MasterEndPoint); - Log(" IsSlave: " + server.IsReplica); - Log(" ReplicaReadOnly: " + server.ReplicaReadOnly); - var info = conn.GetServer(server.EndPoint).Info("Replication"); - foreach (var section in info) + while (sw.Elapsed < duration.Value) { - Log(" Section: " + section.Key); - foreach (var pair in section) + Thread.Sleep(1000); + try { - Log(" " + pair.Key +": " + pair.Value); + master = SentinelServerA.SentinelGetMasterAddressByName(ServiceName); + if (expectedMaster.ToString() == master.ToString()) + break; + } + catch (Exception) + { + // ignore } } } + if (expectedMaster != null && expectedMaster.ToString() != master.ToString()) + throw new RedisException($"Master was expected to be {expectedMaster}"); + Log($"Master is {master}"); - IDatabase db = conn.GetDatabase(); - var expected = DateTime.Now.Ticks.ToString(); - Log("Tick Key: " + expected); - var key = Me(); - db.KeyDelete(key, CommandFlags.FireAndForget); - db.StringSet(key, expected); + var replicas = SentinelServerA.SentinelGetReplicaAddresses(ServiceName); + var checkConn = Conn.GetSentinelMasterConnection(ServiceOptions); - await UntilCondition(TimeSpan.FromSeconds(10), - () => SentinelServerA.SentinelMaster(ServiceName).ToDictionary()["num-slaves"] != "0" - ); - Log("Conditions met"); + WaitForRole(checkConn.GetServer(master), "master", duration.Value.Subtract(sw.Elapsed)); + WaitForRole(checkConn.GetServer(replicas[0]), "slave", duration.Value.Subtract(sw.Elapsed)); - try - { - Log("Failover attempted initiated"); - SentinelServerA.SentinelFailover(ServiceName); - Log(" Success!"); - } - catch (RedisServerException ex) when (ex.Message.Contains("NOGOODSLAVE")) + if (waitForReplication) { - // Retry once - Log(" Retry initiated"); - await Task.Delay(1000).ForAwait(); - SentinelServerA.SentinelFailover(ServiceName); - Log(" Retry complete"); + WaitForReplication(checkConn.GetServer(master), duration.Value.Subtract(sw.Elapsed)); } - Log("Delaying for failover conditions..."); - await Task.Delay(2000).ForAwait(); - Log("Conditons check..."); - // Spin until complete (with a timeout) - since this can vary - await UntilCondition(TimeSpan.FromSeconds(20), () => - { - var checkConn = Conn.GetSentinelMasterConnection(ServiceOptions); - return s != checkConn.currentSentinelMasterEndPoint.ToString() - && expected == checkConn.GetDatabase().StringGet(key); - }); - Log(" Conditions met."); - - var conn1 = Conn.GetSentinelMasterConnection(ServiceOptions); - var s1 = conn1.currentSentinelMasterEndPoint.ToString(); - Log("New master endpoint: " + s1); - - var actual = conn1.GetDatabase().StringGet(key); - Log("Fetched tick key: " + actual); - - Assert.NotNull(s); - Assert.NotNull(s1); - Assert.NotEmpty(s); - Assert.NotEmpty(s1); - Assert.NotEqual(s, s1); - // TODO: Track this down on the test race - //Assert.Equal(expected, actual); } - [Fact] - public async Task SentinelGetSentinelAddressesTest() + private void WaitForRole(IServer server, string role, TimeSpan? duration = null) { - var addresses = await SentinelServerA.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait(); - Assert.Contains(SentinelServerB.EndPoint, addresses); - Assert.Contains(SentinelServerC.EndPoint, addresses); + duration ??= TimeSpan.FromSeconds(30); - addresses = await SentinelServerB.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait(); - Assert.Contains(SentinelServerA.EndPoint, addresses); - Assert.Contains(SentinelServerC.EndPoint, addresses); + Log($"Waiting for server ({server.EndPoint}) role to be \"{role}\"..."); + var sw = Stopwatch.StartNew(); + while (sw.Elapsed < duration.Value) + { + try + { + if (server.Role() == role) + { + Log($"Done waiting for server ({server.EndPoint}) role to be \"{role}\""); + return; + } + } + catch (Exception) + { + // ignore + } - addresses = await SentinelServerC.SentinelGetSentinelAddressesAsync(ServiceName).ForAwait(); - Assert.Contains(SentinelServerA.EndPoint, addresses); - Assert.Contains(SentinelServerB.EndPoint, addresses); + Thread.Sleep(1000); + } + + throw new RedisException("Timeout waiting for server to have expected role assigned"); } - [Fact] - public async Task ReadOnlyConnectionReplicasTest() + private void WaitForReplication(IServer master, TimeSpan? duration = null) { - var replicas = SentinelServerA.SentinelReplicas(ServiceName); - var config = new ConfigurationOptions - { - TieBreaker = "", - ServiceName = TestConfig.Current.SentinelSeviceName, - }; + duration ??= TimeSpan.FromSeconds(10); - foreach (var kv in replicas) + Log("Waiting for master/replica replication to be in sync..."); + var sw = Stopwatch.StartNew(); + while (sw.Elapsed < duration.Value) { - Assert.Equal("slave", kv.ToDictionary()["flags"]); - config.EndPoints.Add(kv.ToDictionary()["name"]); - } + var info = master.Info("replication"); + var replicationInfo = info.FirstOrDefault(f => f.Key == "Replication")?.ToArray().ToDictionary(); + var replicaInfo = replicationInfo?.FirstOrDefault(i => i.Key.StartsWith("slave")).Value?.Split(',').ToDictionary(i => i.Split('=').First(), i => i.Split('=').Last()); + var replicaOffset = replicaInfo?["offset"]; + var masterOffset = replicationInfo?["master_repl_offset"]; - var readonlyConn = ConnectionMultiplexer.Connect(config); + if (replicaOffset == masterOffset) + { + Log($"Done waiting for master ({masterOffset}) / replica ({replicaOffset}) replication to be in sync"); + return; + } - await UntilCondition(TimeSpan.FromSeconds(2), () => readonlyConn.IsConnected); - Assert.True(readonlyConn.IsConnected); - var db = readonlyConn.GetDatabase(); - var s = db.StringGet("test"); - Assert.True(s.IsNullOrEmpty); - //var ex = Assert.Throws(() => db.StringSet("test", "try write to read only instance")); - //Assert.StartsWith("No connection is available to service this operation", ex.Message); + Log($"Waiting for master ({masterOffset}) / replica ({replicaOffset}) replication to be in sync..."); + + Thread.Sleep(250); + } + throw new RedisException("Timeout waiting for test servers master/replica replication to be in sync."); } } }