Skip to content

Commit fe72060

Browse files
committed
feat: batching events and webhook plugin support
Signed-off-by: Enrique Lacal <[email protected]>
1 parent 14ba21d commit fe72060

File tree

13 files changed

+522
-18
lines changed

13 files changed

+522
-18
lines changed

docs/reference/types/subscription.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ nav_order: 3
105105
| `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` |
106106
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` |
107107
| `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` |
108+
| `batch` | Enable batching. Works in conjunction with readAhead which defines the batchSize. | `bool` |
109+
| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` |
108110
| `fastack` | Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations | `bool` |
109111
| `url` | Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config | `string` |
110112
| `method` | Webhooks only: HTTP method to invoke. Default=POST | `string` |
@@ -148,7 +150,7 @@ nav_order: 3
148150
| `requestTimeout` | The max duration to hold a TLS handshake alive | `string` |
149151
| `maxIdleConns` | The max number of idle connections to hold pooled | `int` |
150152
| `idleTimeout` | The max duration to hold a HTTP keepalive connection between calls | `string` |
151-
| `connectionTimeout` | | `string` |
153+
| `connectionTimeout` | The maximum amount of time that a connection is allowed to remain with no data transmitted. | `string` |
152154
| `expectContinueTimeout` | See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) | `string` |
153155

154156

docs/reference/types/wsstart.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ nav_order: 23
9696
| `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` |
9797
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` |
9898
| `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` |
99+
| `batch` | Enable batching. Works in conjunction with readAhead which defines the batchSize. | `bool` |
100+
| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` |
99101
| `fastack` | Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations | `bool` |
100102
| `url` | Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config | `string` |
101103
| `method` | Webhooks only: HTTP method to invoke. Default=POST | `string` |
@@ -139,7 +141,7 @@ nav_order: 23
139141
| `requestTimeout` | The max duration to hold a TLS handshake alive | `string` |
140142
| `maxIdleConns` | The max number of idle connections to hold pooled | `int` |
141143
| `idleTimeout` | The max duration to hold a HTTP keepalive connection between calls | `string` |
142-
| `connectionTimeout` | | `string` |
144+
| `connectionTimeout` | The maximum amount of time that a connection is allowed to remain with no data transmitted. | `string` |
143145
| `expectContinueTimeout` | See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) | `string` |
144146

145147

docs/swagger/swagger.yaml

Lines changed: 120 additions & 0 deletions
Large diffs are not rendered by default.

