2323 {
2424 % header :: maybe(#'v1_0.header'{}),
2525 % delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}),
26- message_annotations :: maybe (# 'v1_0.message_annotations' {}),
27- properties :: maybe (# 'v1_0.properties' {}),
28- application_properties :: maybe (# 'v1_0.application_properties' {}),
29- data :: maybe (amqp10_data ())
26+ message_annotations :: maybe (# 'v1_0.message_annotations' {}) | iodata () ,
27+ properties :: maybe (# 'v1_0.properties' {}) | iodata () ,
28+ application_properties :: maybe (# 'v1_0.application_properties' {}) | iodata () ,
29+ data :: maybe (amqp10_data ()) | iodata ()
3030 % footer :: maybe(#'v1_0.footer'{})
3131 }).
3232
4141 state / 0
4242 ]).
4343
44+ - define (AMQP10_TYPE , <<" amqp-1.0" >>).
45+ - define (AMQP10_PROPERTIES_HEADER , <<" x-amqp-1.0-properties" >>).
46+ - define (AMQP10_APP_PROPERTIES_HEADER , <<" x-amqp-1.0-app-properties" >>).
47+ - define (AMQP10_MESSAGE_ANNOTATIONS_HEADER , <<" x-amqp-1.0-message-annotations" >>).
48+
4449% % this module acts as a wrapper / converter for the internal binary storage format
4550% % (AMQP 1.0) and any format it needs to be converted to / from.
4651% % Efficiency is key. No unnecessary allocations or work should be done until it
5055- spec init (binary ()) -> state ().
5156init (Bin ) when is_binary (Bin ) ->
5257 % % TODO: delay parsing until needed
53- {MA , P , AP , D } = decode (amqp10_framing :decode_bin (Bin ),
54- {undefined , undefined , undefined , undefined }),
58+ {MA , P , AP , D0 } = decode (amqp10_framing :decode_bin (Bin ),
59+ {undefined , undefined , undefined , undefined }),
60+
61+ D1 = case D0 of
62+ Sections when is_list (D0 ) ->
63+ lists :reverse (Sections );
64+ _ ->
65+ D0
66+ end ,
67+
5568 #? MODULE {cfg = # cfg {},
5669 msg = # msg {properties = P ,
5770 application_properties = AP ,
5871 message_annotations = MA ,
59- data = D }}.
72+ data = D1 }}.
6073
6174decode ([], Acc ) ->
6275 Acc ;
@@ -66,13 +79,23 @@ decode([#'v1_0.properties'{} = P | Rem], {MA, _, AP, D}) ->
6679 decode (Rem , {MA , P , AP , D });
6780decode ([# 'v1_0.application_properties' {} = AP | Rem ], {MA , P , _ , D }) ->
6881 decode (Rem , {MA , P , AP , D });
69- decode ([# 'v1_0.data' {} = D | Rem ], {MA , P , AP , _ }) ->
70- decode (Rem , {MA , P , AP , D }).
82+ decode ([# 'v1_0.amqp_value' {} = D | Rem ], {MA , P , AP , _ }) ->
83+ decode (Rem , {MA , P , AP , D });
84+ decode ([# 'v1_0.data' {} = D | Rem ], {MA , P , AP , undefined }) ->
85+ decode (Rem , {MA , P , AP , D });
86+ decode ([# 'v1_0.data' {} = D | Rem ], {MA , P , AP , B }) when is_list (B ) ->
87+ decode (Rem , {MA , P , AP , [D | B ]});
88+ decode ([# 'v1_0.data' {} = D | Rem ], {MA , P , AP , B }) ->
89+ decode (Rem , {MA , P , AP , [D , B ]});
90+ decode ([# 'v1_0.amqp_sequence' {} = D | Rem ], {MA , P , AP , undefined }) ->
91+ decode (Rem , {MA , P , AP , [D ]});
92+ decode ([# 'v1_0.amqp_sequence' {} = D | Rem ], {MA , P , AP , B }) when is_list (B ) ->
93+ decode (Rem , {MA , P , AP , [D | B ]}).
94+
7195
7296amqp10_properties_empty (# 'v1_0.properties' {message_id = undefined ,
7397 user_id = undefined ,
7498 to = undefined ,
75- % subject = wrap(utf8, RKey),
7699 reply_to = undefined ,
77100 correlation_id = undefined ,
78101 content_type = undefined ,
@@ -92,42 +115,67 @@ to_iodata(#?MODULE{msg = #msg{properties = P,
92115 case MA of
93116 # 'v1_0.message_annotations' {content = []} ->
94117 <<>>;
95- _ ->
96- amqp10_framing :encode_bin (MA )
118+ # 'v1_0.message_annotations' {} ->
119+ amqp10_framing :encode_bin (MA );
120+ MsgAnnotBin ->
121+ MsgAnnotBin
97122 end ,
98- case amqp10_properties_empty (P ) of
99- true -> <<>>;
100- false ->
101- amqp10_framing :encode_bin (P )
123+ case P of
124+ # 'v1_0.properties' {} ->
125+ case amqp10_properties_empty (P ) of
126+ true -> <<>>;
127+ false ->
128+ amqp10_framing :encode_bin (P )
129+ end ;
130+ PropsBin ->
131+ PropsBin
102132 end ,
103133 case AP of
104134 # 'v1_0.application_properties' {content = []} ->
105135 <<>>;
106- _ ->
107- amqp10_framing :encode_bin (AP )
136+ # 'v1_0.application_properties' {} ->
137+ amqp10_framing :encode_bin (AP );
138+ AppPropsBin ->
139+ AppPropsBin
108140 end ,
109- amqp10_framing :encode_bin (Data )
141+ case Data of
142+ DataBin when is_binary (Data ) orelse is_list (Data ) ->
143+ DataBin ;
144+ _ ->
145+ amqp10_framing :encode_bin (Data )
146+ end
110147 ].
111148
112149% % TODO: refine type spec here
113150- spec add_message_annotations (#{binary () => {atom (), term ()}}, state ()) ->
114151 state ().
115152add_message_annotations (Anns ,
116153 #? MODULE {msg =
117- # msg {message_annotations = MA0 } = Msg } = State ) ->
154+ # msg {message_annotations = undefined } = Msg } = State ) ->
155+ add_message_annotations (Anns ,
156+ State #? MODULE {msg = Msg # msg {message_annotations =
157+ # 'v1_0.message_annotations' {content = []}}});
158+ add_message_annotations (Anns ,
159+ #? MODULE {msg =
160+ # msg {message_annotations =
161+ # 'v1_0.message_annotations' {content = C }} = Msg } = State ) ->
118162 Content = maps :fold (
119163 fun (K , {T , V }, Acc ) ->
120164 map_add (symbol , K , T , V , Acc )
121165 end ,
122- case MA0 of
123- undefined -> [];
124- # 'v1_0.message_annotations' {content = C } -> C
125- end ,
166+ C ,
126167 Anns ),
127168
128169 State #? MODULE {msg =
129170 Msg # msg {message_annotations =
130- # 'v1_0.message_annotations' {content = Content }}}.
171+ # 'v1_0.message_annotations' {content = Content }}};
172+ add_message_annotations (Anns ,
173+ #? MODULE {msg =
174+ # msg {message_annotations = MABin } = Msg } = State0 ) ->
175+ [MA ] = amqp10_framing :decode_bin (iolist_to_binary (MABin )),
176+ State1 = State0 #? MODULE {msg =
177+ Msg # msg {message_annotations = MA }},
178+ add_message_annotations (Anns , State1 ).
131179
132180% % TODO: refine
133181- type amqp10_term () :: {atom (), term ()}.
@@ -159,58 +207,111 @@ message_annotation(Key,
159207% % parses it and returns the current parse state
160208% % this is the input function from storage and from, e.g. socket input
161209- spec from_amqp091 (# 'P_basic' {}, iodata ()) -> state ().
162- from_amqp091 (# 'P_basic' {message_id = MsgId ,
163- expiration = Expiration ,
164- delivery_mode = DelMode ,
165- headers = Headers ,
166- user_id = UserId ,
167- reply_to = ReplyTo ,
168- type = Type ,
169- priority = Priority ,
170- app_id = AppId ,
171- correlation_id = CorrId ,
172- content_type = ContentType ,
173- content_encoding = ContentEncoding ,
174- timestamp = Timestamp
175- }, Data ) ->
176- % % TODO: support parsing properties bin directly?
210+ from_amqp091 (# 'P_basic' {type = T } = PB , Data ) ->
211+ MA = from_amqp091_to_amqp10_message_annotations (PB ),
212+ P = from_amqp091_to_amqp10_properties (PB ),
213+ AP = from_amqp091_to_amqp10_app_properties (PB ),
214+
215+ D = case T of
216+ ? AMQP10_TYPE ->
217+ % % the body is already AMQP 1.0 binary content, so leaving it as-is
218+ Data ;
219+ _ ->
220+ # 'v1_0.data' {content = Data }
221+ end ,
222+
223+ #? MODULE {cfg = # cfg {},
224+ msg = # msg {properties = P ,
225+ application_properties = AP ,
226+ message_annotations = MA ,
227+ data = D }}.
228+
229+ from_amqp091_to_amqp10_properties (# 'P_basic' {headers = Headers } = P ) when is_list (Headers ) ->
230+ case proplists :lookup (? AMQP10_PROPERTIES_HEADER , Headers ) of
231+ none ->
232+ convert_amqp091_to_amqp10_properties (P );
233+ {_ , _ , PropsBin } ->
234+ PropsBin
235+ end ;
236+ from_amqp091_to_amqp10_properties (P ) ->
237+ convert_amqp091_to_amqp10_properties (P ).
238+
239+ convert_amqp091_to_amqp10_properties (# 'P_basic' {message_id = MsgId ,
240+ user_id = UserId ,
241+ reply_to = ReplyTo ,
242+ correlation_id = CorrId ,
243+ content_type = ContentType ,
244+ content_encoding = ContentEncoding ,
245+ timestamp = Timestamp
246+ }) ->
177247 ConvertedTs = case Timestamp of
178248 undefined ->
179249 undefined ;
180250 _ ->
181251 Timestamp * 1000
182252 end ,
183- P = # 'v1_0.properties' {message_id = wrap (utf8 , MsgId ),
184- user_id = wrap (binary , UserId ),
185- to = undefined ,
186- % subject = wrap(utf8, RKey),
187- reply_to = wrap (utf8 , ReplyTo ),
188- correlation_id = wrap (utf8 , CorrId ),
189- content_type = wrap (symbol , ContentType ),
190- content_encoding = wrap (symbol , ContentEncoding ),
191- creation_time = wrap (timestamp , ConvertedTs )},
253+ # 'v1_0.properties' {message_id = wrap (utf8 , MsgId ),
254+ user_id = wrap (binary , UserId ),
255+ to = undefined ,
256+ reply_to = wrap (utf8 , ReplyTo ),
257+ correlation_id = wrap (utf8 , CorrId ),
258+ content_type = wrap (symbol , ContentType ),
259+ content_encoding = wrap (symbol , ContentEncoding ),
260+ creation_time = wrap (timestamp , ConvertedTs )}.
261+
262+ from_amqp091_to_amqp10_app_properties (# 'P_basic' {headers = Headers } = P )
263+ when is_list (Headers ) ->
264+ case proplists :lookup (? AMQP10_APP_PROPERTIES_HEADER , Headers ) of
265+ none ->
266+ convert_amqp091_to_amqp10_app_properties (P );
267+ {_ , _ , AppPropsBin } ->
268+ AppPropsBin
269+ end ;
270+ from_amqp091_to_amqp10_app_properties (P ) ->
271+ convert_amqp091_to_amqp10_app_properties (P ).
192272
273+ convert_amqp091_to_amqp10_app_properties (# 'P_basic' {headers = Headers ,
274+ type = Type ,
275+ app_id = AppId }) ->
193276 APC0 = [{wrap (utf8 , K ), from_091 (T , V )} || {K , T , V }
194277 <- case Headers of
195278 undefined -> [];
196279 _ -> Headers
197- end , not unsupported_header_value_type (T )],
280+ end , not unsupported_header_value_type (T ),
281+ not filtered_header (K )],
282+
283+ APC1 = case Type of
284+ ? AMQP10_TYPE ->
285+ % % no need to modify the application properties for the type
286+ % % this info will be restored on decoding if necessary
287+ APC0 ;
288+ _ ->
289+ map_add (utf8 , <<" x-basic-type" >>, utf8 , Type , APC0 )
290+ end ,
291+
198292 % % properties that do not map directly to AMQP 1.0 properties are stored
199293 % % in application properties
200- APC = map_add (utf8 , <<" x-basic-type" >>, utf8 , Type ,
201- map_add (utf8 , <<" x-basic-app-id" >>, utf8 , AppId , APC0 )),
294+ APC2 = map_add (utf8 , <<" x-basic-app-id" >>, utf8 , AppId , APC1 ),
295+ # 'v1_0.application_properties' {content = APC2 }.
296+
297+ from_amqp091_to_amqp10_message_annotations (# 'P_basic' {headers = Headers } = P ) when is_list (Headers ) ->
298+ case proplists :lookup (? AMQP10_MESSAGE_ANNOTATIONS_HEADER , Headers ) of
299+ none ->
300+ convert_amqp091_to_amqp10_message_annotations (P );
301+ {_ , _ , MessageAnnotationsBin } ->
302+ MessageAnnotationsBin
303+ end ;
304+ from_amqp091_to_amqp10_message_annotations (P ) ->
305+ convert_amqp091_to_amqp10_message_annotations (P ).
202306
307+ convert_amqp091_to_amqp10_message_annotations (# 'P_basic' {priority = Priority ,
308+ delivery_mode = DelMode ,
309+ expiration = Expiration }) ->
203310 MAC = map_add (symbol , <<" x-basic-priority" >>, ubyte , Priority ,
204311 map_add (symbol , <<" x-basic-delivery-mode" >>, ubyte , DelMode ,
205312 map_add (symbol , <<" x-basic-expiration" >>, utf8 , Expiration , []))),
206313
207- AP = # 'v1_0.application_properties' {content = APC },
208- MA = # 'v1_0.message_annotations' {content = MAC },
209- #? MODULE {cfg = # cfg {},
210- msg = # msg {properties = P ,
211- application_properties = AP ,
212- message_annotations = MA ,
213- data = # 'v1_0.data' {content = Data }}}.
314+ # 'v1_0.message_annotations' {content = MAC }.
214315
215316map_add (_T , _Key , _Type , undefined , Acc ) ->
216317 Acc ;
@@ -223,12 +324,22 @@ to_amqp091(#?MODULE{msg = #msg{properties = P,
223324 message_annotations = MAR ,
224325 data = Data }}) ->
225326
226- Payload = case Data of
227- undefined ->
228- <<>>;
229- # 'v1_0.data' {content = C } ->
230- C
231- end ,
327+ % % anything else than a single data section is expected to be AMQP 1.0 binary content
328+ % % enforcing this convention
329+ {Payload , IsAmqp10 } = case Data of
330+ undefined ->
331+ % % not an expected value,
332+ % % but handling it with an empty binary anyway
333+ {<<>>, false };
334+ # 'v1_0.data' {content = C } ->
335+ {C , false };
336+ Sections when is_list (Data )->
337+ B = [amqp10_framing :encode_bin (S ) || S <- Sections ],
338+ {iolist_to_binary (B ),
339+ true };
340+ V ->
341+ {iolist_to_binary (amqp10_framing :encode_bin (V )), true }
342+ end ,
232343
233344 # 'v1_0.properties' {message_id = MsgId ,
234345 user_id = UserId ,
@@ -252,7 +363,12 @@ to_amqp091(#?MODULE{msg = #msg{properties = P,
252363 _ -> []
253364 end ,
254365
255- {Type , AP1 } = amqp10_map_get (utf8 (<<" x-basic-type" >>), AP0 ),
366+ {Type , AP1 } = case {amqp10_map_get (utf8 (<<" x-basic-type" >>), AP0 ), IsAmqp10 } of
367+ {{undefined , M }, true } ->
368+ {? AMQP10_TYPE , M };
369+ {{T , M }, _ } ->
370+ {T , M }
371+ end ,
256372 {AppId , AP } = amqp10_map_get (utf8 (<<" x-basic-app-id" >>), AP1 ),
257373
258374 {Priority , MA1 } = amqp10_map_get (symbol (<<" x-basic-priority" >>), MA0 ),
@@ -390,12 +506,21 @@ message_id({utf8, S}, HKey, H0) ->
390506message_id (MsgId , _ , H ) ->
391507 {H , unwrap (MsgId )}.
392508
393- unsupported_header_value_type (array ) ->
394- true ;
395- unsupported_header_value_type (table ) ->
396- true ;
397- unsupported_header_value_type (_ ) ->
398- false .
509+ unsupported_header_value_type (array ) ->
510+ true ;
511+ unsupported_header_value_type (table ) ->
512+ true ;
513+ unsupported_header_value_type (_ ) ->
514+ false .
515+
516+ filtered_header (? AMQP10_PROPERTIES_HEADER ) ->
517+ true ;
518+ filtered_header (? AMQP10_APP_PROPERTIES_HEADER ) ->
519+ true ;
520+ filtered_header (? AMQP10_MESSAGE_ANNOTATIONS_HEADER ) ->
521+ true ;
522+ filtered_header (_ ) ->
523+ false .
399524
400525- ifdef (TEST ).
401526- include_lib (" eunit/include/eunit.hrl" ).
0 commit comments