diff --git a/RabbitMQ.Stream.Client/RawProducer.cs b/RabbitMQ.Stream.Client/RawProducer.cs index 6e493f03..f8768a24 100644 --- a/RabbitMQ.Stream.Client/RawProducer.cs +++ b/RabbitMQ.Stream.Client/RawProducer.cs @@ -121,12 +121,24 @@ private async Task Init() { foreach (var id in publishingIds.Span) { - _config.ConfirmHandler(new Confirmation + try { - PublishingId = id, - Code = ResponseCode.Ok, - Stream = _config.Stream - }); + _config.ConfirmHandler(new Confirmation + { + PublishingId = id, + Code = ResponseCode.Ok, + Stream = _config.Stream + }); + } + catch (Exception e) + { + // The call is exposed to the user so we need to catch any exception + // there could be an exception in the user code. + // So here we log the exception and we continue. + + _logger.LogError(e, "Error during confirm handler, publishing id: {Id}. " + + "Hint: Check the user ConfirmHandler callback", id); + } } _semaphore.Release(publishingIds.Length); diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs index 1b639a12..f3b53617 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs @@ -124,7 +124,8 @@ private async void OnTimedEvent(object sender, ElapsedEventArgs e) foreach (var pair in timedOutMessages) { - await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null).ConfigureAwait(false); + await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null) + .ConfigureAwait(false); } } @@ -145,8 +146,13 @@ internal void AddUnConfirmedMessage(ulong publishingId, List messages) }); } - internal Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId, string stream) + internal async Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId, + string stream) { - return _waitForConfirmationActionBlock.SendAsync((confirmationStatus, publishingId, stream)); + if (!await _waitForConfirmationActionBlock.SendAsync((confirmationStatus, publishingId, stream)) + .ConfigureAwait(false)) + { + await _waitForConfirmationActionBlock.Completion.ConfigureAwait(false); + } } } diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 48e82cef..fe80ae3c 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -55,7 +55,7 @@ private async Task SuperStreamProducer() _ => ConfirmationStatus.UndefinedError }; _confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId, - stream); + stream).ConfigureAwait(false); } }, BaseLogger).ConfigureAwait(false); } @@ -97,7 +97,7 @@ private async Task StandardProducer() _ => ConfirmationStatus.UndefinedError }; _confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId, - confirmation.Stream); + confirmation.Stream).ConfigureAwait(false); } }, BaseLogger).ConfigureAwait(false); } diff --git a/kubernetes/stream_cluster.yaml b/kubernetes/stream_cluster.yaml index 631cb766..85592053 100644 --- a/kubernetes/stream_cluster.yaml +++ b/kubernetes/stream_cluster.yaml @@ -6,13 +6,15 @@ metadata: apiVersion: rabbitmq.com/v1beta1 kind: RabbitmqCluster metadata: - name: rabbitmq-stream + name: tls namespace: stream-clients-test spec: replicas: 1 image: rabbitmq:3-management service: type: LoadBalancer + tls: + secretName: tls-secret resources: requests: cpu: 500m diff --git a/kubernetes/test-server-key.pem b/kubernetes/test-server-key.pem new file mode 100644 index 00000000..14eed73c --- /dev/null +++ b/kubernetes/test-server-key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCzfGD7b37pIVtW +O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5 +6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp +7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg +rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n +lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI +rCXOOKN1AgMBAAECggEAEXHelDmZPmv3TmmLPb6lmV92RujVtSpbiRX2J7tKCz+o +aeCLb4DE0ulM1iicha+NYBiGj2nN+rkLaVvEty2yNYd0QgRHH5I0GcySFqOvoLSZ +Y0O7jaukDJ6w0KNLNKt41Xc31f146MJMeT62UrGQayBjXidC7QTLNoQq9zyQ6MQK +crbj8f/TqT68V5f0nQeYFRGPYZPiaLcDB6mOCL09B4bfMxcOl0/6SVouV8BrTHxs +kqtO6yxrpc0XL1vfCrBqljlVLnXyGNmkaegSQMOTDfGqM6Mkc6771DYt5MJsmVCH +VgDZeMs/BJA/srV2rXW4cwfO0OOSTkE9cCNzZM6Q6QKBgQDmLk+bLb6cafFceuGl +gktNYi1TbTD62/shKulmFm8C7jNMTdbGi4pEwlnxzU57A+spr0nzody5c8qMIfUF +Cpf9aRWs7xG0WOCuYbUSI+gbICxaJOKe00TyWwf/xn13p6J4wecg4n5C8Wo1QIRi +nFwgaqYATkyzhE9+YOQKpxgeTQKBgQDHnlmMxiTuD3j9BxEkD3aY4E97qTfPNqgM +umrwXoU0paYtgfuxMe7yjkE/qVKn3QP+wzN+XMR6YcBdphQPlSCcEyJrAZZCGrgJ +CSO9anY6CA5bgZ9Mk3pBCldHEQqxWrg27bejkn4KV3PXmDtQwMYrpsgEu3DPCgy/ +TUVTnpK9yQKBgQC6PGQaWOOtKCapvZ6OTCJjJPkpU+JaRdwlVNPszl/ZTiLhLOWG +VOZ1hY5Cjutdqqj9XB8IaUDuJ5qM0PiusIiS9xAbkH6RnYuEa/eWCslEET7xXICj +IqrZMAAD2XQweMiCzdgUikzAGxXkqiOyqXH8pG1VOATlBjtPNFOtrs5bzQKBgD08 +1cn6609gzcQJy/ddCwwBHEEae3WFFe65rZ7J0GGDQ8SIMLd+Uwh0HY4zGplGkzgv +l/d27AuDO2k/Tr4tCJD4ycE7/mWPHtAezqkIJPbOi+EEleL/By02x+mUT8xywTqQ +mJqEkUgI5g/IssGmMeUoSAozmnrZYWm6gb8SUYAJAoGATjkZuRTAx+85ninZCbMU +fKpxM5KhCOHOBzxEKjmHp/fRUga+XoZMeWtmcqABWa3pD3kgTa+9z1hrQaUCh1cw +01QjqGxY9Z7VdBMPexXiht1JDZULaJ3Cif4YKUrwrDWc737HRqLP6c7EMwXnAod2 +RPe55pKjmk0X/uuple2WCAE= +-----END PRIVATE KEY----- diff --git a/kubernetes/test-server.pem b/kubernetes/test-server.pem new file mode 100644 index 00000000..83af11f1 --- /dev/null +++ b/kubernetes/test-server.pem @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIID7zCCAtegAwIBAgIBATANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH +ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTA5LTEzVDE2OjU0OjQ0LjU2NzY0MjEN +MAsGA1UEBwwEJCQkJDAeFw0yMzA5MTMxNDU0NDRaFw0zMzA5MTAxNDU0NDRaMDYx +IzAhBgNVBAMMGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tMQ8wDQYDVQQKDAZz +ZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCzfGD7b37pIVtW +O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5 +6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp +7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg +rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n +lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI +rCXOOKN1AgMBAAGjgfEwge4wCQYDVR0TBAIwADALBgNVHQ8EBAMCBaAwEwYDVR0l +BAwwCgYIKwYBBQUHAwEwTAYDVR0RBEUwQ4IaZ3NhbnRvbWFnZzZMVkRNLnZtd2Fy +ZS5jb22CGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tgglsb2NhbGhvc3QwHQYD +VR0OBBYEFDHW2nif3ILxi7DWd4TNyn63Q7apMB8GA1UdIwQYMBaAFKccKJsr3YYn +MVNXfujEGRCONpu9MDEGA1UdHwQqMCgwJqAkoCKGIGh0dHA6Ly9jcmwtc2VydmVy +OjgwMDAvYmFzaWMuY3JsMA0GCSqGSIb3DQEBCwUAA4IBAQCwa+ksiRPR06JZzKFd +pcD4K5oZ6F5mVpTqn3Kf5jS1cz6Ippi/T8nU8k/xVKmDMqqCWCYGal1U8DmHGPzQ +WOWMk/Ibb72feCS4txIH4GuV/ZO868/5qOy1rmP/UjOY6Kpyju/Eg13AdzcuSnZ3 +rZcSncm/gY5BHMmUJdMutTe+Scz32VW7yV8Mi+2ZwsMiqLksZMpBJqPyxroGTksI +p7bklWf1pOgQqh9XJqu3x4rceH0o3xHZ/wana4RnSWL7Q4N6TNinAjLzlLvDByW7 +JX9ivpCVpM0n6tIT+E7UbWVX6WoICjCJeDLNwq/jYVEDP80O3yjDYKpYIOqp/e7Q +/BJy +-----END CERTIFICATE-----