diff --git a/HostingMessageDemo/Dockerfile b/HostingMessageDemo/Dockerfile index d90b868..1d38254 100644 --- a/HostingMessageDemo/Dockerfile +++ b/HostingMessageDemo/Dockerfile @@ -10,7 +10,7 @@ COPY ["Hydra4Net.HostingExtensions/Hydra4Net.HostingExtensions.csproj", "Hydra4N COPY ["Hydra4NET/Hydra4NET.csproj", "Hydra4NET/"] RUN dotnet restore "HostingMessageDemo/HostingMessageDemo.csproj" COPY . . -WORKDIR "/src/HostingMessageDemo" +WORKDIR "HostingMessageDemo" RUN dotnet build "HostingMessageDemo.csproj" -c Release -o /app/build FROM build AS publish diff --git a/HostingMessageDemo/SampleMessageHandler.cs b/HostingMessageDemo/SampleMessageHandler.cs index 7a8f0c6..bd23c83 100644 --- a/HostingMessageDemo/SampleMessageHandler.cs +++ b/HostingMessageDemo/SampleMessageHandler.cs @@ -7,14 +7,11 @@ namespace HostingMessageDemo { internal class SampleMessageHandler : HydraEventsHandler { - private ILogger _logger; - private string _mode = ""; private readonly Sender _sender; - public SampleMessageHandler(ILogger logger, Sender sender, IHydra hydra) + public SampleMessageHandler(ILogger logger, Sender sender, IHydra hydra) : base(logger) { - _logger = logger; _sender = sender; SetValidateMode(hydra); } @@ -32,7 +29,7 @@ void SetValidateMode(IHydra hydra) { case Modes.Sender: case Modes.Queuer: - _logger.LogInformation($"Configured as {_mode}"); + Logger.LogInformation($"Configured as {_mode}"); break; default: throw new ArgumentOutOfRangeException("ServiceType", "Hydra config doesn't specify a valid ServiceType role"); @@ -43,7 +40,7 @@ public override async Task OnMessageReceived(IInboundMessage msg, IHydra hydra) { try { - _logger.LogInformation($"Received message of type {msg.Type}"); + Logger.LogInformation($"Received message of type {msg.Type}"); if (_mode == Modes.Sender && msg.ReceivedUMF != null) { await _sender.ProcessMessage(msg.Type, msg.ReceivedUMF); @@ -62,7 +59,7 @@ public override async Task OnMessageReceived(IInboundMessage msg, IHydra hydra) } catch (Exception e) { - _logger.LogError(e, "OnMessageReceived failed"); + Logger.LogError(e, "OnMessageReceived failed"); } } @@ -72,16 +69,15 @@ public override async Task OnQueueMessageReceived(IInboundMessage msg, IHydra hy return; try { - _logger.LogInformation($"Queuer: processing queued message from sender"); + Logger.LogInformation($"Queuer: processing queued message from sender"); if (msg.Type == "queuer") { await HandleQueuerType(msg, hydra); } - } catch (Exception e) { - _logger.LogError(e, "Queue handler failed"); + Logger.LogError(e, "Queue handler failed"); } } @@ -100,20 +96,20 @@ private async Task HandleQueuerType(IInboundMessage msg, IHydra hydra) Msg = $"Queuer: processed message containing {Msg} with ID of {Id}" }); string json = sharedMessage.Serialize(); - _logger.LogInformation($"Queuer: mark message: {msg.MessageJson}"); + Logger.LogInformation($"Queuer: mark message: {msg.MessageJson}"); await hydra.MarkQueueMessageAsync(msg.MessageJson ?? "", true); - _logger.LogInformation($"Queuer: send json: {json}"); + Logger.LogInformation($"Queuer: send json: {json}"); await hydra.SendMessageAsync(sharedMessage.To, json); - _logger.LogInformation($"Queuer: sent completion message back to sender"); + Logger.LogInformation($"Queuer: sent completion message back to sender"); } else { - _logger.LogWarning("Queue Msg null: {0}", msg.MessageJson); + Logger.LogWarning("Queue Msg null: {0}", msg.MessageJson); } } else { - _logger.LogError("SharedMessage is null, body: {0}", msg.MessageJson); + Logger.LogError("SharedMessage is null, body: {0}", msg.MessageJson); } } @@ -130,9 +126,8 @@ private async Task HandleRespondType(IInboundMessage msg, IHydra hydra) Msg = $"Queuer: sending single response to {Msg} with ID of {Id}" }); await hydra.SendMessageAsync(sharedMessage); - _logger.LogInformation($"Queuer: sent single response message back to sender"); + Logger.LogInformation($"Queuer: sent single response message back to sender"); } - } private async Task HandleResponseStreamType(IInboundMessage msg, IHydra hydra) @@ -151,7 +146,7 @@ private async Task HandleResponseStreamType(IInboundMessage msg, IHydra hydra) Msg = $"Queuer: sending response stream {i} to {Msg} with ID of {Id}" }); await hydra.SendMessageAsync(sharedMessage); - _logger.LogInformation($"Queuer: sent response stream message back to sender"); + Logger.LogInformation($"Queuer: sent response stream message back to sender"); } IUMF completeMsg = hydra.CreateUMFResponse(sm!, "response-stream-complete", new SharedMessageBody() { @@ -159,35 +154,48 @@ private async Task HandleResponseStreamType(IInboundMessage msg, IHydra hydra) Msg = $"Queuer: sending complete response stream to {Msg} with ID of {Id}" }); await hydra.SendMessageAsync(completeMsg); - _logger.LogInformation($"Queuer: sent response stream complete message back to sender"); + Logger.LogInformation($"Queuer: sent response stream complete message back to sender"); } } #region Optional public override Task BeforeInit(IHydra hydra) { - _logger.LogInformation($"Hydra initialized"); + Logger.LogInformation($"Hydra initialized"); return base.BeforeInit(hydra); } public override Task OnShutdown(IHydra hydra) { - _logger.LogInformation($"Hydra shut down"); + Logger.LogInformation($"Hydra shut down"); return base.OnShutdown(hydra); } public override Task OnInitError(IHydra hydra, Exception e) { - _logger.LogCritical(e, "A fatal error occurred initializing Hydra"); + Logger.LogCritical(e, "A fatal error occurred initializing Hydra"); return base.OnInitError(hydra, e); } public override Task OnDequeueError(IHydra hydra, Exception e) { - _logger.LogWarning(e, "An error occurred while dequeueing Hydra"); + //base class logs this (Error level) by default return base.OnDequeueError(hydra, e); } + public override Task OnInternalError(IHydra hydra, Exception e) + { + //base class logs this (Error level) by default + return base.OnInternalError(hydra, e); + } + + //base class logs this (Debug level) by default + public override void OnDebugEvent(IHydra hydra, DebugEvent e) + { + //base class logs this (Error level) by default + base.OnDebugEvent(hydra, e); + } + #endregion Optional } } diff --git a/HostingMessageDemo/appsettings.json b/HostingMessageDemo/appsettings.json index 3ba7730..702e5fb 100644 --- a/HostingMessageDemo/appsettings.json +++ b/HostingMessageDemo/appsettings.json @@ -1,4 +1,10 @@ { + "Logging": { + "LogLevel": { + "Default": "Debug", + "Microsoft.AspNetCore": "Warning" + } + }, "Hydra": { "ServiceName": "sender-svcs", "ServiceIP": "10.0.9.*", diff --git a/HostingMessageDemo/scripts/build.bat b/HostingMessageDemo/scripts/build.bat index 54161d6..e5b1942 100644 --- a/HostingMessageDemo/scripts/build.bat +++ b/HostingMessageDemo/scripts/build.bat @@ -1 +1 @@ -docker build --force-rm -t hostingdemo -f Dockerfile ../.. +docker build --force-rm -t hostingdemo -f ../Dockerfile ../.. \ No newline at end of file diff --git a/Hydra4NET/Config/HydraConfigObject.cs b/Hydra4NET/Config/HydraConfigObject.cs index 5922d86..38d19d4 100644 --- a/Hydra4NET/Config/HydraConfigObject.cs +++ b/Hydra4NET/Config/HydraConfigObject.cs @@ -17,17 +17,21 @@ public class HydraConfigObject public Plugins? Plugins { get; set; } public RedisConfig? Redis { get; set; } + /// + /// Configure whether to emit debug events from + /// + public bool EmitDebugEvents { get; set; } = true; + + /// + /// The maximum length of the UMF message to emit in debug events. Larger UMFs will be truncated if value > 0. + /// + public int? EmitDebugMaxUmfLength { get; set; } = 2000; + public string GetRedisConnectionString() { if (Redis == null) throw new NullReferenceException("Redis configuration is null"); - //no default database in case the ConnectionMultiplexer is accessed outside hydra - string connectionString = Redis.GetRedisHost(); - if (!string.IsNullOrWhiteSpace(Redis.Options)) - { - connectionString = $"{connectionString},{Redis.Options}"; - } - return connectionString; + return Redis.GetConnectionString(); } /// @@ -63,4 +67,4 @@ public class HydraLogger public bool LogToConsole { get; set; } public bool OnlyLogLocally { get; set; } } -} +} \ No newline at end of file diff --git a/Hydra4NET/Config/RedisConfig.cs b/Hydra4NET/Config/RedisConfig.cs index 14fe984..8ee0ce1 100644 --- a/Hydra4NET/Config/RedisConfig.cs +++ b/Hydra4NET/Config/RedisConfig.cs @@ -14,5 +14,15 @@ public string GetRedisHost() throw new ArgumentNullException(nameof(Host), "Host cannot be null or empty"); return $"{Host}:{Port ?? 6379}"; } + public string GetConnectionString() + { + //no default database in case the ConnectionMultiplexer is accessed outside hydra + string connectionString = GetRedisHost(); + if (!string.IsNullOrWhiteSpace(Options)) + { + connectionString = $"{connectionString},{Options}"; + } + return connectionString; + } } } diff --git a/Hydra4NET/DebugEvent.cs b/Hydra4NET/DebugEvent.cs new file mode 100644 index 0000000..a1d7775 --- /dev/null +++ b/Hydra4NET/DebugEvent.cs @@ -0,0 +1,20 @@ +namespace Hydra4NET +{ + public enum DebugEventType + { + MessageReceived, + SendMessage, + QueueReceived, + SendQueue, + SendBroadcastMessage, + MarkQueueMessage, + Register + } + + public class DebugEvent + { + public DebugEventType EventType { get; set; } + public string? UMF { get; set; } + public string Message { get; set; } = ""; + } +} diff --git a/Hydra4NET/Helpers/StandardSerializer.cs b/Hydra4NET/Helpers/StandardSerializer.cs index 99d0ce5..f7a670b 100644 --- a/Hydra4NET/Helpers/StandardSerializer.cs +++ b/Hydra4NET/Helpers/StandardSerializer.cs @@ -4,16 +4,24 @@ namespace Hydra4NET.Helpers { public class StandardSerializer { + //caching these options improves performance + private static readonly JsonSerializerOptions DeserializeOptions = new JsonSerializerOptions() + { + PropertyNameCaseInsensitive = true + }; + + private static readonly JsonSerializerOptions SerializeOptions = new JsonSerializerOptions() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + /// /// A standardized JSON deserializer helper. This is essential as Hydra-based services written in non-Dotnet environments expect a universal format. /// /// static public U? Deserialize(string message) where U : class { - return JsonSerializer.Deserialize(message, new JsonSerializerOptions() - { - PropertyNameCaseInsensitive = true - }); + return JsonSerializer.Deserialize(message, DeserializeOptions); } /// @@ -22,10 +30,18 @@ public class StandardSerializer /// static public string Serialize(T item) { - return JsonSerializer.Serialize(item, new JsonSerializerOptions() - { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase, - }); + return JsonSerializer.Serialize(item, SerializeOptions); + } + + /// + /// A standardized JSON serializer helper which ensures that the generated JSON is compatible with JavaScript camel case. Serializes directly to UTF8 bytes for better performance + /// + /// + /// + /// + static public byte[] SerializeBytes(T item) + { + return JsonSerializer.SerializeToUtf8Bytes(item, SerializeOptions); } } } diff --git a/Hydra4NET/Hydra.cs b/Hydra4NET/Hydra.cs index 3a2bebc..669d450 100644 --- a/Hydra4NET/Hydra.cs +++ b/Hydra4NET/Hydra.cs @@ -71,8 +71,14 @@ public partial class Hydra : IHydra public string? NodeVersion { get; private set; } public string? InstanceID { get; private set; } - private int _isInit = 0; - public bool IsInitialized => _isInit != 0; + private readonly ThreadSafeBool _isInitialized = false; + public bool IsInitialized + { + get => _isInitialized.Value; + private set => _isInitialized.Value = value; + } + + public bool IsRedisConnected => _redis != null && _redis.IsConnected; private IConnectionMultiplexer? _redis; @@ -83,8 +89,7 @@ public partial class Hydra : IHydra public delegate Task UMFMessageHandler(IInboundMessage msg); private UMFMessageHandler? _MessageHandler = null; - public delegate Task InternalErrorHandler(Exception exception); - private InternalErrorHandler? _ErrorHandler; + #endregion // Message delegate public Hydra(HydraConfigObject config) @@ -109,7 +114,7 @@ void LoadConfig(HydraConfigObject config) /// /// /// - public IDatabase GetDatabase() + private IDatabase GetDatabase() { if (_redis == null || _config == null) throw new HydraException("Hydra has not been initialized, cannot retrieve a Database instance", HydraException.ErrorType.NotInitialized); @@ -125,6 +130,59 @@ public IServer GetServer() return _redis.GetServer(GetRedisConfig().GetRedisHost()); } + private void SetupEnvironmentConfig() + { + HostName = Dns.GetHostName(); + ProcessID = Process.GetCurrentProcess().Id; + Architecture = Environment.GetEnvironmentVariable("PROCESSOR_ARCHITECTURE"); + NodeVersion = System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription; + + if (ServiceIP == null || ServiceIP == string.Empty) + { + using Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, 0); + socket.Connect("8.8.8.8", 65530); + if (socket.LocalEndPoint is IPEndPoint endPoint) + { + ServiceIP = endPoint.Address.ToString(); + } + } + else if (ServiceIP.Contains(".") && ServiceIP.Contains("*")) + { + // then IP address field specifies a pattern match + int starPoint = ServiceIP.IndexOf("*"); + string pattern = ServiceIP.Substring(0, starPoint); + string selectedIP = string.Empty; + var myhost = Dns.GetHostEntry(Dns.GetHostName()); + foreach (var ipaddr in myhost.AddressList) + { + if (ipaddr.AddressFamily == AddressFamily.InterNetwork) + { + string ip = ipaddr.ToString(); + if (ip.StartsWith(pattern)) + { + selectedIP = ip; + break; + } + } + } + ServiceIP = selectedIP; + } + } + + private void ConfigureReconnect() + { + _redis!.ConnectionFailed += async (s, e) => + { + if (!_cts.IsCancellationRequested) + await EmitStatusChange(new RedisConnectionStatus(ConnectionStatus.Disconnected, "Redis connection lost, retrying", e.Exception)); + }; + + _redis!.ConnectionRestored += async (s, e) => + { + await EmitStatusChange(new RedisConnectionStatus(ConnectionStatus.Reconnected, "Redis connection restored", e.Exception)); + }; + } + public async Task InitAsync(HydraConfigObject? config = null) { if (IsInitialized) @@ -136,43 +194,11 @@ public async Task InitAsync(HydraConfigObject? config = null) try { //probably throw if no config passed or invalid? - HostName = Dns.GetHostName(); - ProcessID = Process.GetCurrentProcess().Id; - Architecture = Environment.GetEnvironmentVariable("PROCESSOR_ARCHITECTURE"); - NodeVersion = System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription; - - if (ServiceIP == null || ServiceIP == string.Empty) - { - using Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, 0); - socket.Connect("8.8.8.8", 65530); - if (socket.LocalEndPoint is IPEndPoint endPoint) - { - ServiceIP = endPoint.Address.ToString(); - } - } - else if (ServiceIP.Contains(".") && ServiceIP.Contains("*")) - { - // then IP address field specifies a pattern match - int starPoint = ServiceIP.IndexOf("*"); - string pattern = ServiceIP.Substring(0, starPoint); - string selectedIP = string.Empty; - var myhost = Dns.GetHostEntry(Dns.GetHostName()); - foreach (var ipaddr in myhost.AddressList) - { - if (ipaddr.AddressFamily == AddressFamily.InterNetwork) - { - string ip = ipaddr.ToString(); - if (ip.StartsWith(pattern)) - { - selectedIP = ip; - break; - } - } - } - ServiceIP = selectedIP; - } + SetupEnvironmentConfig(); InstanceID = Guid.NewGuid().ToString().Replace("-", ""); _redis = ConnectionMultiplexer.Connect(_config.GetRedisConnectionString()); + await EmitStatusChange(new RedisConnectionStatus(ConnectionStatus.Connected, "Redis connection established")); + //TODO: validate conn string here and give detailed errors if something missing? if (_redis != null && _redis.IsConnected) { @@ -180,6 +206,7 @@ public async Task InitAsync(HydraConfigObject? config = null) ConfigurePresenceTask(); ConfigureEventsChannel(); SetInitializedTrue(); + ConfigureReconnect(); } else { @@ -194,13 +221,11 @@ public async Task InitAsync(HydraConfigObject? config = null) { throw new HydraException("Failed to initialize Hydra", e, HydraException.ErrorType.InitializationError); } - } private void SetInitializedTrue() { - //switch Initialized = true in atomic manner; - Interlocked.Increment(ref _isInit); + IsInitialized = true; _initTcs.SetResult(true); } @@ -227,9 +252,12 @@ private async Task SendMessage(UMFRouteEntry parsedEntry, string jsonUMFMe // Pick random presence entry instanceId = entries.GetRandomEntry()?.InstanceID ?? ""; } + else + EmitDebug(DebugEventType.SendMessage, jsonUMFMessage, "Could not find presence entry to send message"); } if (instanceId != string.Empty && _redis != null) { + EmitDebug(DebugEventType.SendMessage, jsonUMFMessage, $"Sending message to instance {instanceId}"); await _redis.GetSubscriber().PublishAsync($"{_mc_message_key}:{parsedEntry.ServiceName}:{instanceId}", jsonUMFMessage); return true; //TODO: if above returns > 0 and the redis instance is not a cluster, that indicates that message was sent. Can we safely use this info? } @@ -246,6 +274,7 @@ private async Task SendBroadcastMessage(UMFRouteEntry parsedEntry, string jsonUM { if (_redis != null) { + EmitDebug(DebugEventType.SendBroadcastMessage, jsonUMFMessage, $"Sending broadcast message"); await _redis.GetSubscriber().PublishAsync($"{_mc_message_key}:{parsedEntry.ServiceName}", jsonUMFMessage); } } @@ -257,19 +286,17 @@ public void OnMessageHandler(UMFMessageHandler handler) _MessageHandler = handler; } - public void OnInternalErrorHandler(InternalErrorHandler handler) - { - if (handler is null) - throw new ArgumentNullException("handler", "InternalErrorHandler cannot be null"); - _ErrorHandler = handler; - } - private async Task QueueMessage(UMFRouteEntry? entry, string jsonUMFMessage) { if (string.IsNullOrEmpty(entry?.Error)) { + EmitDebug(DebugEventType.SendBroadcastMessage, jsonUMFMessage, $"Sending queued message"); await GetDatabase().ListLeftPushAsync($"{_redis_pre_key}:{entry!.ServiceName}:mqrecieved", jsonUMFMessage); } + else + { + EmitDebug(DebugEventType.SendQueue, jsonUMFMessage, $"Route Error queueing message: {entry?.Error}"); + } } public Task QueueMessageAsync(IUMF umfHeader) => @@ -301,6 +328,7 @@ public async Task MarkQueueMessageAsync(string jsonUMFMessage, bool comp UMFRouteEntry entry = umfHeader.GetRouteEntry(); if (entry != null) { + EmitDebug(DebugEventType.MarkQueueMessage, jsonUMFMessage, $"Marking queue message"); var db = GetDatabase(); await db.ListRemoveAsync($"{_redis_pre_key}:{entry.ServiceName}:mqinprogress", jsonUMFMessage, -1); if (completed == false) @@ -362,7 +390,7 @@ async Task HandleMessage(RedisValue value) try { string msg = (string?)value ?? String.Empty; - + EmitDebug(DebugEventType.MessageReceived, msg, $"Message received"); var umf = ReceivedUMF.Deserialize(msg); var inMsg = new InboundMessage { @@ -380,8 +408,7 @@ await AddMessageChannelAction(Task.Run(async () => } catch (Exception e) { - if (_ErrorHandler != null) - await _ErrorHandler(e); + await EmitError(e); } } @@ -391,16 +418,21 @@ private async Task RegisterService() { if (_redis == null) return; + + EmitDebug(DebugEventType.Register, "", $"Registering to hydra"); + var db = GetDatabase(); var key = $"{_redis_pre_key}:{ServiceName}:service"; - await db.StringSetAsync(key, StandardSerializer.Serialize(new RegistrationEntry + + //this gets set once and then expires in 3 seconds, does this need to be refreshed with the presence? + await db.StringSetAsync(key, StandardSerializer.SerializeBytes(new RegistrationEntry { ServiceName = ServiceName, Type = ServiceType, })); await db.KeyExpireAsync(key, TimeSpan.FromSeconds(_KEY_EXPIRATION_TTL)); - //use concurrent subscription + //use concurrent subscription. Redis wil lresubscribe on reconnect await _redis.GetSubscriber().SubscribeAsync($"{_mc_message_key}:{ServiceName}", (c, m) => Task.Run(() => HandleMessage(m))); await _redis.GetSubscriber().SubscribeAsync($"{_mc_message_key}:{ServiceName}:{InstanceID}", (c, m) => Task.Run(() => HandleMessage(m))); } @@ -431,6 +463,80 @@ public IRedisConfig GetRedisConfig() throw new NullReferenceException("Redis configuration is null, check your configuration"); return _config.Redis; } - } -} + #region Events + + public delegate Task InternalErrorHandler(Exception exception); + private InternalErrorHandler? _ErrorHandler; + + public void OnInternalErrorHandler(InternalErrorHandler handler) + { + if (handler is null) + throw new ArgumentNullException("handler", "InternalErrorHandler cannot be null"); + _ErrorHandler = handler; + } + + async Task EmitError(Exception e) + { + try + { + if (_ErrorHandler != null) + await _ErrorHandler(e); + } + catch { } + } + + public delegate void InternalDebugHandler(DebugEvent debugEvent); + private InternalDebugHandler? _DebugHandler; + + public void OnDebugEventHandler(InternalDebugHandler handler) + { + if (handler is null) + throw new ArgumentNullException("handler", "InternalDebugHandler cannot be null"); + _DebugHandler = handler; + } + + void EmitDebug(DebugEventType debugEventType, string umf, string message) + { + if (_DebugHandler is null || _config is null || !_config.EmitDebugEvents) + return; + try + { + if (_config.EmitDebugMaxUmfLength.GetValueOrDefault() > 0 && umf.Length > _config.EmitDebugMaxUmfLength) + umf = umf.Substring(0, _config.EmitDebugMaxUmfLength.Value); + _DebugHandler(new DebugEvent + { + EventType = debugEventType, + UMF = umf, + Message = message, + }); + } + catch { } + } + + public delegate Task ReconnectHandler(RedisConnectionStatus status); + private ReconnectHandler? _ConnectHandler; + + public void OnRedisConnectionChange(ReconnectHandler handler) + { + if (handler is null) + throw new ArgumentNullException("handler", "InternalErrorHandler cannot be null"); + _ConnectHandler = handler; + } + + async Task EmitStatusChange(RedisConnectionStatus status) + { + try + { + if (_ConnectHandler != null) + await _ConnectHandler(status); + } + catch (Exception e) + { + await EmitError(e); + } + } + + #endregion + } +} \ No newline at end of file diff --git a/Hydra4NET/Hydra4NET.csproj b/Hydra4NET/Hydra4NET.csproj index 07bbf84..389cc9d 100644 --- a/Hydra4NET/Hydra4NET.csproj +++ b/Hydra4NET/Hydra4NET.csproj @@ -6,7 +6,7 @@ True Hydra for .NET True - 0.0.14 + 0.0.15 Hydra for .NET is a library for building microservices. 2022 Carlos Justiniano, Justin Couch, and Contributors hydra microservice diff --git a/Hydra4NET/IHydra.cs b/Hydra4NET/IHydra.cs index a47fa65..114bcf9 100644 --- a/Hydra4NET/IHydra.cs +++ b/Hydra4NET/IHydra.cs @@ -83,6 +83,23 @@ public interface IHydra : IDisposable, IAsyncDisposable /// void OnInternalErrorHandler(InternalErrorHandler handler); + /// + /// Registers an asynchronous event handler for when debug events are emitted by Hydra + /// + /// + void OnDebugEventHandler(InternalDebugHandler handler); + + /// + /// Called when Hydra's connection to Redis changes states + /// + /// + void OnRedisConnectionChange(ReconnectHandler handler); + + /// + /// Whether Hydra is currently connected to Redis + /// + bool IsRedisConnected { get; } + /// /// Adds a message to a services queue /// diff --git a/Hydra4NET/IInboundMessageStream.cs b/Hydra4NET/IInboundMessageStream.cs index f64a721..30e3c36 100644 --- a/Hydra4NET/IInboundMessageStream.cs +++ b/Hydra4NET/IInboundMessageStream.cs @@ -12,6 +12,11 @@ public interface IInboundMessageStream : IDisposable /// /// IAsyncEnumerable EnumerateMessagesAsync(CancellationToken ct = default); + + /// + /// Indicates whether the stream has been marked to stop listening for messages + /// + bool IsDisposed { get; } } public interface IInboundMessageStream : IInboundMessageStream diff --git a/Hydra4NET/IUMF.cs b/Hydra4NET/IUMF.cs index af9e39c..945961f 100644 --- a/Hydra4NET/IUMF.cs +++ b/Hydra4NET/IUMF.cs @@ -1,4 +1,5 @@ -using System; +using Hydra4NET.Helpers; +using System; namespace Hydra4NET { @@ -50,6 +51,16 @@ public interface IUMF /// string Serialize(); + /// + /// Serializes this UMF message to Utf8 encoded bytes + /// + /// + byte[] SerializeUtf8Bytes(); + + /// + /// Retrieves the routing information for the message + /// + /// UMFRouteEntry GetRouteEntry(); } diff --git a/Hydra4NET/Internal/InboundMessageStream.cs b/Hydra4NET/Internal/InboundMessageStream.cs index 29b8cde..ec4a8b2 100644 --- a/Hydra4NET/Internal/InboundMessageStream.cs +++ b/Hydra4NET/Internal/InboundMessageStream.cs @@ -21,40 +21,35 @@ public InboundMessageStream(string mid, int? maxBuffer = null) protected Channel _channel; - private const int _IncompleteValue = 0; + private readonly ThreadSafeBool _isDisposed = false; - private int _isComplete = _IncompleteValue; - - public bool IsComplete => _isComplete != _IncompleteValue; - - public IAsyncEnumerable EnumerateMessagesAsync(CancellationToken ct = default) + public bool IsDisposed { - return _channel.Reader.ReadAllAsync(ct); + get => _isDisposed.Value; + private set => _isDisposed.Value = value; } - - void MarkComplete() + + public IAsyncEnumerable EnumerateMessagesAsync(CancellationToken ct = default) { - if (IsComplete) - return; - //extra thread safety - Interlocked.Increment(ref _isComplete); - _channel.Writer.TryComplete(); + return _channel.Reader.ReadAllAsync(ct); } public async ValueTask AddMessage(IInboundMessage msg) { - if(!IsComplete) + if(!IsDisposed) await _channel.Writer.WriteAsync(msg); } public void Dispose() { + if (IsDisposed) + return; + IsDisposed = true; OnDispose(); - MarkComplete(); + _channel.Writer.TryComplete(); } public Action OnDispose { get; set; } = () => { }; - } internal class InboundMessageStream : InboundMessageStream, IInboundMessageStream where TResBdy: new() diff --git a/Hydra4NET/Internal/ThreadSafeBool.cs b/Hydra4NET/Internal/ThreadSafeBool.cs new file mode 100644 index 0000000..c9a5f5f --- /dev/null +++ b/Hydra4NET/Internal/ThreadSafeBool.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace Hydra4NET.Internal +{ + internal class ThreadSafeBool + { + private const int FALSE_VAL = 0; + private const int TRUE_VAL = 1; + private int _val; //defaults to FALSE_VAL (0) + + public bool Value + { + //read or set in thread-safe manner + get => Interlocked.CompareExchange(ref _val, TRUE_VAL, TRUE_VAL) == TRUE_VAL; + set + { + if (value) + Interlocked.CompareExchange(ref _val, TRUE_VAL, FALSE_VAL); + else + Interlocked.CompareExchange(ref _val, FALSE_VAL, TRUE_VAL); + } + } + + public static implicit operator ThreadSafeBool(bool v) => new ThreadSafeBool { Value = v }; + } +} diff --git a/Hydra4NET/Partials/HealthPresence.cs b/Hydra4NET/Partials/HealthPresence.cs index c606c0f..027c757 100644 --- a/Hydra4NET/Partials/HealthPresence.cs +++ b/Hydra4NET/Partials/HealthPresence.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using System.Diagnostics; -using System.Text.Json; using System.Threading.Tasks; /* @@ -64,7 +63,7 @@ public class PresenceNodeEntry #endregion // Entry classes #region Presence and Health check handling - private string BuildHealthCheckEntry() + private byte[] BuildHealthCheckEntry() { HealthCheckEntry healthCheckEntry = new HealthCheckEntry() { @@ -88,10 +87,10 @@ private string BuildHealthCheckEntry() var runtime = DateTime.Now - Process.GetCurrentProcess().StartTime; healthCheckEntry.UptimeSeconds = runtime.TotalSeconds; - return StandardSerializer.Serialize(healthCheckEntry); + return StandardSerializer.SerializeBytes(healthCheckEntry); } - private string BuildPresenceNodeEntry() + private byte[] BuildPresenceNodeEntry() { PresenceNodeEntry presenceNodeEntry = new PresenceNodeEntry() { @@ -105,7 +104,7 @@ private string BuildPresenceNodeEntry() HostName = HostName, Elapsed = 0 }; - return StandardSerializer.Serialize(presenceNodeEntry); + return StandardSerializer.SerializeBytes(presenceNodeEntry); } private void ConfigurePresenceTask() => _presenceTask = UpdatePresence(); // allows for calling UpdatePresence without awaiting @@ -116,13 +115,20 @@ private async Task UpdatePresence() { while (!_cts.Token.IsCancellationRequested) { - await PresenceEvent(); - if (_secondsTick++ == _HEALTH_UPDATE_INTERVAL) + try { - await HealthCheckEvent(); - _secondsTick = _ONE_SECOND; + await PresenceEvent(); + if (_secondsTick++ == _HEALTH_UPDATE_INTERVAL) + { + await HealthCheckEvent(); + _secondsTick = _ONE_SECOND; + } + await Task.Delay(_ONE_SECOND * 1000, _cts.Token); + } + catch (Exception e) + { + await EmitError(e); } - await Task.Delay(_ONE_SECOND * 1000, _cts.Token); } } catch (OperationCanceledException) @@ -132,7 +138,7 @@ private async Task UpdatePresence() private async Task PresenceEvent() { - if (_redis != null) + if (_redis != null && _redis.IsConnected) { var db = GetDatabase(); await db.StringSetAsync($"{_redis_pre_key}:{ServiceName}:{InstanceID}:presence", InstanceID); @@ -143,7 +149,7 @@ private async Task PresenceEvent() private async Task HealthCheckEvent() { - if (_redis != null) + if (_redis != null && _redis.IsConnected) { var db = GetDatabase(); await db.StringSetAsync($"{_redis_pre_key}:{ServiceName}:{InstanceID}:health", BuildHealthCheckEntry()); @@ -168,11 +174,7 @@ public async Task GetPresenceAsync(string serviceNa string? s = await GetDatabase().HashGetAsync(_nodes_hash_key, id); if (s != null) { - PresenceNodeEntry? presenceNodeEntry = JsonSerializer.Deserialize(s, new JsonSerializerOptions() - { - PropertyNameCaseInsensitive = true, - PropertyNamingPolicy = JsonNamingPolicy.CamelCase - }); + PresenceNodeEntry? presenceNodeEntry = StandardSerializer.Deserialize(s); if (presenceNodeEntry != null) { serviceEntries.Add(presenceNodeEntry); @@ -194,11 +196,7 @@ public async Task GetServiceNodesAsync() HashEntry[] list = await db.HashGetAllAsync($"{_redis_pre_key}:nodes"); foreach (var entry in list) { - PresenceNodeEntry? presenceNodeEntry = JsonSerializer.Deserialize(entry.Value, new JsonSerializerOptions() - { - PropertyNameCaseInsensitive = true, - PropertyNamingPolicy = JsonNamingPolicy.CamelCase - }); + PresenceNodeEntry? presenceNodeEntry = StandardSerializer.Deserialize((string?)entry.Value ?? ""); if (presenceNodeEntry != null) { var unixTimestamp = GetUtcTimeStamp(presenceNodeEntry.UpdatedOn); diff --git a/Hydra4NET/QueueProcessor.cs b/Hydra4NET/QueueProcessor.cs index 262a1df..59932d4 100644 --- a/Hydra4NET/QueueProcessor.cs +++ b/Hydra4NET/QueueProcessor.cs @@ -47,6 +47,8 @@ public void Init(CancellationToken ct = default) { try { + if (!_hydra.IsRedisConnected) + return; string message = await _hydra.GetQueueMessageAsync(); if (message != string.Empty) { @@ -83,6 +85,7 @@ await ProcessMessage(new InboundMessage _timer?.Change((int)_slidingDuration, 0); } }, null, 0, 0); + //when cancelled, it will stop the timer ct.Register(() => _timer?.Change(Timeout.Infinite, 0)); } @@ -108,4 +111,4 @@ public void Dispose() _timer?.Dispose(); } } -} +} \ No newline at end of file diff --git a/Hydra4NET/RedisConnectionStatus.cs b/Hydra4NET/RedisConnectionStatus.cs new file mode 100644 index 0000000..b5a7998 --- /dev/null +++ b/Hydra4NET/RedisConnectionStatus.cs @@ -0,0 +1,25 @@ +using System; + +namespace Hydra4NET +{ + public enum ConnectionStatus + { + Disconnected, + Connected, + Reconnected + } + + public class RedisConnectionStatus + { + internal RedisConnectionStatus(ConnectionStatus status, string? message = null, Exception? exception = null) + { + Status = status; + Message = message; + Exception = exception; + } + + public ConnectionStatus Status { get; private set; } + public string? Message { get; private set; } + public Exception? Exception { get; private set; } + } +} \ No newline at end of file diff --git a/Hydra4NET/UMF.cs b/Hydra4NET/UMF.cs index 6df0643..105f37a 100644 --- a/Hydra4NET/UMF.cs +++ b/Hydra4NET/UMF.cs @@ -181,6 +181,8 @@ public UMF() : base() { } /// /// public string Serialize() => StandardSerializer.Serialize(this); + + public byte[] SerializeUtf8Bytes() => StandardSerializer.SerializeBytes(this); } } diff --git a/Hydra4Net.HostingExtensions/DefaultMessageHandler.cs b/Hydra4Net.HostingExtensions/DefaultMessageHandler.cs deleted file mode 100644 index 4d8a82e..0000000 --- a/Hydra4Net.HostingExtensions/DefaultMessageHandler.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Hydra4Net.HostingExtensions -{ - internal class DefaultMessageHandler : HydraEventsHandler - { - } -} diff --git a/Hydra4Net.HostingExtensions/Hydra4Net.HostingExtensions.csproj b/Hydra4Net.HostingExtensions/Hydra4Net.HostingExtensions.csproj index 95f625f..8af85c2 100644 --- a/Hydra4Net.HostingExtensions/Hydra4Net.HostingExtensions.csproj +++ b/Hydra4Net.HostingExtensions/Hydra4Net.HostingExtensions.csproj @@ -6,7 +6,7 @@ True Microsoft.Hosting.Extensions Helpers for Hydra for .NET True - 0.0.14 + 0.0.15 Helper extensions for Microsoft.Hosting.Extensions and Hydra for .NET 2022 Carlos Justiniano, Justin Couch, and Contributors hydra microservice diff --git a/Hydra4Net.HostingExtensions/HydraBackgroundService.cs b/Hydra4Net.HostingExtensions/HydraBackgroundService.cs index 0ae408f..9e676fe 100644 --- a/Hydra4Net.HostingExtensions/HydraBackgroundService.cs +++ b/Hydra4Net.HostingExtensions/HydraBackgroundService.cs @@ -37,6 +37,12 @@ async Task PerformHandlerAction(Func action) await action(scope.ServiceProvider.GetRequiredService()); } + void PerformHandlerAction(Action action) + { + using var scope = _services.CreateScope(); + action(scope.ServiceProvider.GetRequiredService()); + } + //called once at app start protected override async Task ExecuteAsync(CancellationToken stoppingToken) { @@ -44,6 +50,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _hydra.OnMessageHandler((msg) => PerformHandlerAction(e => e.OnMessageReceived(msg, _hydra))); _hydra.OnInternalErrorHandler((ex) => PerformHandlerAction(e => e.OnInternalError(_hydra, ex))); + _hydra.OnRedisConnectionChange((s) => PerformHandlerAction(e => e.OnRedisConnectionChange(_hydra, s))); + _hydra.OnDebugEventHandler((ev) => PerformHandlerAction(e => e.OnDebugEvent(_hydra, ev))); await PerformHandlerAction(e => e.BeforeInit(_hydra)); await _hydra.InitAsync(); _queue.Init(stoppingToken); @@ -58,4 +66,4 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } } } -} +} \ No newline at end of file diff --git a/Hydra4Net.HostingExtensions/HydraEventsHandler.cs b/Hydra4Net.HostingExtensions/HydraEventsHandler.cs index 0f9331c..50e8ed8 100644 --- a/Hydra4Net.HostingExtensions/HydraEventsHandler.cs +++ b/Hydra4Net.HostingExtensions/HydraEventsHandler.cs @@ -1,4 +1,5 @@ using Hydra4NET; +using Microsoft.Extensions.Logging; using System; using System.Threading.Tasks; @@ -6,6 +7,12 @@ namespace Hydra4Net.HostingExtensions { public class HydraEventsHandler : IHydraEventsHandler { + protected ILogger Logger; + + public HydraEventsHandler(ILogger logger) + { + Logger = logger; + } public virtual Task OnMessageReceived(IInboundMessage msg, IHydra hydra) { return Task.CompletedTask; @@ -20,6 +27,7 @@ public virtual Task BeforeInit(IHydra hydra) { return Task.CompletedTask; } + public virtual Task OnShutdown(IHydra hydra) { return Task.CompletedTask; @@ -32,12 +40,36 @@ public virtual Task OnInitError(IHydra hydra, Exception e) public virtual Task OnDequeueError(IHydra hydra, Exception e) { + Logger.LogError(e, "Hydra dequeue error occurred"); return Task.CompletedTask; } public virtual Task OnInternalError(IHydra hydra, Exception e) { + Logger.LogError(e, "Internal Hydra error occurred"); + return Task.CompletedTask; + } + + public virtual void OnDebugEvent(IHydra hydra, DebugEvent e) + { + Logger.LogDebug("Hydra: {0}: {1}", e.Message, string.IsNullOrEmpty(e.UMF) ? "(no UMF)" : e.UMF); + } + + public virtual Task OnRedisConnectionChange(IHydra hydra, RedisConnectionStatus connectionStatus) + { + switch (connectionStatus.Status) + { + case ConnectionStatus.Connected: + Logger.LogDebug(connectionStatus.Message); + break; + case ConnectionStatus.Disconnected: + Logger.LogError(connectionStatus.Exception, connectionStatus.Message); + break; + case ConnectionStatus.Reconnected: + Logger.LogInformation(connectionStatus.Exception, connectionStatus.Message); + break; + } return Task.CompletedTask; } } -} +} \ No newline at end of file diff --git a/Hydra4Net.HostingExtensions/IHydraEventsHandler.cs b/Hydra4Net.HostingExtensions/IHydraEventsHandler.cs index d8026a3..ae6e738 100644 --- a/Hydra4Net.HostingExtensions/IHydraEventsHandler.cs +++ b/Hydra4Net.HostingExtensions/IHydraEventsHandler.cs @@ -38,20 +38,42 @@ public interface IHydraEventsHandler Task OnInitError(IHydra hydra, Exception exception); /// - /// Called after a Hydra dequeueing error occurs + /// Called after a Hydra dequeuing error occurs /// /// /// /// Task OnDequeueError(IHydra hydra, Exception e); + /// + /// Called when a Hydra internal error occurs + /// + /// + /// + /// Task OnInternalError(IHydra hydra, Exception e); + /// + /// Called when a Hydra debug information is emitted + /// + /// + /// + /// + void OnDebugEvent(IHydra hydra, DebugEvent e); + /// /// Called on application shutdown /// /// /// Task OnShutdown(IHydra hydra); + + /// + /// Called when Hydra connection to Redis changes + /// + /// + /// + /// + Task OnRedisConnectionChange(IHydra hydra, RedisConnectionStatus connectionStatus); } } \ No newline at end of file