|
1 | 1 | 'use strict'; |
2 | 2 |
|
3 | | -const v02 = require('cloudevents-sdk/v02'); |
4 | | -const v03 = require('cloudevents-sdk/v03'); |
5 | | -const v1 = require('cloudevents-sdk/v1'); |
| 3 | +const V03Binary = require('cloudevents-sdk/lib/bindings/http/receiver_binary_0_3'); |
| 4 | +const V03Structured = require('cloudevents-sdk/lib/bindings/http/receiver_structured_0_3.js'); |
| 5 | +const V1Binary = require('cloudevents-sdk/lib/bindings/http/receiver_binary_1.js'); |
| 6 | +const V1Structured = require('cloudevents-sdk/lib/bindings/http/receiver_structured_1.js'); |
| 7 | + |
6 | 8 | const Spec = require('./ce-constants.js').Spec; |
7 | 9 |
|
8 | | -const v02Unmarshaller = new v02.HTTPUnmarshaller(); |
9 | | -const v03Unmarshaller = new v03.HTTPUnmarshaller(); |
10 | | -const v1BinaryReceiver = new v1.BinaryHTTPReceiver(); |
11 | | -const v1StructuredReceiver = new v1.StructuredHTTPReceiver(); |
| 10 | +const receivers = { |
| 11 | + v1: { |
| 12 | + structured: new V1Structured(), |
| 13 | + binary: new V1Binary() |
| 14 | + }, |
| 15 | + v03: { |
| 16 | + structured: new V03Structured(), |
| 17 | + binary: new V03Binary() |
| 18 | + } |
| 19 | +}; |
12 | 20 |
|
13 | 21 | function use(fastify, opts, done) { |
14 | 22 | fastify.addContentTypeParser('application/cloudevents+json', |
15 | 23 | { parseAs: 'string' }, |
16 | 24 | function(req, body, done) { |
17 | | - // unmarshallEvent() handles parsing |
18 | 25 | done(null, body); |
19 | 26 | }); |
20 | 27 |
|
21 | 28 | fastify.decorateRequest('isCloudEvent', function() { |
22 | 29 | if (Spec.type in this.req.headers) { |
23 | 30 | return true; |
24 | | - } else { |
25 | | - const contentType = this.req.headers['content-type']; |
26 | | - if (contentType && contentType.match(/application\/cloudevents/)) { |
27 | | - return true; |
28 | | - } |
29 | 31 | } |
30 | | - return false; |
| 32 | + const contentType = this.req.headers['content-type']; |
| 33 | + return contentType && contentType.match(/application\/cloudevents/); |
31 | 34 | }); |
32 | 35 |
|
33 | 36 | // Any incoming requests for cloud events will only be |
34 | 37 | // processed if it's a cloud event spec version we know about |
35 | | - fastify.addHook('preHandler', async(request, reply) => { |
| 38 | + fastify.addHook('preHandler', function(request, reply, done) { |
36 | 39 | if (request.isCloudEvent()) { |
37 | | - const version = request.headers[Spec.version]; |
38 | | - // if there is no version in the headers, it is a |
39 | | - // structured event |
40 | | - if (version && !acceptsVersion(version)) { |
| 40 | + try { |
| 41 | + request.context.cloudevent = |
| 42 | + accept(request.headers, request.body).format(); |
| 43 | + } catch (err) { |
41 | 44 | reply.code(406); |
42 | | - const error = new Error( |
43 | | - `Unsupported cloud event version detected: ${version}`); |
44 | | - error.code = 406; |
45 | | - throw error; |
46 | | - } else { |
47 | | - try { |
48 | | - await unmarshallEvent(request); |
49 | | - } catch (err) { |
50 | | - throw new Error(err.message || |
51 | | - `Failed to unmarshall cloud event: ${err}`); |
52 | | - } |
| 45 | + done(err); |
53 | 46 | } |
54 | 47 | } |
| 48 | + done(); |
55 | 49 | }); |
56 | 50 |
|
57 | 51 | done(); |
58 | 52 | } |
59 | 53 |
|
60 | | -async function unmarshallEvent(request) { |
61 | | - const version = request.headers[Spec.version]; |
62 | | - if (!version) { |
63 | | - // it's a structured event and the version is in the body |
64 | | - // currently only v1 structured events are supported |
65 | | - try { |
66 | | - const event = v1StructuredReceiver.parse(request.body, request.headers); |
67 | | - request.context.cloudevent = event.format(); |
68 | | - } catch (err) { |
69 | | - return Promise.reject(err); |
| 54 | +function accept(headers, body) { |
| 55 | + const mode = getMode(headers); |
| 56 | + const version = getVersion(mode, headers, body); |
| 57 | + switch (version) { |
| 58 | + case '1.0': |
| 59 | + return receivers.v1[mode].parse(body, headers); |
| 60 | + case '0.3': |
| 61 | + return receivers.v03[mode].parse(body, headers); |
| 62 | + default: |
| 63 | + console.error(`Unknown spec version ${version}`); |
| 64 | + throw new TypeError( |
| 65 | + `Unsupported cloud event version detected: ${version}`); |
| 66 | + } |
| 67 | +} |
| 68 | + |
| 69 | +function getMode(headers) { |
| 70 | + let mode = 'binary'; |
| 71 | + if (headers['content-type']) { |
| 72 | + if (headers['content-type'].startsWith('application/cloudevents')) { |
| 73 | + mode = 'structured'; |
70 | 74 | } |
71 | | - } else if (version === '0.2') { |
72 | | - return v02Unmarshaller.unmarshall(request.body, request.headers) |
73 | | - .then(cloudevent => (request.context.cloudevent = cloudevent.format())); |
74 | | - } else if (version === '0.3') { |
75 | | - return v03Unmarshaller.unmarshall(request.body, request.headers) |
76 | | - .then(cloudevent => (request.context.cloudevent = cloudevent.format())); |
77 | | - } else if (version === '1.0') { |
78 | | - const event = v1BinaryReceiver.parse(request.body, request.headers); |
79 | | - request.context.cloudevent = event.format(); |
80 | 75 | } |
| 76 | + return mode; |
81 | 77 | } |
82 | 78 |
|
83 | | -function acceptsVersion(version) { |
84 | | - return ['0.2', '0.3', '1.0'].find(elem => version === elem) !== undefined; |
| 79 | +function getVersion(mode, headers, body) { |
| 80 | + let version = '1.0'; // default to 1.0 |
| 81 | + |
| 82 | + if (mode === 'binary') { |
| 83 | + // Check the headers for the version |
| 84 | + if (headers['ce-specversion']) { |
| 85 | + version = headers['ce-specversion']; |
| 86 | + } |
| 87 | + } else { |
| 88 | + // structured mode - the version is in the body |
| 89 | + version = typeof body === 'string' |
| 90 | + ? JSON.parse(body).specversion : body.specversion; |
| 91 | + } |
| 92 | + return version; |
85 | 93 | } |
86 | 94 |
|
87 | 95 | module.exports = exports = use; |
0 commit comments