internal/coremsgs/en_struct_descriptions.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -537,9 +537,11 @@ var (
537537
SubscriptionBlockchainEventFilterListener = ffm("SubscriptionBlockchainEventFilter.listener", "Regular expression to apply to the blockchain event 'listener' field, which is the UUID of the event listener. So you can restrict your subscription to certain blockchain listeners. Alternatively to avoid your application need to know listener UUIDs you can set the 'topic' field of blockchain event listeners, and use a topic filter on your subscriptions")
538538

539539
// SubscriptionCoreOptions field descriptions
540-
SubscriptionCoreOptionsFirstEvent = ffm("SubscriptionCoreOptions.firstEvent", "Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest'")
541-
SubscriptionCoreOptionsReadAhead = ffm("SubscriptionCoreOptions.readAhead", "The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts")
542-
SubscriptionCoreOptionsWithData = ffm("SubscriptionCoreOptions.withData", "Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports.")
540+
SubscriptionCoreOptionsFirstEvent = ffm("SubscriptionCoreOptions.firstEvent", "Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest'")
541+
SubscriptionCoreOptionsReadAhead = ffm("SubscriptionCoreOptions.readAhead", "The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts")
542+
SubscriptionCoreOptionsWithData = ffm("SubscriptionCoreOptions.withData", "Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports.")
543+
SubscriptionCoreOptionsBatch = ffm("SubscriptionCoreOptions.batch", "Enable batching. Works in conjunction with readAhead which defines the batchSize.")
544+
SubscriptionCoreOptionsBatchTimeout = ffm("SubscriptionCoreOptions.batchTimeout", "When batching is enabled, the optional timeout to send events even when the batch hasn't filled.")
543545

544546
// TokenApproval field descriptions
545547
TokenApprovalLocalID = ffm("TokenApproval.localId", "The UUID of this token approval, in the local FireFly node")
@@ -706,7 +708,7 @@ var (
706708
WebhookOptHTTPExpectContinueTimeout = ffm("WebhookHTTPOptions.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)")
707709
WebhookOptHTTPIdleTimeout = ffm("WebhookHTTPOptions.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls")
708710
WebhookOptHTTPMaxIdleConns = ffm("WebhookHTTPOptions.maxIdleConns", "The max number of idle connections to hold pooled")
709-
WebhookOptHTTPConnectionTimeout = ffm("WebhookHTTPOptions.connectionTimeout", "")
711+
WebhookOptHTTPConnectionTimeout = ffm("WebhookHTTPOptions.connectionTimeout", "The maximum amount of time that a connection is allowed to remain with no data transmitted.")
710712
WebhookOptHTTPTLSHandshakeTimeout = ffm("WebhookHTTPOptions.tlsHandshakeTimeout", "The max duration to hold a TLS handshake alive")
711713
WebhookOptHTTPRequestTimeout = ffm("WebhookHTTPOptions.requestTimeout", "The max duration to hold a TLS handshake alive")
712714

internal/events/event_dispatcher.go

Lines changed: 96 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"time"
2324

2425
"github.com/hyperledger/firefly-common/pkg/config"
2526
"github.com/hyperledger/firefly-common/pkg/ffapi"
@@ -39,7 +40,8 @@ import (
3940
)
4041

4142
const (
42-
maxReadAhead = 65536
43+
maxReadAhead = 65536
44+
defaultBatchTimeout = time.Duration(2) * time.Second
4345
)
4446

4547
type ackNack struct {
@@ -66,7 +68,9 @@ type eventDispatcher struct {
6668
eventDelivery chan *core.EventDelivery
6769
mux sync.Mutex
6870
namespace string
69-
readAhead int
71+
readAhead int // confusing to have both
72+
batch bool // boolean? gonna receiver array, interface of application to FF
73+
batchTimeout time.Duration
7074
subscription *subscription
7175
txHelper txcommon.Helper
7276
}
@@ -80,6 +84,11 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
8084
if readAhead > maxReadAhead {
8185
readAhead = maxReadAhead
8286
}
87+
88+
batchTimeout := defaultBatchTimeout
89+
if sub.definition.Options.BatchTimeout != "" {
90+
batchTimeout = fftypes.ParseToDuration(sub.definition.Options.BatchTimeout)
91+
}
8392
ed := &eventDispatcher{
8493
ctx: log.WithLogField(log.WithLogField(ctx,
8594
"role", fmt.Sprintf("ed[%s]", connID)),
@@ -100,6 +109,8 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
100109
acksNacks: make(chan ackNack),
101110
closed: make(chan struct{}),
102111
txHelper: txHelper,
112+
batch: sub.definition.Options.Batch,
113+
batchTimeout: batchTimeout,
103114
}
104115

105116
pollerConf := &eventPollerConf{
@@ -149,7 +160,12 @@ func (ed *eventDispatcher) electAndStart() {
149160
// We're ready to go - not
150161
ed.elected = true
151162
ed.eventPoller.start()
152-
go ed.deliverEvents()
163+
164+
if ed.batch {
165+
go ed.deliverBatchedEvents()
166+
} else {
167+
go ed.deliverEvents()
168+
}
153169
// Wait until the event poller closes
154170
<-ed.eventPoller.closed
155171
}
@@ -262,6 +278,7 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
262278
// At this point, the page of messages we've been given are loaded from the DB into memory,
263279
// but we can only make them in-flight and push them to the client up to the maximum
264280
// readahead (which is likely lower than our page size - 1 by default)
281+
// TODO how do we guarantee that the readahead is lower that our page size?
265282

266283
if len(events) == 0 {
267284
return false, nil
@@ -280,33 +297,35 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
280297
matchCount := len(matching)
281298
dispatched := 0
282299

300+
// TODO We handle batch delivery here!
301+
283302
// We stay here blocked until we've consumed all the messages in the buffer,
284303
// or a reset event happens
285304
for {
286305
ed.mux.Lock()
287-
var disapatchable []*core.EventDelivery
306+
var dispatchable []*core.EventDelivery
288307
inflightCount := len(ed.inflight)
289308
maxDispatch := 1 + ed.readAhead - inflightCount
290309
if maxDispatch >= len(matching) {
291-
disapatchable = matching
310+
dispatchable = matching
292311
matching = nil
293312
} else if maxDispatch > 0 {
294-
disapatchable = matching[0:maxDispatch]
313+
dispatchable = matching[0:maxDispatch]
295314
matching = matching[maxDispatch:]
296315
}
297316
ed.mux.Unlock()
298317

299318
l.Debugf("Dispatcher event state: readahead=%d candidates=%d matched=%d inflight=%d queued=%d dispatched=%d dispatchable=%d lastAck=%d nacks=%d highest=%d",
300-
ed.readAhead, len(candidates), matchCount, inflightCount, len(matching), dispatched, len(disapatchable), lastAck, nacks, highestOffset)
319+
ed.readAhead, len(candidates), matchCount, inflightCount, len(matching), dispatched, len(dispatchable), lastAck, nacks, highestOffset)
301320

302-
for _, event := range disapatchable {
321+
for _, event := range dispatchable {
303322
ed.mux.Lock()
304323
ed.inflight[*event.ID] = &event.Event
305324
inflightCount = len(ed.inflight)
306325
ed.mux.Unlock()
307326

308327
dispatched++
309-
ed.eventDelivery <- event
328+
ed.eventDelivery <- event // Do we send batches on the channel or accumulate in the plugin?
310329
}
311330

312331
if inflightCount == 0 {
@@ -364,6 +383,72 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
364383
}
365384
}
366385

386+
func (ed *eventDispatcher) deliverBatchedEvents() {
387+
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
388+
389+
var events []*core.EventDelivery
390+
var dataSet []core.DataArray
391+
var batchTimeoutContext context.Context
392+
var batchTimeoutCancel func()
393+
for {
394+
var timeoutContext context.Context
395+
var timedOut bool
396+
if batchTimeoutContext != nil {
397+
timeoutContext = batchTimeoutContext
398+
} else {
399+
timeoutContext = ed.ctx
400+
}
401+
select {
402+
case event, ok := <-ed.eventDelivery:
403+
if !ok {
404+
if batchTimeoutCancel != nil {
405+
batchTimeoutCancel()
406+
}
407+
return
408+
}
409+
410+
if events == nil && dataSet == nil {
411+
events = []*core.EventDelivery{}
412+
dataSet = []core.DataArray{}
413+
batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout)
414+
}
415+
416+
log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
417+
418+
events = append(events, event)
419+
420+
var data []*core.Data
421+
var err error
422+
if withData && event.Message != nil {
423+
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
424+
dataSet = append(dataSet, data)
425+
}
426+
427+
if err != nil {
428+
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
429+
}
430+
431+
case <-timeoutContext.Done():
432+
timedOut = true
433+
case <-ed.ctx.Done():
434+
if batchTimeoutCancel != nil {
435+
batchTimeoutCancel()
436+
}
437+
return
438+
}
439+
440+
if len(events) >= ed.readAhead || (timedOut && len(events) > 0) {
441+
// TODO properly handle the error
442+
batchTimeoutCancel()
443+
_ = ed.transport.DeliveryBatchRequest(ed.ctx, ed.connID, ed.subscription.definition, events, dataSet)
444+
// If err handle all the delivery responses for all the events??
445+
events = nil
446+
dataSet = nil
447+
}
448+
}
449+
}
450+
451+
// TODO issue here, we can't just call DeliveryRequest with one thing.
367452
func (ed *eventDispatcher) deliverEvents() {
368453
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
369454
for {
@@ -372,12 +457,14 @@ func (ed *eventDispatcher) deliverEvents() {
372457
if !ok {
373458
return
374459
}
460+
375461
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
376462
var data []*core.Data
377463
var err error
378464
if withData && event.Message != nil {
379465
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
380466
}
467+
381468
if err == nil {
382469
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
383470
}

internal/events/event_dispatcher_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,3 +1064,55 @@ func TestEventDispatcherWithReply(t *testing.T) {
10641064
mbm.AssertExpectations(t)
10651065
mms.AssertExpectations(t)
10661066
}
1067+
1068+
func TestEventDeliveryBatch(t *testing.T) {
1069+
log.SetLevel("debug")
1070+
var five = uint16(5)
1071+
sub := &subscription{
1072+
dispatcherElection: make(chan bool, 1),
1073+
definition: &core.Subscription{
1074+
SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"},
1075+
Options: core.SubscriptionOptions{
1076+
SubscriptionCoreOptions: core.SubscriptionCoreOptions{
1077+
ReadAhead: &five,
1078+
Batch: true,
1079+
},
1080+
},
1081+
},
1082+
eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
1083+
}
1084+
1085+
ed, cancel := newTestEventDispatcher(sub)
1086+
cancel()
1087+
ed.acksNacks = make(chan ackNack, 5)
1088+
1089+
event1 := fftypes.NewUUID()
1090+
ed.inflight[*event1] = &core.Event{
1091+
ID: event1,
1092+
Namespace: "ns1",
1093+
}
1094+
1095+
mms := &syncasyncmocks.Sender{}
1096+
mbm := ed.broadcast.(*broadcastmocks.Manager)
1097+
mbm.On("NewBroadcast", mock.Anything).Return(mms)
1098+
mms.On("Send", mock.Anything).Return(nil)
1099+
1100+
ed.deliveryResponse(&core.EventDeliveryResponse{
1101+
ID: event1,
1102+
Reply: &core.MessageInOut{
1103+
Message: core.Message{
1104+
Header: core.MessageHeader{
1105+
Tag: "myreplytag1",
1106+
CID: fftypes.NewUUID(),
1107+
Type: core.MessageTypeBroadcast,
1108+
},
1109+
},
1110+
InlineData: core.InlineData{
1111+
{Value: fftypes.JSONAnyPtr(`"my reply"`)},
1112+
},
1113+
},
1114+
})
1115+
1116+
mbm.AssertExpectations(t)
1117+
mms.AssertExpectations(t)
1118+
}

internal/events/system/events.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ func (se *Events) DeliveryRequest(ctx context.Context, connID string, sub *core.
134134
return nil
135135
}
136136

137+
func (se *Events) DeliveryBatchRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.EventDelivery, data []core.DataArray) error {
138+
return nil
139+
}
140+
137141
func (se *Events) NamespaceRestarted(ns string, startTime time.Time) {
138142
// no-op
139143
}

0 commit comments

Comments
 (0)