1- import { CloudEvent , CloudEventV03 , CloudEventV1 , CONSTANTS , Version } from "../.." ;
2- import { Message , Sender , Headers } from ".." ;
1+ import { CloudEvent , CloudEventV03 , CloudEventV1 , CONSTANTS , Mode , Version } from "../.." ;
2+ import { Message , Headers } from ".." ;
33
4- import { headersFor , v1binaryParsers , validate } from "./headers" ;
4+ import { headersFor , sanitize , v03structuredParsers , v1binaryParsers , v1structuredParsers , validate } from "./headers" ;
55import { asData , isBase64 , isString , isStringOrObjectOrThrow , ValidationError } from "../../event/validation" ;
66import { validateCloudEvent } from "../../event/spec" ;
7- import { MappedParser , parserByContentType } from "../../parsers" ;
8-
9- // Sender is a function that takes headers and body for transmission
10- // over HTTP. Users supply this function as a parameter to HTTP.emit()
11- // Sends a message by invoking sender(). Implements Invoker
12- export function invoke ( sender : Sender , message : Message ) : Promise < boolean > {
13- return sender ( message . headers , message . body ) ;
14- }
7+ import { Base64Parser , JSONParser , MappedParser , Parser , parserByContentType } from "../../parsers" ;
158
169// implements Serializer
1710export function binary ( event : CloudEvent ) : Message {
@@ -33,6 +26,69 @@ export function structured(event: CloudEvent): Message {
3326 } ;
3427}
3528
29+ /**
30+ * Converts a Message to a CloudEvent
31+ *
32+ * @param {Message } message the incoming message
33+ * @return {CloudEvent } A new {CloudEvent} instance
34+ */
35+ export function deserialize ( message : Message ) : CloudEvent {
36+ const cleanHeaders : Headers = sanitize ( message . headers ) ;
37+ const mode : Mode = getMode ( cleanHeaders ) ;
38+ let version = getVersion ( mode , cleanHeaders , message . body ) ;
39+ if ( version !== Version . V03 && version !== Version . V1 ) {
40+ console . error ( `Unknown spec version ${ version } . Default to ${ Version . V1 } ` ) ;
41+ version = Version . V1 ;
42+ }
43+ switch ( mode ) {
44+ case Mode . BINARY :
45+ return parseBinary ( message , version ) ;
46+ case Mode . STRUCTURED :
47+ return parseStructured ( message , version ) ;
48+ default :
49+ throw new ValidationError ( "Unknown Message mode" ) ;
50+ }
51+ }
52+
53+ /**
54+ * Determines the HTTP transport mode (binary or structured) based
55+ * on the incoming HTTP headers.
56+ * @param {Headers } headers the incoming HTTP headers
57+ * @returns {Mode } the transport mode
58+ */
59+ function getMode ( headers : Headers ) : Mode {
60+ const contentType = headers [ CONSTANTS . HEADER_CONTENT_TYPE ] ;
61+ if ( contentType && contentType . startsWith ( CONSTANTS . MIME_CE ) ) {
62+ return Mode . STRUCTURED ;
63+ }
64+ if ( headers [ CONSTANTS . CE_HEADERS . ID ] ) {
65+ return Mode . BINARY ;
66+ }
67+ throw new ValidationError ( "no cloud event detected" ) ;
68+ }
69+
70+ /**
71+ * Determines the version of an incoming CloudEvent based on the
72+ * HTTP headers or HTTP body, depending on transport mode.
73+ * @param {Mode } mode the HTTP transport mode
74+ * @param {Headers } headers the incoming HTTP headers
75+ * @param {Record<string, unknown> } body the HTTP request body
76+ * @returns {Version } the CloudEvent specification version
77+ */
78+ function getVersion ( mode : Mode , headers : Headers , body : string | Record < string , string > ) {
79+ if ( mode === Mode . BINARY ) {
80+ // Check the headers for the version
81+ const versionHeader = headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] ;
82+ if ( versionHeader ) {
83+ return versionHeader ;
84+ }
85+ } else {
86+ // structured mode - the version is in the body
87+ return typeof body === "string" ? JSON . parse ( body ) . specversion : ( body as CloudEvent ) . specversion ;
88+ }
89+ return Version . V1 ;
90+ }
91+
3692/**
3793 * Parses an incoming HTTP Message, converting it to a {CloudEvent}
3894 * instance if it conforms to the Cloud Event specification for this receiver.
@@ -42,7 +98,7 @@ export function structured(event: CloudEvent): Message {
4298 * @returns {CloudEvent } an instance of CloudEvent representing the incoming request
4399 * @throws {ValidationError } of the event does not conform to the spec
44100 */
45- export function receive ( message : Message , version : Version = Version . V1 ) : CloudEvent {
101+ function parseBinary ( message : Message , version : Version ) : CloudEvent {
46102 const headers = message . headers ;
47103 let body = message . body ;
48104
@@ -102,3 +158,72 @@ export function receive(message: Message, version: Version = Version.V1): CloudE
102158 validateCloudEvent ( cloudevent ) ;
103159 return cloudevent ;
104160}
161+
162+ /**
163+ * Creates a new CloudEvent instance based on the provided payload and headers.
164+ *
165+ * @param {Message } message the incoming Message
166+ * @param {Version } version the spec version of this message (v1 or v03)
167+ * @returns {CloudEvent } a new CloudEvent instance for the provided headers and payload
168+ * @throws {ValidationError } if the payload and header combination do not conform to the spec
169+ */
170+ function parseStructured ( message : Message , version : Version ) : CloudEvent {
171+ let payload = message . body ;
172+ const headers = message . headers ;
173+
174+ if ( ! payload ) throw new ValidationError ( "payload is null or undefined" ) ;
175+ if ( ! headers ) throw new ValidationError ( "headers is null or undefined" ) ;
176+ isStringOrObjectOrThrow ( payload , new ValidationError ( "payload must be an object or a string" ) ) ;
177+
178+ if (
179+ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] &&
180+ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] != Version . V03 &&
181+ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] != Version . V1
182+ ) {
183+ throw new ValidationError ( `invalid spec version ${ headers [ CONSTANTS . CE_HEADERS . SPEC_VERSION ] } ` ) ;
184+ }
185+
186+ payload = isString ( payload ) && isBase64 ( payload ) ? Buffer . from ( payload as string , "base64" ) . toString ( ) : payload ;
187+
188+ // Clone and low case all headers names
189+ const sanitizedHeaders = sanitize ( headers ) ;
190+
191+ const contentType = sanitizedHeaders [ CONSTANTS . HEADER_CONTENT_TYPE ] ;
192+ const parser : Parser = contentType ? parserByContentType [ contentType ] : new JSONParser ( ) ;
193+ if ( ! parser ) throw new ValidationError ( `invalid content type ${ sanitizedHeaders [ CONSTANTS . HEADER_CONTENT_TYPE ] } ` ) ;
194+ const incoming = { ...( parser . parse ( payload ) as Record < string , unknown > ) } ;
195+
196+ const eventObj : { [ key : string ] : unknown } = { } ;
197+ const parserMap : Record < string , MappedParser > = version === Version . V1 ? v1structuredParsers : v03structuredParsers ;
198+
199+ for ( const key in parserMap ) {
200+ const property = incoming [ key ] ;
201+ if ( property ) {
202+ const parser : MappedParser = parserMap [ key ] ;
203+ eventObj [ parser . name ] = parser . parser . parse ( property as string ) ;
204+ }
205+ delete incoming [ key ] ;
206+ }
207+
208+ // extensions are what we have left after processing all other properties
209+ for ( const key in incoming ) {
210+ eventObj [ key ] = incoming [ key ] ;
211+ }
212+
213+ // ensure data content is correctly encoded
214+ if ( eventObj . data && eventObj . datacontentencoding ) {
215+ if ( eventObj . datacontentencoding === CONSTANTS . ENCODING_BASE64 && ! isBase64 ( eventObj . data ) ) {
216+ throw new ValidationError ( "invalid payload" ) ;
217+ } else if ( eventObj . datacontentencoding === CONSTANTS . ENCODING_BASE64 ) {
218+ const dataParser = new Base64Parser ( ) ;
219+ eventObj . data = JSON . parse ( dataParser . parse ( eventObj . data as string ) ) ;
220+ delete eventObj . datacontentencoding ;
221+ }
222+ }
223+
224+ const cloudevent = new CloudEvent ( eventObj as CloudEventV1 | CloudEventV03 ) ;
225+
226+ // Validates the event
227+ validateCloudEvent ( cloudevent ) ;
228+ return cloudevent ;
229+ }
0 commit comments