diff --git a/docs/reference/types/subscription.md b/docs/reference/types/subscription.md index c09ce3ae36..0a7717fadc 100644 --- a/docs/reference/types/subscription.md +++ b/docs/reference/types/subscription.md @@ -105,6 +105,8 @@ nav_order: 3 | `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` | | `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` | | `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` | +| `batch` | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch. Commonly used with Webhooks to allow events to be delivered and acknowledged in batches. | `bool` | +| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` | | `fastack` | Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations | `bool` | | `url` | Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config | `string` | | `method` | Webhooks only: HTTP method to invoke. Default=POST | `string` | @@ -148,7 +150,7 @@ nav_order: 3 | `requestTimeout` | The max duration to hold a TLS handshake alive | `string` | | `maxIdleConns` | The max number of idle connections to hold pooled | `int` | | `idleTimeout` | The max duration to hold a HTTP keepalive connection between calls | `string` | -| `connectionTimeout` | | `string` | +| `connectionTimeout` | The maximum amount of time that a connection is allowed to remain with no data transmitted. | `string` | | `expectContinueTimeout` | See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) | `string` | diff --git a/docs/reference/types/wsstart.md b/docs/reference/types/wsstart.md index 880718c9a7..6514404189 100644 --- a/docs/reference/types/wsstart.md +++ b/docs/reference/types/wsstart.md @@ -96,6 +96,8 @@ nav_order: 23 | `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` | | `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` | | `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` | +| `batch` | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch. Commonly used with Webhooks to allow events to be delivered and acknowledged in batches. | `bool` | +| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` | | `fastack` | Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations | `bool` | | `url` | Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config | `string` | | `method` | Webhooks only: HTTP method to invoke. Default=POST | `string` | @@ -139,7 +141,7 @@ nav_order: 23 | `requestTimeout` | The max duration to hold a TLS handshake alive | `string` | | `maxIdleConns` | The max number of idle connections to hold pooled | `int` | | `idleTimeout` | The max duration to hold a HTTP keepalive connection between calls | `string` | -| `connectionTimeout` | | `string` | +| `connectionTimeout` | The maximum amount of time that a connection is allowed to remain with no data transmitted. | `string` | | `expectContinueTimeout` | See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) | `string` | diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index a3381ed6f7..9c97442471 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -27701,6 +27701,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is + a single event in the batch. Commonly used with Webhooks + to allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel @@ -27724,6 +27735,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -27973,6 +27986,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered array. + The batch size is capped to the readAhead limit. The event + payload is always an array even if there is a single event + in the batch. Commonly used with Webhooks to allow events + to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -27995,6 +28019,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -28226,6 +28252,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is a + single event in the batch. Commonly used with Webhooks to + allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -28248,6 +28285,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -28494,6 +28533,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered array. + The batch size is capped to the readAhead limit. The event + payload is always an array even if there is a single event + in the batch. Commonly used with Webhooks to allow events + to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -28516,6 +28566,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -28747,6 +28799,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is a + single event in the batch. Commonly used with Webhooks to + allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -28769,6 +28832,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -29077,6 +29142,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is a + single event in the batch. Commonly used with Webhooks to + allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -29099,6 +29175,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -36544,6 +36622,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is + a single event in the batch. Commonly used with Webhooks + to allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel @@ -36567,6 +36656,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -36809,6 +36900,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered array. + The batch size is capped to the readAhead limit. The event + payload is always an array even if there is a single event + in the batch. Commonly used with Webhooks to allow events + to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -36831,6 +36933,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -37062,6 +37166,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is a + single event in the batch. Commonly used with Webhooks to + allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -37084,6 +37199,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -37323,6 +37440,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered array. + The batch size is capped to the readAhead limit. The event + payload is always an array even if there is a single event + in the batch. Commonly used with Webhooks to allow events + to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -37345,6 +37473,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -37576,6 +37706,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is a + single event in the batch. Commonly used with Webhooks to + allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -37598,6 +37739,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) @@ -37892,6 +38035,17 @@ paths: options: description: Subscription options properties: + batch: + description: Events are delivered in batches in an ordered + array. The batch size is capped to the readAhead limit. + The event payload is always an array even if there is a + single event in the batch. Commonly used with Webhooks to + allow events to be delivered and acknowledged in batches. + type: boolean + batchTimeout: + description: When batching is enabled, the optional timeout + to send events even when the batch hasn't filled. + type: string fastack: description: 'Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations' @@ -37914,6 +38068,8 @@ paths: description: 'Webhooks only: a set of options for HTTP' properties: connectionTimeout: + description: The maximum amount of time that a connection + is allowed to remain with no data transmitted. type: string expectContinueTimeout: description: See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index 28f5e9c7f8..1996ca0c28 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -296,4 +296,6 @@ var ( MsgUnexpectedInterfaceType = ffe("FF10457", "Unexpected interface type: %T", 500) MsgBlockchainConnectorRESTErrConflict = ffe("FF10458", "Conflict from blockchain connector: %s", 409) MsgTokensRESTErrConflict = ffe("FF10459", "Conflict from tokens service: %s", 409) + MsgBatchWithDataNotSupported = ffe("FF10460", "Provided subscription '%s' enables batching and withData which is not supported", 400) + MsgBatchDeliveryNotSupported = ffe("FF10461", "Batch delivery not supported by transport '%s'", 400) ) diff --git a/internal/coremsgs/en_struct_descriptions.go b/internal/coremsgs/en_struct_descriptions.go index 24e792e1b2..391354faf2 100644 --- a/internal/coremsgs/en_struct_descriptions.go +++ b/internal/coremsgs/en_struct_descriptions.go @@ -537,9 +537,11 @@ var ( SubscriptionBlockchainEventFilterListener = ffm("SubscriptionBlockchainEventFilter.listener", "Regular expression to apply to the blockchain event 'listener' field, which is the UUID of the event listener. So you can restrict your subscription to certain blockchain listeners. Alternatively to avoid your application need to know listener UUIDs you can set the 'topic' field of blockchain event listeners, and use a topic filter on your subscriptions") // SubscriptionCoreOptions field descriptions - SubscriptionCoreOptionsFirstEvent = ffm("SubscriptionCoreOptions.firstEvent", "Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest'") - SubscriptionCoreOptionsReadAhead = ffm("SubscriptionCoreOptions.readAhead", "The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts") - SubscriptionCoreOptionsWithData = ffm("SubscriptionCoreOptions.withData", "Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports.") + SubscriptionCoreOptionsFirstEvent = ffm("SubscriptionCoreOptions.firstEvent", "Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest'") + SubscriptionCoreOptionsReadAhead = ffm("SubscriptionCoreOptions.readAhead", "The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts") + SubscriptionCoreOptionsWithData = ffm("SubscriptionCoreOptions.withData", "Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports.") + SubscriptionCoreOptionsBatch = ffm("SubscriptionCoreOptions.batch", "Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch. Commonly used with Webhooks to allow events to be delivered and acknowledged in batches.") + SubscriptionCoreOptionsBatchTimeout = ffm("SubscriptionCoreOptions.batchTimeout", "When batching is enabled, the optional timeout to send events even when the batch hasn't filled.") // TokenApproval field descriptions TokenApprovalLocalID = ffm("TokenApproval.localId", "The UUID of this token approval, in the local FireFly node") @@ -706,7 +708,7 @@ var ( WebhookOptHTTPExpectContinueTimeout = ffm("WebhookHTTPOptions.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)") WebhookOptHTTPIdleTimeout = ffm("WebhookHTTPOptions.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls") WebhookOptHTTPMaxIdleConns = ffm("WebhookHTTPOptions.maxIdleConns", "The max number of idle connections to hold pooled") - WebhookOptHTTPConnectionTimeout = ffm("WebhookHTTPOptions.connectionTimeout", "") + WebhookOptHTTPConnectionTimeout = ffm("WebhookHTTPOptions.connectionTimeout", "The maximum amount of time that a connection is allowed to remain with no data transmitted.") WebhookOptHTTPTLSHandshakeTimeout = ffm("WebhookHTTPOptions.tlsHandshakeTimeout", "The max duration to hold a TLS handshake alive") WebhookOptHTTPRequestTimeout = ffm("WebhookHTTPOptions.requestTimeout", "The max duration to hold a TLS handshake alive") diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index 94e0b477fa..a862d2dd57 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/ffapi" @@ -39,7 +40,8 @@ import ( ) const ( - maxReadAhead = 65536 + maxReadAhead = 65536 + defaultBatchTimeout = time.Duration(2) * time.Second ) type ackNack struct { @@ -67,6 +69,8 @@ type eventDispatcher struct { mux sync.Mutex namespace string readAhead int + batch bool + batchTimeout time.Duration subscription *subscription txHelper txcommon.Helper } @@ -80,6 +84,15 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events. if readAhead > maxReadAhead { readAhead = maxReadAhead } + + batchTimeout := defaultBatchTimeout + if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" { + batchTimeout = fftypes.ParseToDuration(*sub.definition.Options.BatchTimeout) + } + batch := false + if sub.definition.Options.Batch != nil { + batch = *sub.definition.Options.Batch + } ed := &eventDispatcher{ ctx: log.WithLogField(log.WithLogField(ctx, "role", fmt.Sprintf("ed[%s]", connID)), @@ -100,6 +113,8 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events. acksNacks: make(chan ackNack), closed: make(chan struct{}), txHelper: txHelper, + batch: batch, + batchTimeout: batchTimeout, } pollerConf := &eventPollerConf{ @@ -146,10 +161,15 @@ func (ed *eventDispatcher) electAndStart() { l.Debugf("Closed before we became leader") return } - // We're ready to go - not + // We're ready to go ed.elected = true ed.eventPoller.start() - go ed.deliverEvents() + + if ed.batch { + go ed.deliverBatchedEvents() + } else { + go ed.deliverEvents() + } // Wait until the event poller closes <-ed.eventPoller.closed } @@ -284,22 +304,22 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo // or a reset event happens for { ed.mux.Lock() - var disapatchable []*core.EventDelivery + var dispatchable []*core.EventDelivery inflightCount := len(ed.inflight) maxDispatch := 1 + ed.readAhead - inflightCount if maxDispatch >= len(matching) { - disapatchable = matching + dispatchable = matching matching = nil } else if maxDispatch > 0 { - disapatchable = matching[0:maxDispatch] + dispatchable = matching[0:maxDispatch] matching = matching[maxDispatch:] } ed.mux.Unlock() l.Debugf("Dispatcher event state: readahead=%d candidates=%d matched=%d inflight=%d queued=%d dispatched=%d dispatchable=%d lastAck=%d nacks=%d highest=%d", - ed.readAhead, len(candidates), matchCount, inflightCount, len(matching), dispatched, len(disapatchable), lastAck, nacks, highestOffset) + ed.readAhead, len(candidates), matchCount, inflightCount, len(matching), dispatched, len(dispatchable), lastAck, nacks, highestOffset) - for _, event := range disapatchable { + for _, event := range dispatchable { ed.mux.Lock() ed.inflight[*event.ID] = &event.Event inflightCount = len(ed.inflight) @@ -364,6 +384,66 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) { } } +func (ed *eventDispatcher) deliverBatchedEvents() { + withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData + + var events []*core.CombinedEventDataDelivery + var batchTimeoutContext context.Context + var batchTimeoutCancel func() + for { + var timeoutContext context.Context + var timedOut bool + if batchTimeoutContext != nil { + timeoutContext = batchTimeoutContext + } else { + timeoutContext = ed.ctx + } + select { + case event, ok := <-ed.eventDelivery: + if !ok { + if batchTimeoutCancel != nil { + batchTimeoutCancel() + } + return + } + + if events == nil { + events = []*core.CombinedEventDataDelivery{} + batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout) + } + + log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference) + + var data []*core.Data + var err error + if withData && event.Message != nil { + data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message) + } + + events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data}) + + if err != nil { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true}) + } + + case <-timeoutContext.Done(): + timedOut = true + case <-ed.ctx.Done(): + if batchTimeoutCancel != nil { + batchTimeoutCancel() + } + return + } + + if len(events) == ed.readAhead || (timedOut && len(events) > 0) { + _ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events) + // If err handle all the delivery responses for all the events?? + events = nil + } + } +} + +// TODO issue here, we can't just call DeliveryRequest with one thing. func (ed *eventDispatcher) deliverEvents() { withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData for { @@ -372,12 +452,14 @@ func (ed *eventDispatcher) deliverEvents() { if !ok { return } + log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference) var data []*core.Data var err error if withData && event.Message != nil { data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message) } + if err == nil { err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data) } diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index 41ee0f35b7..2f592c92db 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -97,6 +97,40 @@ func TestEventDispatcherStartStop(t *testing.T) { ed.close() } +func TestEventDispatcherStartStopBatched(t *testing.T) { + ten := uint16(10) + oldest := core.SubOptsFirstEventOldest + truthy := true + ed, cancel := newTestEventDispatcher(&subscription{ + dispatcherElection: make(chan bool, 1), + definition: &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}, + Ephemeral: true, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + ReadAhead: &ten, + FirstEvent: &oldest, + Batch: &truthy, + }, + }, + }, + }) + defer cancel() + mdi := ed.database.(*databasemocks.Plugin) + ge := mdi.On("GetEvents", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Event{}, nil, fmt.Errorf("context closed")) + confirmedElected := make(chan bool) + ge.RunFn = func(a mock.Arguments) { + <-confirmedElected + } + + assert.Equal(t, int(10), ed.readAhead) + ed.start() + confirmedElected <- true + close(confirmedElected) + ed.eventPoller.eventNotifier.newEvents <- 12345 + ed.close() +} + func TestMaxReadAhead(t *testing.T) { config.Set(coreconfig.SubscriptionDefaultsReadAhead, 65537) ed, cancel := newTestEventDispatcher(&subscription{ @@ -932,6 +966,21 @@ func TestEventDeliveryClosed(t *testing.T) { cancel() } +func TestBatchEventDeliveryClosed(t *testing.T) { + + sub := &subscription{ + definition: &core.Subscription{}, + } + ed, cancel := newTestEventDispatcher(sub) + defer cancel() + + ed.batchTimeout = 1 * time.Minute + ed.eventDelivery <- &core.EventDelivery{} + close(ed.eventDelivery) + + ed.deliverBatchedEvents() +} + func TestAckClosed(t *testing.T) { sub := &subscription{ @@ -1064,3 +1113,207 @@ func TestEventDispatcherWithReply(t *testing.T) { mbm.AssertExpectations(t) mms.AssertExpectations(t) } + +func TestEventDeliveryBatch(t *testing.T) { + log.SetLevel("debug") + var five = uint16(5) + truthy := true + sub := &subscription{ + dispatcherElection: make(chan bool, 1), + definition: &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"}, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + ReadAhead: &five, + Batch: &truthy, + }, + }, + }, + eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)), + } + + ed, cancel := newTestEventDispatcher(sub) + cancel() + ed.acksNacks = make(chan ackNack, 5) + + event1 := fftypes.NewUUID() + ed.inflight[*event1] = &core.Event{ + ID: event1, + Namespace: "ns1", + } + + mms := &syncasyncmocks.Sender{} + mbm := ed.broadcast.(*broadcastmocks.Manager) + mbm.On("NewBroadcast", mock.Anything).Return(mms) + mms.On("Send", mock.Anything).Return(nil) + + ed.deliveryResponse(&core.EventDeliveryResponse{ + ID: event1, + Reply: &core.MessageInOut{ + Message: core.Message{ + Header: core.MessageHeader{ + Tag: "myreplytag1", + CID: fftypes.NewUUID(), + Type: core.MessageTypeBroadcast, + }, + }, + InlineData: core.InlineData{ + {Value: fftypes.JSONAnyPtr(`"my reply"`)}, + }, + }, + }) + + mbm.AssertExpectations(t) + mms.AssertExpectations(t) +} + +func TestEventDispatcherBatchReadAhead(t *testing.T) { + log.SetLevel("debug") + var five = uint16(5) + subID := fftypes.NewUUID() + truthy := true + oneSec := "1s" + sub := &subscription{ + dispatcherElection: make(chan bool, 1), + definition: &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"}, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + ReadAhead: &five, + Batch: &truthy, + BatchTimeout: &oneSec, + }, + }, + }, + eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)), + } + + ed, cancel := newTestEventDispatcher(sub) + defer cancel() + go ed.deliverBatchedEvents() + ed.eventPoller.offsetCommitted = make(chan int64, 3) + mdi := ed.database.(*databasemocks.Plugin) + mei := ed.transport.(*eventsmocks.Plugin) + mdm := ed.data.(*datamocks.Manager) + + eventDeliveries := make(chan *core.EventDelivery) + deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) + deliveryRequestMock.RunFn = func(a mock.Arguments) { + batchEvents := a.Get(3).([]*core.CombinedEventDataDelivery) + for _, event := range batchEvents { + eventDeliveries <- event.Event + } + } + + // Setup the IDs + ref1 := fftypes.NewUUID() + ev1 := fftypes.NewUUID() + ref2 := fftypes.NewUUID() + ev2 := fftypes.NewUUID() + ref3 := fftypes.NewUUID() + ev3 := fftypes.NewUUID() + ref4 := fftypes.NewUUID() + ev4 := fftypes.NewUUID() + + // Setup enrichment + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{ + Header: core.MessageHeader{ID: ref1}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{ + Header: core.MessageHeader{ID: ref2}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{ + Header: core.MessageHeader{ID: ref3}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{ + Header: core.MessageHeader{ID: ref4}, + }, nil, true, nil) + + // Deliver a batch of messages + batch1Done := make(chan struct{}) + go func() { + repoll, err := ed.bufferedDelivery([]core.LocallySequenced{ + &core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match + &core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected}, + &core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match + &core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match + }) + assert.NoError(t, err) + assert.True(t, repoll) + close(batch1Done) + }() + + // Wait for the two calls to deliver the matching messages to the client (read ahead allows this) + event1 := <-eventDeliveries + assert.Equal(t, *ev1, *event1.ID) + assert.Equal(t, *ref1, *event1.Message.Header.ID) + event3 := <-eventDeliveries + assert.Equal(t, *ev3, *event3.ID) + assert.Equal(t, *ref3, *event3.Message.Header.ID) + event4 := <-eventDeliveries + assert.Equal(t, *ev4, *event4.ID) + assert.Equal(t, *ref4, *event4.Message.Header.ID) + + // Send back the two acks - out of order to validate the read-ahead logic + go func() { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: event4.ID}) + ed.deliveryResponse(&core.EventDeliveryResponse{ID: event1.ID}) + ed.deliveryResponse(&core.EventDeliveryResponse{ID: event3.ID}) + }() + + // Confirm we get the offset updates in the correct order, even though the confirmations + // came in a different order from the app. + assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted) + assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted) + assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted) + + // This should complete the batch + <-batch1Done + + mdi.AssertExpectations(t) + mei.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestBatchDeliverEventsWithDataFail(t *testing.T) { + yes := true + sub := &subscription{ + definition: &core.Subscription{ + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + WithData: &yes, + }, + }, + }, + } + + ed, cancel := newTestEventDispatcher(sub) + defer cancel() + + mdm := ed.data.(*datamocks.Manager) + mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop")) + + id1 := fftypes.NewUUID() + ed.eventDelivery <- &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ + ID: id1, + }, + Message: &core.Message{ + Header: core.MessageHeader{ + ID: fftypes.NewUUID(), + }, + Data: core.DataRefs{ + {ID: fftypes.NewUUID()}, + }, + }, + }, + } + + ed.inflight[*id1] = &core.Event{ID: id1} + go ed.deliverBatchedEvents() + + an := <-ed.acksNacks + assert.True(t, an.isNack) + +} diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index a7e2dfc618..0bf7a02e30 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -62,6 +62,7 @@ type EventManager interface { CreateUpdateDurableSubscription(ctx context.Context, subDef *core.Subscription, mustNew bool) (err error) EnrichEvent(ctx context.Context, event *core.Event) (*core.EnrichedEvent, error) QueueBatchRewind(batchID *fftypes.UUID) + ResolveTransportAndCapabilities(ctx context.Context, transportName string) (string, *events.Capabilities, error) Start() error WaitStop() @@ -208,6 +209,17 @@ func (em *eventManager) DeletedSubscriptions() chan<- *fftypes.UUID { return em.subManager.deletedSubscriptions } +func (em *eventManager) ResolveTransportAndCapabilities(ctx context.Context, transportName string) (string, *events.Capabilities, error) { + if transportName == "" { + transportName = em.defaultTransport + } + t, err := em.subManager.getTransport(ctx, transportName) + if err != nil { + return "", nil, err + } + return transportName, t.Capabilities(), nil +} + func (em *eventManager) WaitStop() { em.subManager.close() if em.blobReceiver != nil { @@ -224,10 +236,6 @@ func (em *eventManager) CreateUpdateDurableSubscription(ctx context.Context, sub return i18n.NewError(ctx, coremsgs.MsgInvalidSubscription) } - if subDef.Transport == "" { - subDef.Transport = em.defaultTransport - } - // Check it can be parsed before inserting (the submanager will check again when processing the creation, so we discard the result) if _, err = em.subManager.parseSubscriptionDef(ctx, subDef); err != nil { return err diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index 3b1f16b5aa..df18f72768 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -396,6 +396,7 @@ func TestCreateDurableSubscriptionDupName(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) sub := &core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -411,6 +412,7 @@ func TestCreateDurableSubscriptionDefaultSubCannotParse(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) sub := &core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -429,6 +431,7 @@ func TestCreateDurableSubscriptionBadFirstEvent(t *testing.T) { defer em.cleanup(t) wrongFirstEvent := core.SubOptsFirstEvent("lobster") sub := &core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -450,6 +453,7 @@ func TestCreateDurableSubscriptionNegativeFirstEvent(t *testing.T) { defer em.cleanup(t) wrongFirstEvent := core.SubOptsFirstEvent("-12345") sub := &core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -470,6 +474,7 @@ func TestCreateDurableSubscriptionGetHighestSequenceFailure(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) sub := &core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -486,6 +491,7 @@ func TestCreateDurableSubscriptionOk(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) sub := &core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -509,6 +515,7 @@ func TestUpdateDurableSubscriptionOk(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) sub := &core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -517,6 +524,7 @@ func TestUpdateDurableSubscriptionOk(t *testing.T) { } var firstEvent core.SubOptsFirstEvent = "12345" em.mdi.On("GetSubscriptionByName", mock.Anything, "ns1", "sub1").Return(&core.Subscription{ + Transport: "websockets", SubscriptionRef: core.SubscriptionRef{ ID: fftypes.NewUUID(), }, @@ -603,3 +611,42 @@ func TestGetPlugins(t *testing.T) { assert.ElementsMatch(t, em.GetPlugins(), expectedPlugins) } + +func TestResolveTransportAndCapabilities(t *testing.T) { + em := newTestEventManager(t) + defer em.cleanup(t) + + em.mev.On("Capabilities").Return(&events.Capabilities{BatchDelivery: true}) + + resolved, c, err := em.ResolveTransportAndCapabilities(context.Background(), "websockets") + assert.NoError(t, err) + assert.Equal(t, "websockets", resolved) + assert.NotNil(t, c) + assert.True(t, c.BatchDelivery) + + em.mev.AssertExpectations(t) +} + +func TestResolveTransportAndCapabilitiesUnknown(t *testing.T) { + em := newTestEventManager(t) + defer em.cleanup(t) + + _, _, err := em.ResolveTransportAndCapabilities(context.Background(), "wrong") + assert.Regexp(t, "FF10172", err) +} + +func TestResolveTransportAndCapabilitiesDefault(t *testing.T) { + em := newTestEventManager(t) + defer em.cleanup(t) + em.defaultTransport = "websockets" + + em.mev.On("Capabilities").Return(&events.Capabilities{BatchDelivery: true}) + + resolved, c, err := em.ResolveTransportAndCapabilities(context.Background(), "") + assert.NoError(t, err) + assert.Equal(t, "websockets", resolved) + assert.NotNil(t, c) + assert.True(t, c.BatchDelivery) + + em.mev.AssertExpectations(t) +} diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go index da9be8f573..788e72ec31 100644 --- a/internal/events/subscription_manager.go +++ b/internal/events/subscription_manager.go @@ -249,13 +249,21 @@ func (sm *subscriptionManager) deletedDurableSubscription(id *fftypes.UUID) { } } +func (sm *subscriptionManager) getTransport(ctx context.Context, transportName string) (events.Plugin, error) { + transport, ok := sm.transports[transportName] + if !ok { + return nil, i18n.NewError(ctx, coremsgs.MsgUnknownEventTransportPlugin, transportName) + } + return transport, nil +} + // nolint: gocyclo func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef *core.Subscription) (sub *subscription, err error) { filter := subDef.Filter - transport, ok := sm.transports[subDef.Transport] - if !ok { - return nil, i18n.NewError(ctx, coremsgs.MsgUnknownEventTransportPlugin, subDef.Transport) + transport, err := sm.getTransport(ctx, subDef.Transport) + if err != nil { + return nil, err } if subDef.Options.TLSConfigName != "" && sm.namespace.TLSConfigs[subDef.Options.TLSConfigName] != nil { diff --git a/internal/events/system/events.go b/internal/events/system/events.go index 4cdd0bdc30..354be298d7 100644 --- a/internal/events/system/events.go +++ b/internal/events/system/events.go @@ -23,6 +23,8 @@ import ( "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/i18n" + "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/events" ) @@ -134,6 +136,10 @@ func (se *Events) DeliveryRequest(ctx context.Context, connID string, sub *core. return nil } +func (se *Events) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { + return i18n.NewError(ctx, coremsgs.MsgBatchDeliveryNotSupported, se.Name()) // should never happen +} + func (se *Events) NamespaceRestarted(ns string, startTime time.Time) { // no-op } diff --git a/internal/events/system/events_test.go b/internal/events/system/events_test.go index d099c24fef..69f6c3bfb9 100644 --- a/internal/events/system/events_test.go +++ b/internal/events/system/events_test.go @@ -142,3 +142,17 @@ func TestNamespaceRestarted(t *testing.T) { se.NamespaceRestarted("ns1", time.Now()) } + +func TestEventDeliveryBatch(t *testing.T) { + se, cancel := newTestEvents(t) + defer cancel() + + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } + + err := se.BatchDeliveryRequest(se.ctx, "id", sub, []*core.CombinedEventDataDelivery{}) + assert.Regexp(t, "FF10461", err) +} diff --git a/internal/events/webhooks/webhooks.go b/internal/events/webhooks/webhooks.go index cdd2e36e08..368dc2be63 100644 --- a/internal/events/webhooks/webhooks.go +++ b/internal/events/webhooks/webhooks.go @@ -57,11 +57,17 @@ type whRequest struct { r *resty.Request url string method string - body fftypes.JSONObject forceJSON bool replyTx string } +type whPayload struct { + input fftypes.JSONObject + parsedData0 fftypes.JSONObject + data0 *fftypes.JSONAny + body interface{} +} + type whResponse struct { Status int `json:"status"` Headers fftypes.JSONObject `json:"headers"` @@ -81,8 +87,10 @@ func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error) client := ffresty.NewWithConfig(ctx, *ffrestyConfig) *wh = WebHooks{ - ctx: log.WithLogField(ctx, "webhook", wh.connID), - capabilities: &events.Capabilities{}, + ctx: log.WithLogField(ctx, "webhook", wh.connID), + capabilities: &events.Capabilities{ + BatchDelivery: true, + }, callbacks: callbacks{ handlers: make(map[string]events.Callbacks), }, @@ -109,7 +117,73 @@ func (wh *WebHooks) Capabilities() *events.Capabilities { return wh.capabilities } -func (wh *WebHooks) buildRequest(ctx context.Context, restyClient *resty.Client, options fftypes.JSONObject, firstData fftypes.JSONObject) (req *whRequest, err error) { +// firstData parses data0 from the data the first time it's needed, and guarantees a non-nil result +func (p *whPayload) firstData() fftypes.JSONObject { + if p.parsedData0 != nil { + return p.parsedData0 + } + if p.data0 == nil { + p.parsedData0 = fftypes.JSONObject{} + } else { + // Use JSONObjectOk instead of JSONObject + // JSONObject fails for datatypes such as array, string, bool, number etc + var valid bool + p.parsedData0, valid = p.data0.JSONObjectOk() + if !valid { + p.parsedData0 = fftypes.JSONObject{ + "value": p.parsedData0, + } + } + } + return p.parsedData0 +} + +func (wh *WebHooks) buildPayload(ctx context.Context, sub *core.Subscription, event *core.CombinedEventDataDelivery) *whPayload { + + withData := sub.Options.WithData != nil && *sub.Options.WithData + options := sub.Options.TransportOptions() + p := &whPayload{ + // Options on how to process the input + input: options.GetObject("input"), + } + + allData := make([]*fftypes.JSONAny, 0, len(event.Data)) + if withData { + for _, d := range event.Data { + if d.Value != nil { + allData = append(allData, d.Value) + if p.data0 == nil { + p.data0 = d.Value + } + } + } + } + + // Choose to sub-select a field to send as the body + var bodyFromFirstData *fftypes.JSONAny + inputBody := p.input.GetString("body") + if inputBody != "" { + bodyFromFirstData = fftypes.JSONAnyPtr(p.firstData().GetObject(inputBody).String()) + } + + switch { + case bodyFromFirstData != nil: + // We might have been told to extract a body from the first data record + p.body = bodyFromFirstData.String() + case len(allData) > 1: + // We've got an array of data to POST + p.body = allData + case len(allData) == 1: + // Just send the first object, forced into an object per the rules in firstData() + p.body = p.firstData() + default: + // Just send the event itself + p.body = event.Event + } + return p +} + +func (wh *WebHooks) buildRequest(ctx context.Context, restyClient *resty.Client, options fftypes.JSONObject, p *whPayload) (req *whRequest, err error) { req = &whRequest{ r: restyClient.R(). SetDoNotParseResponse(true). @@ -145,34 +219,28 @@ func (wh *WebHooks) buildRequest(ctx context.Context, restyClient *resty.Client, } _ = req.r.SetQueryParam(q, s) } - if firstData != nil { - // Options on how to process the input - input := options.GetObject("input") + // p will be nil for a batch delivery + if p != nil { // Dynamic query support from input - inputQuery := input.GetString("query") + inputQuery := p.input.GetString("query") if inputQuery != "" { - iq := firstData.GetObject(inputQuery) + iq := p.firstData().GetObject(inputQuery) for q := range iq { _ = req.r.SetQueryParam(q, iq.GetString(q)) } } // Dynamic header support from input - inputHeaders := input.GetString("headers") + inputHeaders := p.input.GetString("headers") if inputHeaders != "" { - ih := firstData.GetObject(inputHeaders) + ih := p.firstData().GetObject(inputHeaders) for h := range ih { _ = req.r.SetHeader(h, ih.GetString(h)) } } - // Choose to sub-select a field to send as the body - inputBody := input.GetString("body") - if inputBody != "" { - req.body = firstData.GetObject(inputBody) - } // Choose to add an additional dynamic path - inputPath := input.GetString("path") + inputPath := p.input.GetString("path") if inputPath != "" { - extraPath := strings.TrimPrefix(firstData.GetString(inputPath), "/") + extraPath := strings.TrimPrefix(p.firstData().GetString(inputPath), "/") if len(extraPath) > 0 { pathSegments := strings.Split(extraPath, "/") for _, ps := range pathSegments { @@ -181,9 +249,9 @@ func (wh *WebHooks) buildRequest(ctx context.Context, restyClient *resty.Client, } } // Choose to add an additional dynamic path - inputTxtype := input.GetString("replytx") + inputTxtype := p.input.GetString("replytx") if inputTxtype != "" { - txType := firstData.GetString(inputTxtype) + txType := p.firstData().GetString(inputTxtype) if len(txType) > 0 { req.replyTx = txType if strings.EqualFold(txType, "true") { @@ -282,33 +350,27 @@ func (wh *WebHooks) ValidateOptions(ctx context.Context, options *core.Subscript // So these clients should live as long as the plugin exists options.RestyClient = ffresty.NewWithConfig(wh.ctx, newFFRestyConfig) - _, err := wh.buildRequest(ctx, options.RestyClient, options.TransportOptions(), fftypes.JSONObject{}) + _, err := wh.buildRequest(ctx, options.RestyClient, options.TransportOptions(), nil) return err } -func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) (req *whRequest, res *whResponse, err error) { - withData := sub.Options.WithData != nil && *sub.Options.WithData - allData := make([]*fftypes.JSONAny, 0, len(data)) - var firstData fftypes.JSONObject - var valid bool - if withData { - for _, d := range data { - if d.Value != nil { - allData = append(allData, d.Value) - } - } - if len(allData) == 0 { - firstData = fftypes.JSONObject{} - } else { - // Use JSONObjectOk instead of JSONObject - // JSONObject fails for datatypes such as array, string, bool, number etc - firstData, valid = allData[0].JSONObjectOk() - if !valid { - firstData = fftypes.JSONObject{ - "value": allData[0], - } - } +func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription, events []*core.CombinedEventDataDelivery, batch bool) (req *whRequest, res *whResponse, err error) { + + var payloadForBuildingRequest *whPayload // only set for a single event delivery + var requestBody interface{} + if len(events) == 1 && !batch { + payloadForBuildingRequest = wh.buildPayload(ctx, sub, events[0]) + // Payload for POST/PATCH/PUT is what is calculated for a single event in buildPayload + requestBody = payloadForBuildingRequest.body + } else { + batchBody := make([]interface{}, len(events)) + for i, event := range events { + // We only use the body itself from the whPayload - then discard it. + p := wh.buildPayload(ctx, sub, event) + batchBody[i] = p.body } + // Payload for POST/PATCH/PUT is the array of outputs calculated for a each event in buildPayload + requestBody = batchBody } client := wh.client @@ -316,33 +378,19 @@ func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription, client = sub.Options.RestyClient } - req, err = wh.buildRequest(ctx, client, sub.Options.TransportOptions(), firstData) + req, err = wh.buildRequest(ctx, client, sub.Options.TransportOptions(), payloadForBuildingRequest) if err != nil { return nil, nil, err } if req.method == http.MethodPost || req.method == http.MethodPatch || req.method == http.MethodPut { - switch { - case req.body != nil: - // We might have been told to extract a body from the first data record - req.r.SetBody(req.body) - case len(allData) > 1: - // We've got an array of data to POST - req.r.SetBody(allData) - case len(allData) == 1: - // Just send the first object directly - req.r.SetBody(firstData) - default: - // Just send the event itself - req.r.SetBody(event) - - } + req.r.SetBody(requestBody) } - log.L(wh.ctx).Debugf("Webhook-> %s %s event %s on subscription %s", req.method, req.url, event.ID, sub.ID) + // log.L(wh.ctx).Debugf("Webhook-> %s %s event %s on subscription %s", req.method, req.url, event.ID, sub.ID) resp, err := req.r.Execute(req.method, req.url) if err != nil { - log.L(ctx).Errorf("Webhook<- %s %s event %s on subscription %s failed: %s", req.method, req.url, event.ID, sub.ID, err) + // log.L(ctx).Errorf("Webhook<- %s %s event %s on subscription %s failed: %s", req.method, req.url, event.ID, sub.ID, err) return nil, nil, err } defer func() { _ = resp.RawBody().Close() }() @@ -351,7 +399,7 @@ func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription, Status: resp.StatusCode(), Headers: fftypes.JSONObject{}, } - log.L(wh.ctx).Infof("Webhook<- %s %s event %s on subscription %s returned %d", req.method, req.url, event.ID, sub.ID, res.Status) + // log.L(wh.ctx).Infof("Webhook<- %s %s event %s on subscription %s returned %d", req.method, req.url, event.ID, sub.ID, res.Status) header := resp.Header() for h := range header { res.Headers[h] = header.Get(h) @@ -384,8 +432,8 @@ func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription, return req, res, nil } -func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, sub *core.Subscription, event *core.EventDelivery, data core.DataArray, fastAck bool) { - req, res, gwErr := wh.attemptRequest(ctx, sub, event, data) +func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, sub *core.Subscription, events []*core.CombinedEventDataDelivery, fastAck, batched bool) { + req, res, gwErr := wh.attemptRequest(ctx, sub, events, batched) if gwErr != nil { // Generate a bad-gateway error response - we always want to send something back, // rather than just causing timeouts @@ -404,44 +452,49 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s b, _ := json.Marshal(&res) log.L(wh.ctx).Tracef("Webhook response: %s", string(b)) - // Emit the response - if reply && event.Message != nil { - txType := fftypes.FFEnum(strings.ToLower(sub.Options.TransportOptions().GetString("replytx"))) - if req != nil && req.replyTx != "" { - txType = fftypes.FFEnum(strings.ToLower(req.replyTx)) - } - if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { - log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID) - cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ - ID: event.ID, - Rejected: false, - Subscription: event.Subscription, - Reply: &core.MessageInOut{ - Message: core.Message{ - Header: core.MessageHeader{ - CID: event.Message.Header.ID, - Group: event.Message.Header.Group, - Type: event.Message.Header.Type, - Topics: event.Message.Header.Topics, - Tag: sub.Options.TransportOptions().GetString("replytag"), - TxType: txType, + // For each event emit a response + for _, combinedEvent := range events { + event := combinedEvent.Event + // Emit the response + if reply && event.Message != nil { + txType := fftypes.FFEnum(strings.ToLower(sub.Options.TransportOptions().GetString("replytx"))) + if req != nil && req.replyTx != "" { + txType = fftypes.FFEnum(strings.ToLower(req.replyTx)) + } + if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { + log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID) + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + Reply: &core.MessageInOut{ + Message: core.Message{ + Header: core.MessageHeader{ + CID: event.Message.Header.ID, + Group: event.Message.Header.Group, + Type: event.Message.Header.Type, + Topics: event.Message.Header.Topics, + Tag: sub.Options.TransportOptions().GetString("replytag"), + TxType: txType, + }, + }, + InlineData: core.InlineData{ + {Value: fftypes.JSONAnyPtrBytes(b)}, }, }, - InlineData: core.InlineData{ - {Value: fftypes.JSONAnyPtrBytes(b)}, - }, - }, - }) - } - } else if !fastAck { - if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { - cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ - ID: event.ID, - Rejected: false, - Subscription: event.Subscription, - }) + }) + } + } else if !fastAck { + if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + }) + } } } + } func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error { @@ -472,11 +525,63 @@ func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *cor Subscription: event.Subscription, }) } - go wh.doDelivery(ctx, connID, reply, sub, event, data, true) + go wh.doDelivery(ctx, connID, reply, sub, []*core.CombinedEventDataDelivery{{Event: event, Data: data}}, true, false) + return nil + } + + // NOTE: We could check here for batching and accumulate but we can't return because this causes the offset to jump... + + // TODO we don't look at the error here? + wh.doDelivery(ctx, connID, reply, sub, []*core.CombinedEventDataDelivery{{Event: event, Data: data}}, false, false) + return nil +} + +func (wh *WebHooks) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { + reply := sub.Options.TransportOptions().GetBool("reply") + if reply { + nonReplyEvents := []*core.CombinedEventDataDelivery{} + for _, combinedEvent := range events { + event := combinedEvent.Event + // We cowardly refuse to dispatch a message that is itself a reply, as it's hard for users to + // avoid loops - and there's no way for us to detect here if a user has configured correctly + // to avoid a loop. + if event.Message != nil && event.Message.Header.CID != nil { + log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID) + if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + }) + } + continue + } + + nonReplyEvents = append(nonReplyEvents, combinedEvent) + } + // Override the events to send without the reply events + events = nonReplyEvents + } + + // // In fastack mode we drive calls in parallel to the backend, immediately acknowledging the event + // NOTE: We cannot use this with reply mode, as when we're sending a reply the `DeliveryResponse` + // callback must include the reply in-line. + if !reply && sub.Options.TransportOptions().GetBool("fastack") { + for _, combinedEvent := range events { + event := combinedEvent.Event + if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + }) + } + } + go wh.doDelivery(ctx, connID, reply, sub, events, true, true) return nil } - wh.doDelivery(ctx, connID, reply, sub, event, data, false) + wh.doDelivery(ctx, connID, reply, sub, events, false, true) return nil } diff --git a/internal/events/webhooks/webhooks_test.go b/internal/events/webhooks/webhooks_test.go index 8e8398bfc2..e6a5fdc90a 100644 --- a/internal/events/webhooks/webhooks_test.go +++ b/internal/events/webhooks/webhooks_test.go @@ -1069,7 +1069,7 @@ func TestRequestReplyDataArrayError(t *testing.T) { mcb.AssertExpectations(t) } -func TestWebhookFailFastAsk(t *testing.T) { +func TestWebhookFailFastAck(t *testing.T) { wh, cancel := newTestWebHooks(t) defer cancel() @@ -1131,6 +1131,68 @@ func TestWebhookFailFastAsk(t *testing.T) { mcb.AssertExpectations(t) } +func TestWebhookFailFastAckBatch(t *testing.T) { + wh, cancel := newTestWebHooks(t) + defer cancel() + + msgID := fftypes.NewUUID() + r := mux.NewRouter() + server := httptest.NewServer(r) + server.Close() + + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } + sub.Options.TransportOptions()["fastack"] = true + event := &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ + ID: fftypes.NewUUID(), + }, + Message: &core.Message{ + Header: core.MessageHeader{ + ID: msgID, + Type: core.MessageTypeBroadcast, + }, + }, + }, + Subscription: core.SubscriptionRef{ + ID: sub.ID, + }, + } + + count := 0 + waiter := make(chan struct{}) + mcb := wh.callbacks.handlers["ns1"].(*eventsmocks.Callbacks) + mcb.On("DeliveryResponse", mock.Anything, mock.Anything). + Return(nil). + Run(func(a mock.Arguments) { + count++ + if count == 2 { + close(waiter) + } + }) + + // Drive two deliveries, waiting for them both to ack (noting both will fail) + err := wh.BatchDeliveryRequest(wh.ctx, mock.Anything, sub, []*core.CombinedEventDataDelivery{ + {Event: event, Data: core.DataArray{ + {ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value1"`)}, + {ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value2"`)}, + }}, + {Event: event, Data: core.DataArray{ + {ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value1"`)}, + {ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"value2"`)}, + }}, + }) + assert.NoError(t, err) + + <-waiter + + mcb.AssertExpectations(t) +} + func TestDeliveryRequestNilMessage(t *testing.T) { wh, cancel := newTestWebHooks(t) defer cancel() @@ -1210,9 +1272,198 @@ func TestDeliveryRequestReplyToReply(t *testing.T) { mcb.AssertExpectations(t) } +func TestBatchDeliveryRequestReplyToReply(t *testing.T) { + wh, cancel := newTestWebHooks(t) + defer cancel() + + yes := true + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + WithData: &yes, + }, + }, + } + sub.Options.TransportOptions()["reply"] = true + event := &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ + ID: fftypes.NewUUID(), + }, + Message: &core.Message{ + Header: core.MessageHeader{ + ID: fftypes.NewUUID(), + Type: core.MessageTypeBroadcast, + CID: fftypes.NewUUID(), + }, + }, + }, + Subscription: core.SubscriptionRef{ + ID: sub.ID, + }, + } + + mcb := wh.callbacks.handlers["ns1"].(*eventsmocks.Callbacks) + mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { + return !response.Rejected // should be accepted as a no-op so we can move on to other events + })) + + err := wh.BatchDeliveryRequest(wh.ctx, mock.Anything, sub, []*core.CombinedEventDataDelivery{{Event: event, Data: nil}}) + assert.NoError(t, err) + + mcb.AssertExpectations(t) +} + func TestNamespaceRestarted(t *testing.T) { wh, cancel := newTestWebHooks(t) defer cancel() wh.NamespaceRestarted("ns1", time.Now()) } + +func TestRequestWithBodyReplyEndToEndWithBatch(t *testing.T) { + wh, cancel := newTestWebHooks(t) + defer cancel() + + r := mux.NewRouter() + r.HandleFunc("/myapi", func(res http.ResponseWriter, req *http.Request) { + assert.Equal(t, "myheaderval", req.Header.Get("My-Header")) + assert.Equal(t, "myqueryval", req.URL.Query().Get("my-query")) + var data []fftypes.JSONObject + err := json.NewDecoder(req.Body).Decode(&data) + assert.NoError(t, err) + assert.Equal(t, len(data), 2) + assert.Equal(t, "inputvalue", data[0].GetObject("in_body").GetString("inputfield")) + res.Header().Set("my-reply-header", "myheaderval2") + res.WriteHeader(200) + res.Write([]byte(`{ + "replyfield": "replyvalue" + }`)) + }).Methods(http.MethodPut) + server := httptest.NewServer(r) + defer server.Close() + + yes := true + dataID := fftypes.NewUUID() + msgID := fftypes.NewUUID() + groupHash := fftypes.NewRandB32() + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + WithData: &yes, + }, + }, + } + to := sub.Options.TransportOptions() + to["reply"] = true + to["json"] = true + to["method"] = "PUT" + to["url"] = fmt.Sprintf("http://%s/myapi", server.Listener.Addr()) + to["headers"] = map[string]interface{}{ + "my-header": "myheaderval", + } + to["query"] = map[string]interface{}{ + "my-query": "myqueryval", + } + event1 := &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ + ID: fftypes.NewUUID(), + }, + Message: &core.Message{ + Header: core.MessageHeader{ + ID: msgID, + Group: groupHash, + Type: core.MessageTypePrivate, + }, + Data: core.DataRefs{ + {ID: dataID}, + }, + }, + }, + Subscription: core.SubscriptionRef{ + ID: sub.ID, + }, + } + + event2 := &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ + ID: fftypes.NewUUID(), + }, + Message: &core.Message{ + Header: core.MessageHeader{ + ID: msgID, + Group: groupHash, + Type: core.MessageTypePrivate, + }, + Data: core.DataRefs{ + {ID: dataID}, + }, + }, + }, + Subscription: core.SubscriptionRef{ + ID: sub.ID, + }, + } + + data1 := core.DataArray{&core.Data{ + ID: dataID, + Value: fftypes.JSONAnyPtr(`{ + "in_body": { + "inputfield": "inputvalue" + }, + "in_query": { + "dynamic-query": "dynamicqueryval" + }, + "in_headers": { + "dynamic-header": "dynamicheaderval" + }, + "in_path": "/my/sub/path?escape_query", + "in_replytx": true + }`), + }} + + data2 := core.DataArray{&core.Data{ + ID: dataID, + Value: fftypes.JSONAnyPtr(`{ + "in_body": { + "inputfield": "inputvalue" + }, + "in_query": { + "dynamic-query": "dynamicqueryval" + }, + "in_headers": { + "dynamic-header": "dynamicheaderval" + }, + "in_path": "/my/sub/path?escape_query", + "in_replytx": true + }`), + }} + + mcb := wh.callbacks.handlers["ns1"].(*eventsmocks.Callbacks) + mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { + assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) + assert.Equal(t, *groupHash, *response.Reply.Message.Header.Group) + assert.Equal(t, core.MessageTypePrivate, response.Reply.Message.Header.Type) + assert.Equal(t, "myheaderval2", response.Reply.InlineData[0].Value.JSONObject().GetObject("headers").GetString("My-Reply-Header")) + assert.Equal(t, "replyvalue", response.Reply.InlineData[0].Value.JSONObject().GetObject("body").GetString("replyfield")) + assert.Equal(t, float64(200), response.Reply.InlineData[0].Value.JSONObject()["status"]) + return true + })).Return(nil) + + err := wh.BatchDeliveryRequest(wh.ctx, mock.Anything, sub, []*core.CombinedEventDataDelivery{{Event: event1, Data: data1}, {Event: event2, Data: data2}}) + assert.NoError(t, err) + + mcb.AssertExpectations(t) +} + +func TestFirstDataNeverNil(t *testing.T) { + assert.NotNil(t, (&whPayload{}).firstData()) +} diff --git a/internal/events/websockets/websockets.go b/internal/events/websockets/websockets.go index 29e431d17f..4388aff166 100644 --- a/internal/events/websockets/websockets.go +++ b/internal/events/websockets/websockets.go @@ -217,3 +217,8 @@ func (ws *WebSockets) GetStatus() *core.WebSocketStatus { } return status } + +func (ws *WebSockets) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { + // We should have rejected creation of the subscription, due to us not supporting this in our capabilities + return i18n.NewError(ctx, coremsgs.MsgBatchDeliveryNotSupported, ws.Name()) +} diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index fef9ff5a75..6947d66e77 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -805,3 +805,18 @@ func TestNamespaceRestartedFailClose(t *testing.T) { mcb.AssertExpectations(t) } + +func TestEventDeliveryBatchReturnsUnsupported(t *testing.T) { + cbs := &eventsmocks.Callbacks{} + ws, _, cancel := newTestWebsockets(t, cbs, nil) + defer cancel() + + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } + + err := ws.BatchDeliveryRequest(ws.ctx, "id", sub, []*core.CombinedEventDataDelivery{}) + assert.Regexp(t, "FF10461", err) +} diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index f0bb81f09f..4e86368623 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -54,6 +54,7 @@ import ( "github.com/hyperledger/firefly/mocks/txwritermocks" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/events" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -199,6 +200,7 @@ func newTestOrchestrator() *testOrchestrator { tor.mcm.On("Name").Return("mock-cm").Maybe() tor.mmi.On("Name").Return("mock-mm").Maybe() tor.mmp.On("Name").Return("mock-mp").Maybe() + tor.mem.On("ResolveTransportAndCapabilities", mock.Anything, mock.Anything).Return("websockets", &events.Capabilities{}, nil).Maybe() tor.mds.On("Init", mock.Anything).Maybe() tor.cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(tor.ctx, 100, 5*time.Minute), nil).Maybe() return tor diff --git a/internal/orchestrator/subscriptions.go b/internal/orchestrator/subscriptions.go index f782dddb92..fcfd934124 100644 --- a/internal/orchestrator/subscriptions.go +++ b/internal/orchestrator/subscriptions.go @@ -18,6 +18,7 @@ package orchestrator import ( "context" + "time" "github.com/hyperledger/firefly-common/pkg/ffapi" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -46,6 +47,11 @@ func (or *orchestrator) createUpdateSubscription(ctx context.Context, subDef *co if subDef.Transport == system.SystemEventsTransport { return nil, i18n.NewError(ctx, coremsgs.MsgSystemTransportInternal) } + resolvedTransport, capabilities, err := or.events.ResolveTransportAndCapabilities(ctx, subDef.Transport) + if err != nil { + return nil, err + } + subDef.Transport = resolvedTransport if subDef.Options.TLSConfigName != "" { if or.namespace.TLSConfigs[subDef.Options.TLSConfigName] == nil { @@ -56,6 +62,22 @@ func (or *orchestrator) createUpdateSubscription(ctx context.Context, subDef *co subDef.Options.TLSConfig = or.namespace.TLSConfigs[subDef.Options.TLSConfigName] } + if subDef.Options.BatchTimeout != nil && *subDef.Options.BatchTimeout != "" { + _, err := fftypes.ParseDurationString(*subDef.Options.BatchTimeout, time.Millisecond) + if err != nil { + return nil, err + } + } + + if subDef.Options.Batch != nil && *subDef.Options.Batch { + if subDef.Options.WithData != nil && *subDef.Options.WithData { + return nil, i18n.NewError(ctx, coremsgs.MsgBatchWithDataNotSupported, subDef.Name) + } + if !capabilities.BatchDelivery { + return nil, i18n.NewError(ctx, coremsgs.MsgBatchDeliveryNotSupported, subDef.Transport) + } + } + return subDef, or.events.CreateUpdateDurableSubscription(ctx, subDef, mustNew) } diff --git a/internal/orchestrator/subscriptions_test.go b/internal/orchestrator/subscriptions_test.go index ad441db585..58326469f5 100644 --- a/internal/orchestrator/subscriptions_test.go +++ b/internal/orchestrator/subscriptions_test.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly/internal/events/system" "github.com/hyperledger/firefly/internal/events/webhooks" + "github.com/hyperledger/firefly/mocks/eventmocks" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/events" @@ -57,6 +58,89 @@ func TestCreateSubscriptionSystemTransport(t *testing.T) { assert.Regexp(t, "FF10266", err) } +func TestCreateSubscriptionBadBatchTimeout(t *testing.T) { + or := newTestOrchestrator() + defer or.cleanup(t) + + badTimeout := "-abc" + _, err := or.CreateSubscription(or.ctx, &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Name: "sub1", + }, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + BatchTimeout: &badTimeout, + }, + WebhookSubOptions: core.WebhookSubOptions{ + URL: "http://example.com", + }, + }, + Transport: "webhooks", + }) + assert.Regexp(t, "FF00137", err) +} + +func TestCreateSubscriptionBatchNotSupported(t *testing.T) { + or := newTestOrchestrator() + defer or.cleanup(t) + + truthy := true + _, err := or.CreateSubscription(or.ctx, &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Name: "sub1", + }, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + Batch: &truthy, + }, + WebhookSubOptions: core.WebhookSubOptions{ + URL: "http://example.com", + }, + }, + Transport: "webhooks", + }) + assert.Regexp(t, "FF10461", err) +} + +func TestCreateSubscriptionBatchWithData(t *testing.T) { + or := newTestOrchestrator() + defer or.cleanup(t) + + truthy := true + _, err := or.CreateSubscription(or.ctx, &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Name: "sub1", + }, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + WithData: &truthy, + Batch: &truthy, + }, + WebhookSubOptions: core.WebhookSubOptions{ + URL: "http://example.com", + }, + }, + Transport: "webhooks", + }) + assert.Regexp(t, "FF10460", err) +} + +func TestCreateSubscriptionBadTransport(t *testing.T) { + or := newTestOrchestrator() + defer or.cleanup(t) + + or.mem = &eventmocks.EventManager{} + or.mem.On("ResolveTransportAndCapabilities", mock.Anything, "wrongun").Return("", nil, fmt.Errorf("not found")) + or.events = or.mem + _, err := or.CreateSubscription(or.ctx, &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Name: "sub1", + }, + Transport: "wrongun", + }) + assert.Regexp(t, "not found", err) +} + func TestCreateSubscriptionOk(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index 1ed436b1bf..34ffb5a573 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -15,6 +15,8 @@ import ( mock "github.com/stretchr/testify/mock" + pkgevents "github.com/hyperledger/firefly/pkg/events" + sharedstorage "github.com/hyperledger/firefly/pkg/sharedstorage" system "github.com/hyperledger/firefly/internal/events/system" @@ -208,6 +210,39 @@ func (_m *EventManager) QueueBatchRewind(batchID *fftypes.UUID) { _m.Called(batchID) } +// ResolveTransportAndCapabilities provides a mock function with given fields: ctx, transportName +func (_m *EventManager) ResolveTransportAndCapabilities(ctx context.Context, transportName string) (string, *pkgevents.Capabilities, error) { + ret := _m.Called(ctx, transportName) + + var r0 string + var r1 *pkgevents.Capabilities + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) (string, *pkgevents.Capabilities, error)); ok { + return rf(ctx, transportName) + } + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, transportName) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) *pkgevents.Capabilities); ok { + r1 = rf(ctx, transportName) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*pkgevents.Capabilities) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, transportName) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // SharedStorageBatchDownloaded provides a mock function with given fields: ss, payloadRef, data func (_m *EventManager) SharedStorageBatchDownloaded(ss sharedstorage.Plugin, payloadRef string, data []byte) (*fftypes.UUID, error) { ret := _m.Called(ss, payloadRef, data) diff --git a/mocks/eventsmocks/plugin.go b/mocks/eventsmocks/plugin.go index d220b20906..4e93686d77 100644 --- a/mocks/eventsmocks/plugin.go +++ b/mocks/eventsmocks/plugin.go @@ -21,6 +21,20 @@ type Plugin struct { mock.Mock } +// BatchDeliveryRequest provides a mock function with given fields: ctx, connID, sub, _a3 +func (_m *Plugin) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, _a3 []*core.CombinedEventDataDelivery) error { + ret := _m.Called(ctx, connID, sub, _a3) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, *core.Subscription, []*core.CombinedEventDataDelivery) error); ok { + r0 = rf(ctx, connID, sub, _a3) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Capabilities provides a mock function with given fields: func (_m *Plugin) Capabilities() *events.Capabilities { ret := _m.Called() diff --git a/pkg/core/event.go b/pkg/core/event.go index b96e2d7263..6f659be681 100644 --- a/pkg/core/event.go +++ b/pkg/core/event.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -99,6 +99,11 @@ type EventDelivery struct { Subscription SubscriptionRef `json:"subscription"` } +type CombinedEventDataDelivery struct { + Event *EventDelivery + Data DataArray +} + // EventDeliveryResponse is the payload an application sends back, to confirm it has accepted (or rejected) the event and as such // does not need to receive it again. type EventDeliveryResponse struct { diff --git a/pkg/core/subscription.go b/pkg/core/subscription.go index 48851fdd32..7af6fc85e7 100644 --- a/pkg/core/subscription.go +++ b/pkg/core/subscription.go @@ -89,9 +89,11 @@ const ( // SubscriptionCoreOptions are the core options that apply across all transports type SubscriptionCoreOptions struct { - FirstEvent *SubOptsFirstEvent `ffstruct:"SubscriptionCoreOptions" json:"firstEvent,omitempty"` - ReadAhead *uint16 `ffstruct:"SubscriptionCoreOptions" json:"readAhead,omitempty"` - WithData *bool `ffstruct:"SubscriptionCoreOptions" json:"withData,omitempty"` + FirstEvent *SubOptsFirstEvent `ffstruct:"SubscriptionCoreOptions" json:"firstEvent,omitempty"` + ReadAhead *uint16 `ffstruct:"SubscriptionCoreOptions" json:"readAhead,omitempty"` + WithData *bool `ffstruct:"SubscriptionCoreOptions" json:"withData,omitempty"` + Batch *bool `ffstruct:"SubscriptionCoreOptions" json:"batch,omitempty"` + BatchTimeout *string `ffstruct:"SubscriptionCoreOptions" json:"batchTimeout,omitempty"` } // SubscriptionOptions customize the behavior of subscriptions diff --git a/pkg/events/plugin.go b/pkg/events/plugin.go index a357dad03c..03098c5d2a 100644 --- a/pkg/events/plugin.go +++ b/pkg/events/plugin.go @@ -51,6 +51,10 @@ type Plugin interface { // Data will only be supplied as non-nil if the subscription is set to include data DeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error + // DeliveryBatchRequest requests delivery of multiple events on a connection, which must later be responded to + // Data will only be supplied as non-nil if the subscription is set to include data + BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error + // NamespaceRestarted is called after a namespace restarts. For a connect-in style plugin, like // WebSockets, this must re-register any active connections that started before the time passed in. NamespaceRestarted(ns string, startTime time.Time) @@ -83,4 +87,6 @@ type Callbacks interface { DeliveryResponse(connID string, inflight *core.EventDeliveryResponse) } -type Capabilities struct{} +type Capabilities struct { + BatchDelivery bool +}