@@ -73,11 +73,11 @@ void main() {
7373 buckets: [
7474 bucketDescription ('a' ,
7575 subscriptions: [
76- {'def ' : 'my_default_stream' }
76+ {'default ' : 0 }
7777 ],
7878 priority: 1 ),
7979 ],
80- streams: [('my_default_stream' , true )],
80+ streams: [stream ('my_default_stream' , true )],
8181 ),
8282 ),
8383 );
@@ -122,7 +122,7 @@ void main() {
122122 syncTest ('are deleted' , (_) {
123123 control ('start' , null );
124124
125- for (final stream in ['s1' , 's2' ]) {
125+ for (final name in ['s1' , 's2' ]) {
126126 control (
127127 'line_text' ,
128128 json.encode (
@@ -131,11 +131,11 @@ void main() {
131131 buckets: [
132132 bucketDescription ('a' ,
133133 subscriptions: [
134- {'def ' : stream }
134+ {'default ' : 0 }
135135 ],
136136 priority: 1 ),
137137 ],
138- streams: [( stream, true )],
138+ streams: [stream (name , true )],
139139 ),
140140 ),
141141 );
@@ -164,11 +164,11 @@ void main() {
164164 buckets: [
165165 bucketDescription ('a' ,
166166 subscriptions: [
167- {'def ' : 'a' }
167+ {'default ' : 0 }
168168 ],
169169 priority: 1 ),
170170 ],
171- streams: [('a' , true )],
171+ streams: [stream ('a' , true )],
172172 ),
173173 ),
174174 );
@@ -210,6 +210,41 @@ void main() {
210210 expect (stored, containsPair ('is_default' , 0 ));
211211 expect (stored, containsPair ('ttl' , isNotNull));
212212 });
213+
214+ syncTest ('reports errors' , (_) {
215+ control ('start' , null );
216+ final response = control (
217+ 'line_text' ,
218+ json.encode (
219+ checkpoint (
220+ lastOpId: 1 ,
221+ buckets: [
222+ bucketDescription ('a' ,
223+ subscriptions: [
224+ {'default' : 0 }
225+ ],
226+ priority: 1 ),
227+ ],
228+ streams: [
229+ stream ('a' , true , errors: [
230+ {'message' : 'error message' , 'subscription' : 'default' }
231+ ])
232+ ],
233+ ),
234+ ),
235+ );
236+
237+ expect (
238+ response,
239+ contains (
240+ containsPair (
241+ 'LogLine' ,
242+ containsPair (
243+ 'line' , 'Default subscription a has errors: error message' ),
244+ ),
245+ ),
246+ );
247+ });
213248 });
214249
215250 group ('explicit subscriptions' , () {
@@ -300,7 +335,6 @@ void main() {
300335 'stream' : 'my_stream' ,
301336 'parameters' : null ,
302337 'override_priority' : null ,
303- 'client_id' : '1' ,
304338 }
305339 ],
306340 },
@@ -348,7 +382,7 @@ void main() {
348382 checkpoint (
349383 lastOpId: 1 ,
350384 buckets: [],
351- streams: [('a' , true )],
385+ streams: [stream ('a' , true )],
352386 ),
353387 ),
354388 );
@@ -386,5 +420,76 @@ void main() {
386420 ),
387421 );
388422 });
423+
424+ syncTest ('reports errors' , (_) {
425+ control (
426+ 'subscriptions' ,
427+ json.encode ({
428+ 'subscribe' : {'stream' : 'a' , 'params' : 'invalid' }
429+ }),
430+ );
431+ control (
432+ 'subscriptions' ,
433+ json.encode ({
434+ 'subscribe' : {'stream' : 'a' , 'params' : 'valid' }
435+ }),
436+ );
437+
438+ final start = control ('start' , null );
439+ expect (
440+ start,
441+ contains (
442+ containsPair (
443+ 'EstablishSyncStream' ,
444+ containsPair (
445+ 'request' ,
446+ containsPair (
447+ 'streams' ,
448+ {
449+ 'include_defaults' : true ,
450+ 'subscriptions' : [
451+ {
452+ 'stream' : 'a' ,
453+ 'parameters' : 'invalid' ,
454+ 'override_priority' : null
455+ },
456+ {
457+ 'stream' : 'a' ,
458+ 'parameters' : 'valid' ,
459+ 'override_priority' : null
460+ }
461+ ]
462+ },
463+ ),
464+ ),
465+ ),
466+ ),
467+ );
468+ final response = control (
469+ 'line_text' ,
470+ json.encode (
471+ checkpoint (
472+ lastOpId: 1 ,
473+ buckets: [],
474+ streams: [
475+ stream ('a' , true , errors: [
476+ {'message' : 'error message' , 'subscription' : 0 }
477+ ])
478+ ],
479+ ),
480+ ),
481+ );
482+
483+ expect (
484+ response,
485+ contains (
486+ containsPair (
487+ 'LogLine' ,
488+ containsPair ('line' ,
489+ 'Subscription to stream a (with parameters "invalid") could not be resolved: error message' ),
490+ ),
491+ ),
492+ );
493+ });
389494 });
390495}
0 commit comments