Skip to content

Commit a1a47d1

Browse files
authored
Merge pull request #1353 from kaleido-io/expose_webhook_options
feat: expose retry and HTTP options for webhooks
2 parents ae2854d + 30d22a3 commit a1a47d1

File tree

62 files changed

+1215
-403
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+1215
-403
lines changed

docs/reference/types/subscription.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ nav_order: 3
116116
| `query` | Webhooks only: Static query params to set on the webhook request | `` |
117117
| `tlsConfigName` | The name of an existing TLS configuration associated to the namespace to use | `string` |
118118
| `input` | Webhooks only: A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true | [`WebhookInputOptions`](#webhookinputoptions) |
119+
| `retry` | Webhooks only: a set of options for retrying the webhook call | [`WebhookRetryOptions`](#webhookretryoptions) |
120+
| `httpOptions` | Webhooks only: a set of options for HTTP | [`WebhookHTTPOptions`](#webhookhttpoptions) |
119121

120122
## WebhookInputOptions
121123

@@ -128,4 +130,26 @@ nav_order: 3
128130
| `replytx` | A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose) | `string` |
129131

130132

133+
## WebhookRetryOptions
134+
135+
| Field Name | Description | Type |
136+
|------------|-------------|------|
137+
| `enabled` | Enables retry on HTTP calls, defaults to false | `bool` |
138+
| `count` | Number of times to retry the webhook call in case of failure | `int` |
139+
| `initialDelay` | Initial delay between retries when we retry the webhook call | `string` |
140+
| `maxDelay` | Max delay between retries when we retry the webhookcall | `string` |
141+
142+
143+
## WebhookHTTPOptions
144+
145+
| Field Name | Description | Type |
146+
|------------|-------------|------|
147+
| `tlsHandshakeTimeout` | The max duration to hold a TLS handshake alive | `string` |
148+
| `requestTimeout` | The max duration to hold a TLS handshake alive | `string` |
149+
| `maxIdleConns` | The max number of idle connections to hold pooled | `int` |
150+
| `idleTimeout` | The max duration to hold a HTTP keepalive connection between calls | `string` |
151+
| `connectionTimeout` | | `string` |
152+
| `expectContinueTimeout` | See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) | `string` |
153+
154+
131155

docs/reference/types/wsstart.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ nav_order: 23
107107
| `query` | Webhooks only: Static query params to set on the webhook request | `` |
108108
| `tlsConfigName` | The name of an existing TLS configuration associated to the namespace to use | `string` |
109109
| `input` | Webhooks only: A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true | [`WebhookInputOptions`](#webhookinputoptions) |
110+
| `retry` | Webhooks only: a set of options for retrying the webhook call | [`WebhookRetryOptions`](#webhookretryoptions) |
111+
| `httpOptions` | Webhooks only: a set of options for HTTP | [`WebhookHTTPOptions`](#webhookhttpoptions) |
110112

111113
## WebhookInputOptions
112114

@@ -119,4 +121,26 @@ nav_order: 23
119121
| `replytx` | A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose) | `string` |
120122

121123

124+
## WebhookRetryOptions
125+
126+
| Field Name | Description | Type |
127+
|------------|-------------|------|
128+
| `enabled` | Enables retry on HTTP calls, defaults to false | `bool` |
129+
| `count` | Number of times to retry the webhook call in case of failure | `int` |
130+
| `initialDelay` | Initial delay between retries when we retry the webhook call | `string` |
131+
| `maxDelay` | Max delay between retries when we retry the webhookcall | `string` |
132+
133+
134+
## WebhookHTTPOptions
135+
136+
| Field Name | Description | Type |
137+
|------------|-------------|------|
138+
| `tlsHandshakeTimeout` | The max duration to hold a TLS handshake alive | `string` |
139+
| `requestTimeout` | The max duration to hold a TLS handshake alive | `string` |
140+
| `maxIdleConns` | The max number of idle connections to hold pooled | `int` |
141+
| `idleTimeout` | The max duration to hold a HTTP keepalive connection between calls | `string` |
142+
| `connectionTimeout` | | `string` |
143+
| `expectContinueTimeout` | See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport) | `string` |
144+
145+
122146

docs/swagger/swagger.yaml

Lines changed: 540 additions & 0 deletions
Large diffs are not rendered by default.

internal/coremsgs/en_struct_descriptions.go

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -681,22 +681,34 @@ var (
681681
WSSubscriptionStatusFilter = ffm("WSSubscriptionStatus.filter", "The subscription filter specification")
682682
WSSubscriptionStatusStartTime = ffm("WSSubscriptionStatus.startTime", "The time the subscription started (reset on dynamic namespace reload)")
683683

684-
WebhooksOptJSON = ffm("WebhookSubOptions.json", "Webhooks only: Whether to assume the response body is JSON, regardless of the returned Content-Type")
685-
WebhooksOptReply = ffm("WebhookSubOptions.reply", "Webhooks only: Whether to automatically send a reply event, using the body returned by the webhook")
686-
WebhooksOptHeaders = ffm("WebhookSubOptions.headers", "Webhooks only: Static headers to set on the webhook request")
687-
WebhooksOptQuery = ffm("WebhookSubOptions.query", "Webhooks only: Static query params to set on the webhook request")
688-
WebhooksOptInput = ffm("WebhookSubOptions.input", "Webhooks only: A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true")
689-
WebhooksOptFastAck = ffm("WebhookSubOptions.fastack", "Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations")
690-
WebhooksOptURL = ffm("WebhookSubOptions.url", "Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config")
691-
WebhooksOptMethod = ffm("WebhookSubOptions.method", "Webhooks only: HTTP method to invoke. Default=POST")
692-
WebhooksOptReplyTag = ffm("WebhookSubOptions.replytag", "Webhooks only: The tag to set on the reply message")
693-
WebhooksOptReplyTx = ffm("WebhookSubOptions.replytx", "Webhooks only: The transaction type to set on the reply message")
694-
WebhooksOptTLSConfigName = ffm("WebhookSubOptions.tlsConfigName", "The name of an existing TLS configuration associated to the namespace to use")
695-
WebhooksOptInputQuery = ffm("WebhookInputOptions.query", "A top-level property of the first data input, to use for query parameters")
696-
WebhooksOptInputHeaders = ffm("WebhookInputOptions.headers", "A top-level property of the first data input, to use for headers")
697-
WebhooksOptInputBody = ffm("WebhookInputOptions.body", "A top-level property of the first data input, to use for the request body. Default is the whole first body")
698-
WebhooksOptInputPath = ffm("WebhookInputOptions.path", "A top-level property of the first data input, to use for a path to append with escaping to the webhook path")
699-
WebhooksOptInputReplyTx = ffm("WebhookInputOptions.replytx", "A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose)")
684+
WebhooksOptJSON = ffm("WebhookSubOptions.json", "Webhooks only: Whether to assume the response body is JSON, regardless of the returned Content-Type")
685+
WebhooksOptReply = ffm("WebhookSubOptions.reply", "Webhooks only: Whether to automatically send a reply event, using the body returned by the webhook")
686+
WebhooksOptHeaders = ffm("WebhookSubOptions.headers", "Webhooks only: Static headers to set on the webhook request")
687+
WebhooksOptQuery = ffm("WebhookSubOptions.query", "Webhooks only: Static query params to set on the webhook request")
688+
WebhooksOptInput = ffm("WebhookSubOptions.input", "Webhooks only: A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true")
689+
WebhooksOptFastAck = ffm("WebhookSubOptions.fastack", "Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations")
690+
WebhooksOptURL = ffm("WebhookSubOptions.url", "Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config")
691+
WebhooksOptMethod = ffm("WebhookSubOptions.method", "Webhooks only: HTTP method to invoke. Default=POST")
692+
WebhooksOptReplyTag = ffm("WebhookSubOptions.replytag", "Webhooks only: The tag to set on the reply message")
693+
WebhooksOptReplyTx = ffm("WebhookSubOptions.replytx", "Webhooks only: The transaction type to set on the reply message")
694+
WebhooksOptTLSConfigName = ffm("WebhookSubOptions.tlsConfigName", "The name of an existing TLS configuration associated to the namespace to use")
695+
WebhooksOptHTTPOptions = ffm("WebhookSubOptions.httpOptions", "Webhooks only: a set of options for HTTP")
696+
WebhooksOptHTTPRetry = ffm("WebhookSubOptions.retry", "Webhooks only: a set of options for retrying the webhook call")
697+
WebhooksOptInputQuery = ffm("WebhookInputOptions.query", "A top-level property of the first data input, to use for query parameters")
698+
WebhooksOptInputHeaders = ffm("WebhookInputOptions.headers", "A top-level property of the first data input, to use for headers")
699+
WebhooksOptInputBody = ffm("WebhookInputOptions.body", "A top-level property of the first data input, to use for the request body. Default is the whole first body")
700+
WebhooksOptInputPath = ffm("WebhookInputOptions.path", "A top-level property of the first data input, to use for a path to append with escaping to the webhook path")
701+
WebhooksOptInputReplyTx = ffm("WebhookInputOptions.replytx", "A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose)")
702+
WebhooksOptRetryEnabled = ffm("WebhookRetryOptions.enabled", "Enables retry on HTTP calls, defaults to false")
703+
WebhooksOptRetryCount = ffm("WebhookRetryOptions.count", "Number of times to retry the webhook call in case of failure")
704+
WebhooksOptRetryInitialDelay = ffm("WebhookRetryOptions.initialDelay", "Initial delay between retries when we retry the webhook call")
705+
WebhooksOptRetryMaxDelay = ffm("WebhookRetryOptions.maxDelay", "Max delay between retries when we retry the webhookcall")
706+
WebhookOptHTTPExpectContinueTimeout = ffm("WebhookHTTPOptions.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)")
707+
WebhookOptHTTPIdleTimeout = ffm("WebhookHTTPOptions.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls")
708+
WebhookOptHTTPMaxIdleConns = ffm("WebhookHTTPOptions.maxIdleConns", "The max number of idle connections to hold pooled")
709+
WebhookOptHTTPConnectionTimeout = ffm("WebhookHTTPOptions.connectionTimeout", "")
710+
WebhookOptHTTPTLSHandshakeTimeout = ffm("WebhookHTTPOptions.tlsHandshakeTimeout", "The max duration to hold a TLS handshake alive")
711+
WebhookOptHTTPRequestTimeout = ffm("WebhookHTTPOptions.requestTimeout", "The max duration to hold a TLS handshake alive")
700712

701713
// PublishInput field descriptions
702714
PublishInputIdempotencyKey = ffm("PublishInput.idempotencyKey", "An optional identifier to allow idempotent submission of requests. Stored on the transaction uniquely within a namespace")

internal/events/event_dispatcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2022 Kaleido, Inc.
1+
// Copyright © 2023 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -379,7 +379,7 @@ func (ed *eventDispatcher) deliverEvents() {
379379
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
380380
}
381381
if err == nil {
382-
err = ed.transport.DeliveryRequest(ed.connID, ed.subscription.definition, event, data)
382+
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
383383
}
384384
if err != nil {
385385
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})

internal/events/event_dispatcher_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,9 @@ func TestEventDispatcherReadAheadOutOfOrderAcks(t *testing.T) {
177177
mdm := ed.data.(*datamocks.Manager)
178178

179179
eventDeliveries := make(chan *core.EventDelivery)
180-
deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
180+
deliveryRequestMock := mei.On("DeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
181181
deliveryRequestMock.RunFn = func(a mock.Arguments) {
182-
eventDeliveries <- a.Get(2).(*core.EventDelivery)
182+
eventDeliveries <- a.Get(3).(*core.EventDelivery)
183183
}
184184

185185
// Setup the IDs
@@ -272,9 +272,9 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) {
272272
mei := ed.transport.(*eventsmocks.Plugin)
273273

274274
eventDeliveries := make(chan *core.EventDelivery)
275-
deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
275+
deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
276276
deliveryRequestMock.RunFn = func(a mock.Arguments) {
277-
eventDeliveries <- a.Get(2).(*core.EventDelivery)
277+
eventDeliveries <- a.Get(3).(*core.EventDelivery)
278278
}
279279

280280
// Setup the IDs
@@ -603,9 +603,9 @@ func TestEnrichTransactionEvents(t *testing.T) {
603603
mei := ed.transport.(*eventsmocks.Plugin)
604604

605605
eventDeliveries := make(chan *core.EventDelivery)
606-
deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
606+
deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
607607
deliveryRequestMock.RunFn = func(a mock.Arguments) {
608-
eventDeliveries <- a.Get(2).(*core.EventDelivery)
608+
eventDeliveries <- a.Get(3).(*core.EventDelivery)
609609
}
610610

611611
// Setup the IDs
@@ -692,9 +692,9 @@ func TestEnrichBlockchainEventEvents(t *testing.T) {
692692
mei := ed.transport.(*eventsmocks.Plugin)
693693

694694
eventDeliveries := make(chan *core.EventDelivery)
695-
deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
695+
deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
696696
deliveryRequestMock.RunFn = func(a mock.Arguments) {
697-
eventDeliveries <- a.Get(2).(*core.EventDelivery)
697+
eventDeliveries <- a.Get(3).(*core.EventDelivery)
698698
}
699699

700700
// Setup the IDs
@@ -814,7 +814,7 @@ func TestBufferedDeliveryClosedContext(t *testing.T) {
814814
mdi := ed.database.(*databasemocks.Plugin)
815815
mei := ed.transport.(*eventsmocks.Plugin)
816816
mdi.On("GetDataRefs", mock.Anything, mock.Anything).Return(nil, nil, nil)
817-
mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
817+
mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
818818

819819
repoll, err := ed.bufferedDelivery([]core.LocallySequenced{&core.Event{ID: fftypes.NewUUID()}})
820820
assert.False(t, repoll)
@@ -837,7 +837,7 @@ func TestBufferedDeliveryNackRewind(t *testing.T) {
837837
mdi.On("UpdateOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
838838

839839
delivered := make(chan struct{})
840-
deliver := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
840+
deliver := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
841841
deliver.RunFn = func(a mock.Arguments) {
842842
close(delivered)
843843
}
@@ -882,7 +882,7 @@ func TestBufferedDeliveryFailNack(t *testing.T) {
882882
mdi.On("UpdateOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
883883

884884
failNacked := make(chan bool)
885-
deliver := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
885+
deliver := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
886886
deliver.RunFn = func(a mock.Arguments) {
887887
failNacked <- true
888888
}

internal/events/event_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func newTestEventManagerCommon(t *testing.T, metrics, dbconcurrency bool) *testE
146146
mbi.On("VerifierType").Return(core.VerifierTypeEthAddress).Maybe()
147147
mdi.On("Capabilities").Return(&database.Capabilities{Concurrency: dbconcurrency}).Maybe()
148148
mev.On("SetHandler", "ns1", mock.Anything).Return(nil).Maybe()
149-
mev.On("ValidateOptions", mock.Anything).Return(nil).Maybe()
149+
mev.On("ValidateOptions", mock.Anything, mock.Anything).Return(nil).Maybe()
150150
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
151151
emi, err := NewEventManager(ctx, ns, mdi, mbi, mim, msh, mdm, mds, mbm, mpm, mam, msd, mmi, mom, txHelper, events, mmp, cmi)
152152
em := emi.(*eventManager)

internal/events/subscription_manager.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,11 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef
258258
return nil, i18n.NewError(ctx, coremsgs.MsgUnknownEventTransportPlugin, subDef.Transport)
259259
}
260260

261-
if err := transport.ValidateOptions(&subDef.Options); err != nil {
261+
if subDef.Options.TLSConfigName != "" && sm.namespace.TLSConfigs[subDef.Options.TLSConfigName] != nil {
262+
subDef.Options.TLSConfig = sm.namespace.TLSConfigs[subDef.Options.TLSConfigName]
263+
}
264+
265+
if err := transport.ValidateOptions(ctx, &subDef.Options); err != nil {
262266
return nil, err
263267
}
264268

@@ -337,10 +341,6 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef
337341
}
338342
}
339343

340-
if subDef.Options.TLSConfigName != "" && sm.namespace.TLSConfigs[subDef.Options.TLSConfigName] != nil {
341-
subDef.Options.TLSConfig = sm.namespace.TLSConfigs[subDef.Options.TLSConfigName]
342-
}
343-
344344
sub = &subscription{
345345
dispatcherElection: make(chan bool, 1),
346346
definition: subDef,

0 commit comments

Comments
 (0)