From 666c65ad6573ad8b3c17cc9ad1abae0405fdbc99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20R=C3=B6pstorf?= <13087909+FRoepstorf@users.noreply.github.com> Date: Thu, 28 Sep 2023 15:39:00 +0200 Subject: [PATCH 1/4] feat: add error handling for eventbridge put events --- .github/workflows/run-tests.yml | 2 +- .../Broadcasters/EventBridgeBroadcaster.php | 19 ++- tests/Pub/BasicEvents/EventBridgeTest.php | 123 ++++++++++++------ 3 files changed, 102 insertions(+), 42 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 44a48dd..3b6f24b 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -4,7 +4,7 @@ on: [push, pull_request] jobs: test: - runs-on: ubuntu-18.04 + runs-on: ubuntu-latest strategy: fail-fast: true matrix: diff --git a/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php b/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php index 2f70441..ca62663 100644 --- a/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php +++ b/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php @@ -4,6 +4,8 @@ use Aws\EventBridge\EventBridgeClient; use Illuminate\Broadcasting\Broadcasters\Broadcaster; +use Illuminate\Support\Arr; +use Illuminate\Support\Facades\Log; class EventBridgeBroadcaster extends Broadcaster { @@ -52,9 +54,17 @@ public function broadcast(array $channels, $event, array $payload = []) { $events = $this->mapToEventBridgeEntries($channels, $event, $payload); - $this->eventBridgeClient->putEvents([ + $result = $this->eventBridgeClient->putEvents([ 'Entries' => $events, ]); + + if ($this->failedToBroadcast($result)) { + Log::error('Failed to send events to EventBridge', [ + 'errors' => collect($result->get('Entries'))->filter(function (array $entry) { + return Arr::hasAny($entry, ['ErrorCode', 'ErrorMessage']); + })->toArray(), + ]); + } } /** @@ -76,4 +86,11 @@ protected function mapToEventBridgeEntries(array $channels, string $event, array }) ->all(); } + + protected function failedToBroadcast(?\Aws\Result $result): bool + { + return $result + && $result->hasKey('FailedEntryCount') + && $result->get('FailedEntryCount') > 0; + } } diff --git a/tests/Pub/BasicEvents/EventBridgeTest.php b/tests/Pub/BasicEvents/EventBridgeTest.php index 58f2b0a..ecf6646 100644 --- a/tests/Pub/BasicEvents/EventBridgeTest.php +++ b/tests/Pub/BasicEvents/EventBridgeTest.php @@ -2,8 +2,9 @@ namespace PodPoint\AwsPubSub\Tests\Pub\BasicEvents; -use Illuminate\Foundation\Application; -use Mockery; +use Aws\Result; +use Illuminate\Support\Facades\Log; +use Mockery as m; use Mockery\MockInterface; use PodPoint\AwsPubSub\Tests\Pub\Concerns\InteractsWithEventBridge; use PodPoint\AwsPubSub\Tests\Pub\TestClasses\Events\UserRetrieved; @@ -30,60 +31,52 @@ protected function getEnvironmentSetUp($app) /** @test */ public function it_broadcasts_basic_event_with_the_event_name_as_the_detail_type_and_serialised_event_as_the_detail() { - $jane = $this->createJane(); + $event = new UserRetrieved($this->createJane()); - $janeRetrieved = new UserRetrieved($jane); - - $this->mockEventBridge(function (MockInterface $eventBridge) use ($janeRetrieved) { + $this->mockEventBridge(function (MockInterface $eventBridge) use ($event) { $eventBridge ->shouldReceive('putEvents') ->once() - ->with(Mockery::on(function ($arg) use ($janeRetrieved) { - return $arg['Entries'][0]['Detail'] === json_encode($janeRetrieved) + ->with(m::on(function ($arg) use ($event) { + return $arg['Entries'][0]['Detail'] === json_encode($event) && $arg['Entries'][0]['DetailType'] === UserRetrieved::class && $arg['Entries'][0]['EventBusName'] === 'users' && $arg['Entries'][0]['Source'] === 'my-app'; })); }); - event($janeRetrieved); + event($event); } /** @test */ public function it_broadcasts_basic_event_with_action() { - $jane = $this->createJane(); - - $janeRetrieved = new UserRetrievedWithCustomName($jane); + $event = new UserRetrievedWithCustomName($this->createJane()); - $this->mockEventBridge(function (MockInterface $eventBridge) use ($janeRetrieved) { + $this->mockEventBridge(function (MockInterface $eventBridge) use ($event) { $eventBridge ->shouldReceive('putEvents') ->once() - ->with(Mockery::on(function ($arg) use ($janeRetrieved) { - return $arg['Entries'][0]['Detail'] === json_encode($janeRetrieved) - && $arg['Entries'][0]['DetailType'] === 'user.retrieved' - && $arg['Entries'][0]['EventBusName'] === 'users' - && $arg['Entries'][0]['Source'] === 'my-app'; + ->with(m::on(function ($arg) use ($event) { + return $arg['Entries'][0]['Detail'] === json_encode($event) + && $arg['Entries'][0]['DetailType'] === 'user.retrieved'; })); }); - event($janeRetrieved); + event($event); } /** @test */ public function it_broadcasts_basic_event_with_action_and_custom_payload() { - $jane = $this->createJane(); + $event = new UserRetrievedWithCustomPayload($this->createJane()); - $janeRetrieved = new UserRetrievedWithCustomPayload($jane); - - $this->mockEventBridge(function (MockInterface $eventBridge) use ($janeRetrieved) { + $this->mockEventBridge(function (MockInterface $eventBridge) use ($event) { $eventBridge ->shouldReceive('putEvents') ->once() - ->with(Mockery::on(function ($arg) use ($janeRetrieved) { - $customPayload = array_merge($janeRetrieved->broadcastWith(), ['socket' => null]); + ->with(m::on(function ($arg) use ($event) { + $customPayload = array_merge($event->broadcastWith(), ['socket' => null]); return $arg['Entries'][0]['Detail'] === json_encode($customPayload) && $arg['Entries'][0]['DetailType'] === UserRetrievedWithCustomPayload::class @@ -92,40 +85,90 @@ public function it_broadcasts_basic_event_with_action_and_custom_payload() })); }); - event($janeRetrieved); + event($event); } /** @test */ public function it_broadcasts_basic_event_to_multiple_channels_as_buses() { - $jane = $this->createJane(); - - $janeRetrieved = new UserRetrievedWithMultipleChannels($jane); + $event = new UserRetrievedWithMultipleChannels($this->createJane()); - $this->mockEventBridge(function (MockInterface $eventBridge) use ($janeRetrieved) { + $this->mockEventBridge(function (MockInterface $eventBridge) use ($event) { $eventBridge ->shouldReceive('putEvents') ->once() - ->with(Mockery::on(function ($arg) use ($janeRetrieved) { - return collect($janeRetrieved->broadcastOn()) - ->map(function ($channel, $key) use ($arg, $janeRetrieved) { - return $arg['Entries'][$key]['Detail'] === json_encode($janeRetrieved) + ->with(m::on(function ($arg) use ($event) { + return collect($event->broadcastOn()) + ->map(function ($channel, $key) use ($arg, $event) { + return $arg['Entries'][$key]['Detail'] === json_encode($event) && $arg['Entries'][$key]['DetailType'] === UserRetrievedWithMultipleChannels::class && $arg['Entries'][$key]['EventBusName'] === $channel && $arg['Entries'][$key]['Source'] === 'my-app'; }) ->filter() - ->count() > 0; + ->count() === 2; })); }); - event($janeRetrieved); + event($event); } - /** - * @return User - */ - protected function createJane() + /** @test */ + public function it_can_use_a_source() + { + config(['broadcasting.connections.eventbridge.source' => 'some-other-source']); + + $event = new UserRetrievedWithMultipleChannels($this->createJane()); + + $this->mockEventBridge(function (MockInterface $eventBridge) use ($event) { + $eventBridge + ->shouldReceive('putEvents') + ->once() + ->with(m::on(function ($arg) use ($event) { + return collect($event->broadcastOn()) + ->map(function ($channel, $key) use ($arg) { + return $arg['Entries'][$key]['Source'] === 'some-other-source'; + }) + ->filter() + ->count() > 0; + })); + }); + + event($event); + } + + /** @test */ + public function it_logs_errors_when_events_fail_to_send() + { + $event = new UserRetrieved($this->createJane()); + + $failedEntry = [ + 'ErrorCode' => 'InternalFailure', + 'ErrorMessage' => 'Something went wrong', + 'EventId' => $this->faker->uuid, + ]; + + $this->mockEventBridge(function (MockInterface $eventBridge) use ($failedEntry) { + $eventBridge + ->shouldReceive('putEvents') + ->once() + ->andReturn(new Result([ + 'FailedEntryCount' => 1, + 'Entries' => [ + $failedEntry, + ['EventId' => $this->faker->uuid], + ], + ])); + }); + + Log::shouldReceive('error')->once()->with('Failed to send events to EventBridge', [ + 'errors' => [$failedEntry], + ]); + + event($event); + } + + protected function createJane(): User { return User::create([ 'name' => 'Jane', From 4b21b6fb1e899f3b4cf0b5b384d018a5832c2400 Mon Sep 17 00:00:00 2001 From: Clem Blanco Date: Fri, 6 Oct 2023 17:07:55 +0200 Subject: [PATCH 2/4] fix: undeleted messages (#74) --- src/Sub/Queue/Jobs/SnsEventDispatcherJob.php | 4 ++ .../Queue/Jobs/SnsEventDispatcherJobTest.php | 42 +++++++++++++++---- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/Sub/Queue/Jobs/SnsEventDispatcherJob.php b/src/Sub/Queue/Jobs/SnsEventDispatcherJob.php index e6331bd..762f335 100644 --- a/src/Sub/Queue/Jobs/SnsEventDispatcherJob.php +++ b/src/Sub/Queue/Jobs/SnsEventDispatcherJob.php @@ -21,6 +21,8 @@ public function fire() 'message delivery is disabled for your SQS subscription.', $this->job); } + $this->release(); + return; } @@ -30,6 +32,8 @@ public function fire() 'subject' => $this->snsSubject(), ]); } + + $this->delete(); } /** diff --git a/tests/Sub/Queue/Jobs/SnsEventDispatcherJobTest.php b/tests/Sub/Queue/Jobs/SnsEventDispatcherJobTest.php index f32caf2..1d1276c 100644 --- a/tests/Sub/Queue/Jobs/SnsEventDispatcherJobTest.php +++ b/tests/Sub/Queue/Jobs/SnsEventDispatcherJobTest.php @@ -113,7 +113,7 @@ public function it_will_handle_empty_messages_with_a_subject() } /** @test */ - public function it_will_not_handle_raw_notification_messages() + public function it_will_not_handle_raw_notification_messages_and_release_the_message_onto_the_queue() { Log::shouldReceive('error')->once()->with( m::pattern('/^SqsSnsQueue: Invalid SNS payload/'), @@ -122,32 +122,60 @@ public function it_will_not_handle_raw_notification_messages() $this->mockedJobData = $this->mockedRawNotificationMessage()['Messages'][0]; - $this->getJob()->fire(); + $job = $this->getJob(); + $job->shouldNotReceive('delete'); + $job->shouldReceive('release')->once(); + + $job->fire(); Event::assertNotDispatched('foo'); } /** @test */ - public function it_will_not_handle_messages_where_the_event_name_to_trigger_cannot_be_resolved() + public function it_will_not_handle_messages_where_the_event_name_to_trigger_cannot_be_resolved_and_delete_the_message_from_the_queue() { $this->mockedJobData = $this->mockedRichNotificationMessage([ 'TopicArn' => '', 'Subject' => '', ])['Messages'][0]; - $this->getJob()->fire(); + $job = $this->getJob(); + $job->shouldReceive('delete')->once(); + $job->shouldNotReceive('release'); + + $job->fire(); Event::assertNotDispatched(''); } + /** @test */ + public function it_will_delete_the_message_from_the_queue_when_it_managed_to_dispatch_an_event() + { + $this->mockedJobData = $this->mockedRichNotificationMessage([ + 'TopicArn' => 'TopicArn:123456', + ])['Messages'][0]; + + $job = $this->getJob(); + $job->shouldReceive('delete')->once(); + + $job->fire(); + + Event::assertDispatched('TopicArn:123456'); + } + + /** @return SnsEventDispatcherJob|\Mockery\LegacyMockInterface */ protected function getJob() { - return new SnsEventDispatcherJob( + $mock = m::mock(SnsEventDispatcherJob::class, [ $this->app, m::mock(SqsClient::class), $this->mockedJobData, 'connection-name', - 'https://sqs.someregion.amazonaws.com/1234567891011/pubsub-events' - ); + 'https://sqs.someregion.amazonaws.com/1234567891011/pubsub-events', + ])->makePartial(); + + $mock->shouldReceive('delete', 'release')->byDefault(); + + return $mock; } } From 4070a08e5803dd0c173d6b77741e63b41b3e9d76 Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Mon, 9 Oct 2023 08:39:39 +0000 Subject: [PATCH 3/4] Apply fixes from StyleCI --- tests/EventServiceProviderTest.php | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/EventServiceProviderTest.php b/tests/EventServiceProviderTest.php index 3b12c72..12286eb 100644 --- a/tests/EventServiceProviderTest.php +++ b/tests/EventServiceProviderTest.php @@ -116,6 +116,7 @@ public function invalidCredentialsDataProvider() /** * @test + * * @dataProvider invalidCredentialsDataProvider */ public function it_can_make_sure_some_aws_credentials_are_provided_and_valid(array $invalidCredentials) From de29dbf880504d5758629d531df31ed57791cc8b Mon Sep 17 00:00:00 2001 From: clemblanco Date: Mon, 9 Oct 2023 10:42:56 +0200 Subject: [PATCH 4/4] WIP --- src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php b/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php index ca62663..1d9d54c 100644 --- a/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php +++ b/src/Pub/Broadcasting/Broadcasters/EventBridgeBroadcaster.php @@ -61,7 +61,7 @@ public function broadcast(array $channels, $event, array $payload = []) if ($this->failedToBroadcast($result)) { Log::error('Failed to send events to EventBridge', [ 'errors' => collect($result->get('Entries'))->filter(function (array $entry) { - return Arr::hasAny($entry, ['ErrorCode', 'ErrorMessage']); + return Arr::has($entry, 'ErrorCode') || Arr::has($entry, 'ErrorMessage'); })->toArray(), ]); }