diff --git a/lib/ui/channel_buffers.dart b/lib/ui/channel_buffers.dart index a32a557e1f121..a9343eed84c7b 100644 --- a/lib/ui/channel_buffers.dart +++ b/lib/ui/channel_buffers.dart @@ -4,213 +4,541 @@ // @dart = 2.10 +// KEEP THIS SYNCHRONIZED WITH ../web_ui/lib/src/ui/channel_buffers.dart + part of dart.ui; +/// Signature for [ChannelBuffers.drain]'s `callback` argument. +/// +/// The first argument is the data sent by the plugin. +/// +/// The second argument is a closure that, when called, will send messages +/// back to the plugin. +// TODO(ianh): deprecate this once the framework is migrated to [ChannelCallback]. +typedef DrainChannelCallback = Future Function(ByteData? data, PlatformMessageResponseCallback callback); + +/// Signature for [ChannelBuffers.setListener]'s `callback` argument. +/// +/// The first argument is the data sent by the plugin. +/// +/// The second argument is a closure that, when called, will send messages +/// back to the plugin. +/// +/// See also: +/// +/// * [PlatformMessageResponseCallback], the type used for replies. +typedef ChannelCallback = void Function(ByteData? data, PlatformMessageResponseCallback callback); + +/// The data and logic required to store and invoke a callback. +/// +/// This tracks (and applies) the [Zone]. +class _ChannelCallbackRecord { + _ChannelCallbackRecord(this.callback) : zone = Zone.current; + final ChannelCallback callback; + final Zone zone; + + /// Call [callback] in [zone], using the given arguments. + void invoke(ByteData? dataArg, PlatformMessageResponseCallback callbackArg) { + _invoke2(callback, zone, dataArg, callbackArg); + } +} + /// A saved platform message for a channel with its callback. class _StoredMessage { - /// Default constructor, takes in a [ByteData] that represents the + /// Wraps the data and callback for a platform message into + /// a [_StoredMessage] instance. + /// + /// The first argument is a [ByteData] that represents the /// payload of the message and a [PlatformMessageResponseCallback] /// that represents the callback that will be called when the message /// is handled. - _StoredMessage(this._data, this._callback); + const _StoredMessage(this.data, this.callback); /// Representation of the message's payload. - final ByteData? _data; - ByteData? get data => _data; + final ByteData? data; - /// Callback to be called when the message is received. - final PlatformMessageResponseCallback _callback; - PlatformMessageResponseCallback get callback => _callback; + /// Callback to be used when replying to the message. + final PlatformMessageResponseCallback callback; } -/// A fixed-size circular queue. -class _RingBuffer { - /// The underlying data for the RingBuffer. ListQueues dynamically resize, - /// [_RingBuffer]s do not. - final collection.ListQueue _queue; +/// The internal storage for a platform channel. +/// +/// This consists of a fixed-size circular queue of [_StoredMessage]s, +/// and the channel's callback, if any has been registered. +class _Channel { + _Channel([ this._capacity = ChannelBuffers.kDefaultBufferSize ]) + : _queue = collection.ListQueue<_StoredMessage>(_capacity); - _RingBuffer(this._capacity) - : _queue = collection.ListQueue(_capacity); + /// The underlying data for the buffered messages. + final collection.ListQueue<_StoredMessage> _queue; - /// Returns the number of items in the [_RingBuffer]. + /// The number of messages currently in the [_Channel]. + /// + /// This is equal to or less than the [capacity]. int get length => _queue.length; - /// The number of items that can be stored in the [_RingBuffer]. - int _capacity; - int get capacity => _capacity; - - /// Returns true if there are no items in the [_RingBuffer]. - bool get isEmpty => _queue.isEmpty; + /// Whether to dump messages to the console when a message is + /// discarded due to the channel overflowing. + /// + /// Has no effect in release builds. + bool debugEnableDiscardWarnings = true; - /// A callback that get's called when items are ejected from the [_RingBuffer] - /// by way of an overflow or a resizing. - Function(T)? _dropItemCallback; - set dropItemCallback(Function(T) callback) { - _dropItemCallback = callback; + /// The number of messages that _can_ be stored in the [_Channel]. + /// + /// When additional messages are stored, earlier ones are discarded, + /// in a first-in-first-out fashion. + int get capacity => _capacity; + int _capacity; + /// Set the [capacity] of the channel to the given size. + /// + /// If the new size is smaller than the [length], the oldest + /// messages are discarded until the capacity is reached. No + /// message is shown in case of overflow, regardless of the + /// value of [debugEnableDiscardWarnings]. + set capacity(int newSize) { + _capacity = newSize; + _dropOverflowMessages(newSize); } - /// Returns true on overflow. - bool push(T val) { + /// Whether a microtask is queued to call [_drainStep]. + /// + /// This is used to queue messages received while draining, rather + /// than sending them out of order. This generally cannot happen in + /// production but is possible in test scenarios. + /// + /// This is also necessary to avoid situations where multiple drains are + /// invoked simultaneously. For example, if a listener is set + /// (queuing a drain), then unset, then set again (which would queue + /// a drain again), all in one stack frame (not allowing the drain + /// itself an opportunity to check if a listener is set). + bool _draining = false; + + /// Adds a message to the channel. + /// + /// If the channel overflows, earlier messages are discarded, in a + /// first-in-first-out fashion. See [capacity]. If + /// [debugEnableDiscardWarnings] is true, this method returns true + /// on overflow. It is the responsibility of the caller to show the + /// warning message. + bool push(_StoredMessage message) { + if (!_draining && _channelCallbackRecord != null) { + assert(_queue.isEmpty); + _channelCallbackRecord!.invoke(message.data, message.callback); + return false; + } if (_capacity <= 0) { - return true; - } else { - final int overflowCount = _dropOverflowItems(_capacity - 1); - _queue.addLast(val); - return overflowCount > 0; + return debugEnableDiscardWarnings; } + final bool result = _dropOverflowMessages(_capacity - 1); + _queue.addLast(message); + return result; } - /// Returns null when empty. - T? pop() { - return _queue.isEmpty ? null : _queue.removeFirst(); - } + /// Returns the first message in the channel and removes it. + /// + /// Throws when empty. + _StoredMessage pop() => _queue.removeFirst(); - /// Removes items until then length reaches [lengthLimit] and returns - /// the number of items removed. - int _dropOverflowItems(int lengthLimit) { - int result = 0; + /// Removes messages until [length] reaches `lengthLimit`. + /// + /// The callback of each removed message is invoked with null + /// as its argument. + /// + /// If any messages are removed, and [debugEnableDiscardWarnings] is + /// true, then returns true. The caller is responsible for showing + /// the warning message in that case. + bool _dropOverflowMessages(int lengthLimit) { + bool result = false; while (_queue.length > lengthLimit) { - final T item = _queue.removeFirst(); - _dropItemCallback?.call(item); - result += 1; + final _StoredMessage message = _queue.removeFirst(); + message.callback(null); // send empty reply to the plugin side + result = true; } return result; } - /// Returns the number of discarded items resulting from resize. - int resize(int newSize) { - _capacity = newSize; - return _dropOverflowItems(newSize); + _ChannelCallbackRecord? _channelCallbackRecord; + + /// Sets the listener for this channel. + /// + /// When there is a listener, messages are sent immediately. + /// + /// If any messages were queued before the listener is added, + /// they are drained asynchronously after this method returns. + /// (See [_drain].) + /// + /// Only one listener may be set at a time. Setting a + /// new listener clears the previous one. + /// + /// Callbacks are invoked in their own stack frame and + /// use the zone that was current when the callback was + /// registered. + void setListener(ChannelCallback callback) { + final bool needDrain = _channelCallbackRecord == null; + _channelCallbackRecord = _ChannelCallbackRecord(callback); + if (needDrain && !_draining) + _drain(); + } + + /// Clears the listener for this channel. + /// + /// When there is no listener, messages are queued, up to [capacity], + /// and then discarded in a first-in-first-out fashion. + void clearListener() { + _channelCallbackRecord = null; } -} -/// Signature for [ChannelBuffers.drain]. -typedef DrainChannelCallback = Future Function(ByteData?, PlatformMessageResponseCallback); + /// Drains all the messages in the channel (invoking the currently + /// registered listener for each one). + /// + /// Each message is handled in its own microtask. No messages can + /// be queued by plugins while the queue is being drained, but any + /// microtasks queued by the handler itself will be processed before + /// the next message is handled. + /// + /// The draining stops if the listener is removed. + /// + /// See also: + /// + /// * [setListener], which is used to register the callback. + /// * [clearListener], which removes it. + void _drain() { + assert(!_draining); + _draining = true; + scheduleMicrotask(_drainStep); + } + + /// Drains a single message and then reinvokes itself asynchronously. + /// + /// See [_drain] for more details. + void _drainStep() { + assert(_draining); + if (_queue.isNotEmpty && _channelCallbackRecord != null) { + final _StoredMessage message = pop(); + _channelCallbackRecord!.invoke(message.data, message.callback); + scheduleMicrotask(_drainStep); + } else { + _draining = false; + } + } +} -/// Storage of channel messages until the channels are completely routed, -/// i.e. when a message handler is attached to the channel on the framework side. +/// The buffering and dispatch mechanism for messages sent by plugins +/// on the engine side to their corresponding plugin code on the +/// framework side. +/// +/// Messages for a channel are stored until a listener is provided for that channel, +/// using [setListener]. Only one listener may be configured per channel. +/// +/// Typically these buffers are drained once a callback is setup on +/// the [BinaryMessenger] in the Flutter framework. (See [setListener].) +/// +/// ## Buffer capacity and overflow +/// +/// Each channel has a finite buffer capacity and messages will +/// be deleted in a first-in-first-out (FIFO) manner if the capacity is exceeded. +/// +/// By default buffers store one message per channel, and when a +/// message overflows, in debug mode, a message is printed to the +/// console. The message looks like the following: /// -/// Each channel has a finite buffer capacity and in a FIFO manner messages will -/// be deleted if the capacity is exceeded. The intention is that these buffers -/// will be drained once a callback is setup on the BinaryMessenger in the -/// Flutter framework. +/// ``` +/// A message on the com.example channel was discarded before it could be +/// handled. +/// This happens when a plugin sends messages to the framework side before the +/// framework has had an opportunity to register a listener. See the +/// ChannelBuffers API documentation for details on how to configure the channel +/// to expect more messages, or to expect messages to get discarded: +/// https://api.flutter.dev/flutter/dart-ui/ChannelBuffers-class.html +/// ``` /// -/// Clients of Flutter shouldn't need to allocate their own ChannelBuffers -/// and should only access this package's [channelBuffers] if they are writing -/// their own custom [BinaryMessenger]. +/// There are tradeoffs associated with any size. The correct size +/// should be chosen for the semantics of the channel. To change the +/// size a plugin can send a message using the control channel, +/// as described below. +/// +/// Size 0 is appropriate for channels where channels sent before +/// the engine and framework are ready should be ignored. For +/// example, a plugin that notifies the framework any time a +/// radiation sensor detects an ionization event might set its size +/// to zero since past ionization events are typically not +/// interesting, only instantaneous readings are worth tracking. +/// +/// Size 1 is appropriate for level-triggered plugins. For example, +/// a plugin that notifies the framework of the current value of a +/// pressure sensor might leave its size at one (the default), while +/// sending messages continually; once the framework side of the plugin +/// registers with the channel, it will immediately receive the most +/// up to date value and earlier messages will have been discarded. +/// +/// Sizes greater than one are appropriate for plugins where every +/// message is important. For example, a plugin that itself +/// registers with another system that has been buffering events, +/// and immediately forwards all the previously-buffered events, +/// would likely wish to avoid having any messages dropped on the +/// floor. In such situations, it is important to select a size that +/// will avoid overflows. It is also important to consider the +/// potential for the framework side to never fully initialize (e.g. if +/// the user starts the application, but terminates it soon +/// afterwards, leaving time for the platform side of a plugin to +/// run but not the framework side). +/// +/// ## The control channel +/// +/// A plugin can configure its channel's buffers by sending messages to the +/// control channel, `dev.flutter/channel-buffers` (see [kControlChannelName]). +/// +/// There are two messages that can be sent to this control channel, to adjust +/// the buffer size and to disable the overflow warnings. See [handleMessage] +/// for details on these messages. class ChannelBuffers { - /// By default we store one message per channel. There are tradeoffs associated - /// with any size. The correct size should be chosen for the semantics of your - /// channel. - /// - /// Size 0 implies you want to ignore any message that gets sent before the engine - /// is ready (keeping in mind there is no way to know when the engine is ready). - /// - /// Size 1 implies that you only care about the most recent value. + /// Create a buffer pool for platform messages. /// - /// Size >1 means you want to process every single message and want to chose a - /// buffer size that will avoid any overflows. + /// It is generally not necessary to create an instance of this class; + /// the global [channelBuffers] instance is the one used by the engine. + ChannelBuffers(); + + /// The number of messages that channel buffers will store by default. static const int kDefaultBufferSize = 1; + /// The name of the channel that plugins can use to communicate with the + /// channel buffers system. + /// + /// These messages are handled by [handleMessage]. static const String kControlChannelName = 'dev.flutter/channel-buffers'; - /// A mapping between a channel name and its associated [_RingBuffer]. - final Map?> _messages = - ?>{}; - - _RingBuffer<_StoredMessage> _makeRingBuffer(int size) { - final _RingBuffer<_StoredMessage> result = _RingBuffer<_StoredMessage>(size); - result.dropItemCallback = _onDropItem; - return result; - } - - void _onDropItem(_StoredMessage message) { - message.callback(null); - } + /// A mapping between a channel name and its associated [_Channel]. + final Map _channels = {}; - /// Returns true on overflow. - bool push(String channel, ByteData? data, PlatformMessageResponseCallback callback) { - _RingBuffer<_StoredMessage>? queue = _messages[channel]; - if (queue == null) { - queue = _makeRingBuffer(kDefaultBufferSize); - _messages[channel] = queue; - } - final bool didOverflow = queue.push(_StoredMessage(data, callback)); - if (didOverflow) { - // TODO(gaaclarke): Update this message to include instructions on how to resize - // the buffer once that is available to users and print in all engine builds - // after we verify that dropping messages isn't part of normal execution. - _printDebug('Overflow on channel: $channel. ' - 'Messages on this channel are being discarded in FIFO fashion. ' - 'The engine may not be running or you need to adjust ' - 'the buffer size of the channel.'); + /// Adds a message (`data`) to the named channel buffer (`name`). + /// + /// The `callback` argument is a closure that, when called, will send messages + /// back to the plugin. + /// + /// If a message overflows the channel, and the channel has not been + /// configured to expect overflow, then, in debug mode, a message + /// will be printed to the console warning about the overflow. + void push(String name, ByteData? data, PlatformMessageResponseCallback callback) { + final _Channel channel = _channels.putIfAbsent(name, () => _Channel()); + if (channel.push(_StoredMessage(data, callback))) { + _printDebug( + 'A message on the $name channel was discarded before it could be handled.\n' + 'This happens when a plugin sends messages to the framework side before the ' + 'framework has had an opportunity to register a listener. See the ChannelBuffers ' + 'API documentation for details on how to configure the channel to expect more ' + 'messages, or to expect messages to get discarded:\n' + ' https://api.flutter.dev/flutter/dart-ui/ChannelBuffers-class.html' + ); } - return didOverflow; - } - - /// Returns null on underflow. - _StoredMessage? _pop(String channel) { - final _RingBuffer<_StoredMessage>? queue = _messages[channel]; - final _StoredMessage? result = queue?.pop(); - return result; } - bool _isEmpty(String channel) { - final _RingBuffer<_StoredMessage>? queue = _messages[channel]; - return queue == null || queue.isEmpty; + /// Sets the listener for the specified channel. + /// + /// When there is a listener, messages are sent immediately. + /// + /// Each channel may have up to one listener set at a time. Setting + /// a new listener on a channel with an existing listener clears the + /// previous one. + /// + /// Callbacks are invoked in their own stack frame and + /// use the zone that was current when the callback was + /// registered. + /// + /// ## Draining + /// + /// If any messages were queued before the listener is added, + /// they are drained asynchronously after this method returns. + /// + /// Each message is handled in its own microtask. No messages can + /// be queued by plugins while the queue is being drained, but any + /// microtasks queued by the handler itself will be processed before + /// the next message is handled. + /// + /// The draining stops if the listener is removed. + void setListener(String name, ChannelCallback callback) { + final _Channel channel = _channels.putIfAbsent(name, () => _Channel()); + channel.setListener(callback); } - /// Changes the capacity of the queue associated with the given channel. + /// Clears the listener for the specified channel. /// - /// This could result in the dropping of messages if newSize is less - /// than the current length of the queue. - void _resize(String channel, int newSize) { - _RingBuffer<_StoredMessage>? queue = _messages[channel]; - if (queue == null) { - queue = _makeRingBuffer(newSize); - _messages[channel] = queue; - } else { - final int numberOfDroppedMessages = queue.resize(newSize); - if (numberOfDroppedMessages > 0) { - _Logger._printString('Dropping messages on channel "$channel" as a result of shrinking the buffer size.'); - } - } + /// When there is no listener, messages on that channel are queued, + /// up to [kDefaultBufferSize] (or the size configured via the + /// control channel), and then discarded in a first-in-first-out + /// fashion. + void clearListener(String name) { + final _Channel? channel = _channels[name]; + if (channel != null) + channel.clearListener(); } /// Remove and process all stored messages for a given channel. /// /// This should be called once a channel is prepared to handle messages /// (i.e. when a message handler is setup in the framework). - Future drain(String channel, DrainChannelCallback callback) async { - while (!_isEmpty(channel)) { - final _StoredMessage message = _pop(channel)!; + /// + /// The messages are processed by calling the given `callback`. Each message + /// is processed in its own microtask. + // TODO(ianh): deprecate once framework uses [setListener]. + Future drain(String name, DrainChannelCallback callback) async { + final _Channel? channel = _channels[name]; + while (channel != null && !channel._queue.isEmpty) { + final _StoredMessage message = channel.pop(); await callback(message.data, message.callback); } } - String _getString(ByteData data) { - final ByteBuffer buffer = data.buffer; - final Uint8List list = buffer.asUint8List(data.offsetInBytes, data.lengthInBytes); - return utf8.decode(list); - } - /// Handle a control message. /// - /// This is intended to be called by the platform messages dispatcher. + /// This is intended to be called by the platform messages dispatcher, forwarding + /// messages from plugins to the [kControlChannelName] channel. + /// + /// Messages use the [StandardMethodCodec] format. There are two methods + /// supported: `resize` and `overflow`. The `resize` method changes the size + /// of the buffer, and the `overflow` method controls whether overflow is + /// expected or not. + /// + /// ## `resize` + /// + /// The `resize` method takes as its argument a list with two values, first + /// the channel name (a UTF-8 string less than 254 bytes long), and second the + /// allowed size of the channel buffer (an integer between 0 and 2147483647). /// - /// Available messages: - /// - Name: resize - /// Arity: 2 - /// Format: `resize\r\r` - /// Description: Allows you to set the size of a channel's buffer. + /// Upon receiving the message, the channel's buffer is resized. If necessary, + /// messages are silently discarded to ensure the buffer is no bigger than + /// specified. + /// + /// For historical reasons, this message can also be sent using a bespoke + /// format consisting of a UTF-8-encoded string with three parts separated + /// from each other by U+000D CARRIAGE RETURN (CR) characters, the three parts + /// being the string `resize`, the string giving the channel name, and then + /// the string giving the decimal serialization of the new channel buffer + /// size. For example: `resize\rchannel\r1` + /// + /// ## `overflow` + /// + /// The `overflow` method takes as its argument a list with two values, first + /// the channel name (a UTF-8 string less than 254 bytes long), and second a + /// boolean which is true if overflow is expected and false if it is not. + /// + /// This sets a flag on the channel in debug mode. In release mode the message + /// is silently ignored. The flag indicates whether overflow is expected on this + /// channel. When the flag is set, messages are discarded silently. When the + /// flag is cleared (the default), any overflow on the channel causes a message + /// to be printed to the console, warning that a message was lost. void handleMessage(ByteData data) { - final List command = _getString(data).split('\r'); - if (command.length == /*arity=*/2 + 1 && command[0] == 'resize') { - _resize(command[1], int.parse(command[2])); + // We hard-code the deserialization here because the StandardMethodCodec class + // is part of the framework, not dart:ui. + final Uint8List bytes = data.buffer.asUint8List(data.offsetInBytes, data.lengthInBytes); + if (bytes[0] == 0x07) { // 7 = value code for string + final int methodNameLength = bytes[1]; + if (methodNameLength >= 254) // lengths greater than 253 have more elaborate encoding + throw Exception('Unrecognized message sent to $kControlChannelName (method name too long)'); + int index = 2; // where we are in reading the bytes + final String methodName = utf8.decode(bytes.sublist(index, index + methodNameLength)); + index += methodNameLength; + switch (methodName) { + case 'resize': + if (bytes[index] != 0x0C) // 12 = value code for list + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and new capacity)'); + index += 1; + if (bytes[index] < 0x02) // We ignore extra arguments, in case we need to support them in the future, hence <2 rather than !=2. + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and new capacity)'); + index += 1; + if (bytes[index] != 0x07) // 7 = value code for string + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (first argument must be a string)'); + index += 1; + final int channelNameLength = bytes[index]; + if (channelNameLength >= 254) // lengths greater than 253 have more elaborate encoding + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (channel name must be less than 254 characters long)'); + index += 1; + final String channelName = utf8.decode(bytes.sublist(index, index + channelNameLength)); + index += channelNameLength; + if (bytes[index] != 0x03) // 3 = value code for uint32 + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (second argument must be an integer in the range 0 to 2147483647)'); + index += 1; + resize(channelName, data.getUint32(index, Endian.host)); + break; + case 'overflow': + if (bytes[index] != 0x0C) // 12 = value code for list + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and flag state)'); + index += 1; + if (bytes[index] < 0x02) // We ignore extra arguments, in case we need to support them in the future, hence <2 rather than !=2. + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and flag state)'); + index += 1; + if (bytes[index] != 0x07) // 7 = value code for string + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (first argument must be a string)'); + index += 1; + final int channelNameLength = bytes[index]; + if (channelNameLength >= 254) // lengths greater than 253 have more elaborate encoding + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (channel name must be less than 254 characters long)'); + index += 1; + final String channelName = utf8.decode(bytes.sublist(index, index + channelNameLength)); + index += channelNameLength; + if (bytes[index] != 0x01 && bytes[index] != 0x02) // 1 = value code for true, 2 = value code for false + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (second argument must be a boolean)'); + allowOverflow(channelName, bytes[index] == 0x01); + break; + default: + throw Exception('Unrecognized method \'$methodName\' sent to $kControlChannelName'); + } + } else { + final List parts = utf8.decode(bytes).split('\r'); + if (parts.length == 1 + /*arity=*/2 && parts[0] == 'resize') { + resize(parts[1], int.parse(parts[2])); + } else { + // If the message couldn't be decoded as UTF-8, a FormatException will + // have been thrown by utf8.decode() above. + throw Exception('Unrecognized message $parts sent to $kControlChannelName.'); + } + } + } + + /// Changes the capacity of the queue associated with the given channel. + /// + /// This could result in the dropping of messages if newSize is less + /// than the current length of the queue. + /// + /// This is expected to be called by platform-specific plugin code (indirectly + /// via the control channel), not by code on the framework side. See + /// [handleMessage]. + /// + /// Calling this from framework code is redundant since by the time framework + /// code can be running, it can just subscribe to the relevant channel and + /// there is therefore no need for any buffering. + void resize(String name, int newSize) { + _Channel? channel = _channels[name]; + if (channel == null) { + channel = _Channel(newSize); + _channels[name] = channel; } else { - throw Exception('Unrecognized command $command sent to $kControlChannelName.'); + channel.capacity = newSize; } } + + /// Toggles whether the channel should show warning messages when discarding + /// messages due to overflow. + /// + /// This is expected to be called by platform-specific plugin code (indirectly + /// via the control channel), not by code on the framework side. See + /// [handleMessage]. + /// + /// Calling this from framework code is redundant since by the time framework + /// code can be running, it can just subscribe to the relevant channel and + /// there is therefore no need for any messages to overflow. + /// + /// This method has no effect in release builds. + void allowOverflow(String name, bool allowed) { + assert(() { + _Channel? channel = _channels[name]; + if (channel == null && allowed) { + channel = _Channel(); + _channels[name] = channel; + } + channel?.debugEnableDiscardWarnings = !allowed; + return true; + }()); + } } /// [ChannelBuffers] that allow the storage of messages between the @@ -218,5 +546,6 @@ class ChannelBuffers { /// are stored here until the Framework is able to process them. /// /// See also: -/// * [BinaryMessenger] - The place where ChannelBuffers are typically read. +/// +/// * [BinaryMessenger], where [ChannelBuffers] are typically read. final ChannelBuffers channelBuffers = ChannelBuffers(); diff --git a/lib/ui/hooks.dart b/lib/ui/hooks.dart index 3e7643b63ddcd..386aee3c5e781 100644 --- a/lib/ui/hooks.dart +++ b/lib/ui/hooks.dart @@ -159,6 +159,10 @@ void _invoke(void Function()? callback, Zone zone) { } /// Invokes [callback] inside the given [zone] passing it [arg]. +/// +/// The 1 in the name refers to the number of arguments expected by +/// the callback (and thus passed to this function, in addition to the +/// callback itself and the zone in which the callback is executed). void _invoke1(void Function(A a)? callback, Zone zone, A arg) { if (callback == null) { return; @@ -173,14 +177,33 @@ void _invoke1(void Function(A a)? callback, Zone zone, A arg) { } } +/// Invokes [callback] inside the given [zone] passing it [arg1] and [arg2]. +/// +/// The 2 in the name refers to the number of arguments expected by +/// the callback (and thus passed to this function, in addition to the +/// callback itself and the zone in which the callback is executed). +void _invoke2(void Function(A1 a1, A2 a2)? callback, Zone zone, A1 arg1, A2 arg2) { + if (callback == null) { + return; + } + + assert(zone != null); // ignore: unnecessary_null_comparison + + if (identical(zone, Zone.current)) { + callback(arg1, arg2); + } else { + zone.runGuarded(() { + callback(arg1, arg2); + }); + } +} + /// Invokes [callback] inside the given [zone] passing it [arg1], [arg2], and [arg3]. -void _invoke3( - void Function(A1 a1, A2 a2, A3 a3)? callback, - Zone zone, - A1 arg1, - A2 arg2, - A3 arg3, -) { +/// +/// The 3 in the name refers to the number of arguments expected by +/// the callback (and thus passed to this function, in addition to the +/// callback itself and the zone in which the callback is executed). +void _invoke3(void Function(A1 a1, A2 a2, A3 a3)? callback, Zone zone, A1 arg1, A2 arg2, A3 arg3) { if (callback == null) { return; } diff --git a/lib/ui/platform_dispatcher.dart b/lib/ui/platform_dispatcher.dart index b56957adc1f83..22dba09c24d8d 100644 --- a/lib/ui/platform_dispatcher.dart +++ b/lib/ui/platform_dispatcher.dart @@ -38,6 +38,7 @@ typedef SemanticsActionCallback = void Function(int id, SemanticsAction action, typedef PlatformMessageResponseCallback = void Function(ByteData? data); /// Signature for [PlatformDispatcher.onPlatformMessage]. +// TODO(ianh): deprecate once framework uses [ChannelBuffers.setListener]. typedef PlatformMessageCallback = void Function(String name, ByteData? data, PlatformMessageResponseCallback? callback); // Signature for _setNeedsReportTimings. @@ -409,6 +410,8 @@ class PlatformDispatcher { /// /// The framework invokes this callback in the same zone in which the callback /// was set. + // TODO(ianh): Deprecate onPlatformMessage once the framework is moved over + // to using channel buffers exclusively. PlatformMessageCallback? get onPlatformMessage => _onPlatformMessage; PlatformMessageCallback? _onPlatformMessage; Zone _onPlatformMessageZone = Zone.root; @@ -438,13 +441,15 @@ class PlatformDispatcher { }; } - // Called from the engine, via hooks.dart + /// Send a message to the framework using the [ChannelBuffers]. + /// + /// This method constructs the appropriate callback to respond + /// with the given `responseId`. It should only be called for messages + /// from the platform. void _dispatchPlatformMessage(String name, ByteData? data, int responseId) { if (name == ChannelBuffers.kControlChannelName) { try { channelBuffers.handleMessage(data!); - } catch (ex) { - _printDebug('Message to "$name" caused exception $ex'); } finally { _respondToPlatformMessage(responseId, null); } @@ -454,7 +459,7 @@ class PlatformDispatcher { _onPlatformMessageZone, name, data, - (ByteData? responseData) { + (ByteData? responseData) { _respondToPlatformMessage(responseId, responseData); }, ); diff --git a/lib/ui/text.dart b/lib/ui/text.dart index 175d2c4304873..d7760b6eb86c3 100644 --- a/lib/ui/text.dart +++ b/lib/ui/text.dart @@ -2273,11 +2273,18 @@ final ByteData _fontChangeMessage = utf8.encoder.convert( ).buffer.asByteData(); FutureOr _sendFontChangeMessage() async { - PlatformDispatcher.instance.onPlatformMessage?.call( - 'flutter/system', - _fontChangeMessage, - (_) {}, - ); + const String kSystemChannelName = 'flutter/system'; + if (PlatformDispatcher.instance.onPlatformMessage != null) { + _invoke3( + PlatformDispatcher.instance.onPlatformMessage, + PlatformDispatcher.instance._onPlatformMessageZone, + kSystemChannelName, + _fontChangeMessage, + (ByteData? responseData) { }, + ); + } else { + channelBuffers.push(kSystemChannelName, _fontChangeMessage, (ByteData? responseData) { }); + } } // TODO(gspencergoog): remove this template block once the framework templates diff --git a/lib/ui/window.dart b/lib/ui/window.dart index 383978a6e3cba..4e204e7bf6318 100644 --- a/lib/ui/window.dart +++ b/lib/ui/window.dart @@ -696,6 +696,7 @@ class SingletonFlutterWindow extends FlutterWindow { /// /// The framework invokes this callback in the same zone in which the /// callback was set. + // TODO(ianh): deprecate once framework uses [ChannelBuffers.setListener]. PlatformMessageCallback? get onPlatformMessage => platformDispatcher.onPlatformMessage; set onPlatformMessage(PlatformMessageCallback? callback) { platformDispatcher.onPlatformMessage = callback; diff --git a/lib/web_ui/lib/src/engine/keyboard.dart b/lib/web_ui/lib/src/engine/keyboard.dart index 23e3104e08f75..95068184cb984 100644 --- a/lib/web_ui/lib/src/engine/keyboard.dart +++ b/lib/web_ui/lib/src/engine/keyboard.dart @@ -81,10 +81,6 @@ class Keyboard { final html.KeyboardEvent keyboardEvent = event; - if (EnginePlatformDispatcher.instance._onPlatformMessage == null) { - return; - } - if (_shouldPreventDefault(event)) { event.preventDefault(); } diff --git a/lib/web_ui/lib/src/engine/navigation/history.dart b/lib/web_ui/lib/src/engine/navigation/history.dart index 5cfc6c6c0af8f..98f74fc48d3fb 100644 --- a/lib/web_ui/lib/src/engine/navigation/history.dart +++ b/lib/web_ui/lib/src/engine/navigation/history.dart @@ -151,17 +151,15 @@ class MultiEntriesBrowserHistory extends BrowserHistory { currentPath); } _lastSeenSerialCount = _currentSerialCount; - if (EnginePlatformDispatcher.instance._onPlatformMessage != null) { - EnginePlatformDispatcher.instance.invokeOnPlatformMessage( - 'flutter/navigation', - const JSONMethodCodec().encodeMethodCall( - MethodCall('pushRouteInformation', { - 'location': currentPath, - 'state': event.state?['state'], - })), - (_) {}, - ); - } + EnginePlatformDispatcher.instance.invokeOnPlatformMessage( + 'flutter/navigation', + const JSONMethodCodec().encodeMethodCall( + MethodCall('pushRouteInformation', { + 'location': currentPath, + 'state': event.state?['state'], + })), + (_) {}, + ); } @override @@ -272,13 +270,11 @@ class SingleEntryBrowserHistory extends BrowserHistory { _setupFlutterEntry(urlStrategy!); // 2. Send a 'popRoute' platform message so the app can handle it accordingly. - if (EnginePlatformDispatcher.instance._onPlatformMessage != null) { - EnginePlatformDispatcher.instance.invokeOnPlatformMessage( - 'flutter/navigation', - const JSONMethodCodec().encodeMethodCall(_popRouteMethodCall), - (_) {}, - ); - } + EnginePlatformDispatcher.instance.invokeOnPlatformMessage( + 'flutter/navigation', + const JSONMethodCodec().encodeMethodCall(_popRouteMethodCall), + (_) {}, + ); } else if (_isFlutterEntry(event.state)) { // We get into this scenario when the user changes the url manually. It // causes a new entry to be pushed on top of our "flutter" one. When this @@ -291,15 +287,13 @@ class SingleEntryBrowserHistory extends BrowserHistory { _userProvidedRouteName = null; // Send a 'pushRoute' platform message so the app handles it accordingly. - if (EnginePlatformDispatcher.instance._onPlatformMessage != null) { - EnginePlatformDispatcher.instance.invokeOnPlatformMessage( - 'flutter/navigation', - const JSONMethodCodec().encodeMethodCall( - MethodCall('pushRoute', newRouteName), - ), - (_) {}, - ); - } + EnginePlatformDispatcher.instance.invokeOnPlatformMessage( + 'flutter/navigation', + const JSONMethodCodec().encodeMethodCall( + MethodCall('pushRoute', newRouteName), + ), + (_) {}, + ); } else { // The user has pushed a new entry on top of our flutter entry. This could // happen when the user modifies the hash part of the url directly, for diff --git a/lib/web_ui/lib/src/engine/platform_dispatcher.dart b/lib/web_ui/lib/src/engine/platform_dispatcher.dart index a8ba5b85c5117..6ca71d440d41f 100644 --- a/lib/web_ui/lib/src/engine/platform_dispatcher.dart +++ b/lib/web_ui/lib/src/engine/platform_dispatcher.dart @@ -44,7 +44,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnPlatformConfigurationChanged() { - _invoke(_onPlatformConfigurationChanged, _onPlatformConfigurationChangedZone); + invoke(_onPlatformConfigurationChanged, _onPlatformConfigurationChangedZone); } /// The current list of windows, @@ -88,7 +88,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Otherwise zones won't work properly. void invokeOnMetricsChanged() { if (_onMetricsChanged != null) { - _invoke(_onMetricsChanged, _onMetricsChangedZone); + invoke(_onMetricsChanged, _onMetricsChangedZone); } } @@ -121,7 +121,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnBeginFrame(Duration duration) { - _invoke1(_onBeginFrame, _onBeginFrameZone, duration); + invoke1(_onBeginFrame, _onBeginFrameZone, duration); } /// A callback that is invoked for each frame after [onBeginFrame] has @@ -142,7 +142,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnDrawFrame() { - _invoke(_onDrawFrame, _onDrawFrameZone); + invoke(_onDrawFrame, _onDrawFrameZone); } /// A callback that is invoked when pointer data is available. @@ -167,7 +167,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnPointerDataPacket(ui.PointerDataPacket dataPacket) { - _invoke1(_onPointerDataPacket, _onPointerDataPacketZone, dataPacket); + invoke1(_onPointerDataPacket, _onPointerDataPacketZone, dataPacket); } /// A callback that is invoked to report the [FrameTiming] of recently @@ -204,7 +204,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnReportTimings(List timings) { - _invoke1>(_onReportTimings, _onReportTimingsZone, timings); + invoke1>(_onReportTimings, _onReportTimingsZone, timings); } @override @@ -216,6 +216,8 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { _sendPlatformMessage(name, data, _zonedPlatformMessageResponseCallback(callback)); } + // TODO(ianh): Deprecate onPlatformMessage once the framework is moved over + // to using channel buffers exclusively. @override ui.PlatformMessageCallback? get onPlatformMessage => _onPlatformMessage; ui.PlatformMessageCallback? _onPlatformMessage; @@ -228,15 +230,29 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. - void invokeOnPlatformMessage(String name, ByteData? data, - ui.PlatformMessageResponseCallback callback) { - _invoke3( - _onPlatformMessage, - _onPlatformMessageZone, - name, - data, - callback, - ); + void invokeOnPlatformMessage( + String name, + ByteData? data, + ui.PlatformMessageResponseCallback callback, + ) { + if (name == ui.ChannelBuffers.kControlChannelName) { + // TODO(ianh): move this logic into ChannelBuffers once we remove onPlatformMessage + try { + ui.channelBuffers.handleMessage(data!); + } finally { + callback(null); + } + } else if (_onPlatformMessage != null) { + invoke3( + _onPlatformMessage, + _onPlatformMessageZone, + name, + data, + callback, + ); + } else { + ui.channelBuffers.push(name, data, callback); + } } /// Wraps the given [callback] in another callback that ensures that the @@ -516,7 +532,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnAccessibilityFeaturesChanged() { - _invoke(_onAccessibilityFeaturesChanged, _onAccessibilityFeaturesChangedZone); + invoke(_onAccessibilityFeaturesChanged, _onAccessibilityFeaturesChangedZone); } /// Change the retained semantics data about this window. @@ -628,7 +644,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnLocaleChanged() { - _invoke(_onLocaleChanged, _onLocaleChangedZone); + invoke(_onLocaleChanged, _onLocaleChangedZone); } /// The system-reported text scale. @@ -671,7 +687,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnTextScaleFactorChanged() { - _invoke(_onTextScaleFactorChanged, _onTextScaleFactorChangedZone); + invoke(_onTextScaleFactorChanged, _onTextScaleFactorChangedZone); } /// The setting indicating the current brightness mode of the host platform. @@ -741,7 +757,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnPlatformBrightnessChanged() { - _invoke(_onPlatformBrightnessChanged, _onPlatformBrightnessChangedZone); + invoke(_onPlatformBrightnessChanged, _onPlatformBrightnessChangedZone); } /// Whether the user has requested that [updateSemantics] be called when @@ -766,7 +782,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Engine code should use this method instead of the callback directly. /// Otherwise zones won't work properly. void invokeOnSemanticsEnabledChanged() { - _invoke(_onSemanticsEnabledChanged, _onSemanticsEnabledChangedZone); + invoke(_onSemanticsEnabledChanged, _onSemanticsEnabledChangedZone); } /// A callback that is invoked whenever the user requests an action to be @@ -789,7 +805,7 @@ class EnginePlatformDispatcher extends ui.PlatformDispatcher { /// Otherwise zones won't work properly. void invokeOnSemanticsAction( int id, ui.SemanticsAction action, ByteData? args) { - _invoke3( + invoke3( _onSemanticsAction, _onSemanticsActionZone, id, action, args); } @@ -865,7 +881,7 @@ bool _handleWebTestEnd2EndMessage(MethodCodec codec, ByteData? data) { } /// Invokes [callback] inside the given [zone]. -void _invoke(void callback()?, Zone? zone) { +void invoke(void callback()?, Zone? zone) { if (callback == null) { return; } @@ -880,7 +896,7 @@ void _invoke(void callback()?, Zone? zone) { } /// Invokes [callback] inside the given [zone] passing it [arg]. -void _invoke1(void callback(A a)?, Zone? zone, A arg) { +void invoke1(void callback(A a)?, Zone? zone, A arg) { if (callback == null) { return; } @@ -894,24 +910,35 @@ void _invoke1(void callback(A a)?, Zone? zone, A arg) { } } +/// Invokes [callback] inside the given [zone] passing it [arg1] and [arg2]. +void invoke2(void Function(A1 a1, A2 a2)? callback, Zone? zone, A1 arg1, A2 arg2) { + if (callback == null) { + return; + } + + assert(zone != null); + + if (identical(zone, Zone.current)) { + callback(arg1, arg2); + } else { + zone!.runGuarded(() { + callback(arg1, arg2); + }); + } +} + /// Invokes [callback] inside the given [zone] passing it [arg1], [arg2], and [arg3]. -void _invoke3( - void callback(A1 a1, A2 a2, A3 a3)?, - Zone? zone, - A1 arg1, - A2 arg2, - A3 arg3, - ) { +void invoke3(void Function(A1 a1, A2 a2, A3 a3)? callback, Zone? zone, A1 arg1, A2 arg2, A3 arg3) { if (callback == null) { return; } assert(zone != null); - if (identical(zone!, Zone.current)) { + if (identical(zone, Zone.current)) { callback(arg1, arg2, arg3); } else { - zone.runGuarded(() { + zone!.runGuarded(() { callback(arg1, arg2, arg3); }); } diff --git a/lib/web_ui/lib/src/engine/text_editing/text_editing.dart b/lib/web_ui/lib/src/engine/text_editing/text_editing.dart index baa4b5e506d77..ade757d0e71fa 100644 --- a/lib/web_ui/lib/src/engine/text_editing/text_editing.dart +++ b/lib/web_ui/lib/src/engine/text_editing/text_editing.dart @@ -276,21 +276,19 @@ class EngineAutofillForm { /// Sends the 'TextInputClient.updateEditingStateWithTag' message to the framework. void _sendAutofillEditingState(String? tag, EditingState editingState) { - if (EnginePlatformDispatcher.instance._onPlatformMessage != null) { - EnginePlatformDispatcher.instance.invokeOnPlatformMessage( - 'flutter/textinput', - const JSONMethodCodec().encodeMethodCall( - MethodCall( - 'TextInputClient.updateEditingStateWithTag', - [ - 0, - {tag: editingState.toFlutter()} - ], - ), + EnginePlatformDispatcher.instance.invokeOnPlatformMessage( + 'flutter/textinput', + const JSONMethodCodec().encodeMethodCall( + MethodCall( + 'TextInputClient.updateEditingStateWithTag', + [ + 0, + {tag: editingState.toFlutter()} + ], ), - _emptyCallback, - ); - } + ), + _emptyCallback, + ); } } @@ -1392,50 +1390,44 @@ class TextEditingChannel { /// Sends the 'TextInputClient.updateEditingState' message to the framework. void updateEditingState(int? clientId, EditingState? editingState) { - if (EnginePlatformDispatcher.instance._onPlatformMessage != null) { - EnginePlatformDispatcher.instance.invokeOnPlatformMessage( - 'flutter/textinput', - const JSONMethodCodec().encodeMethodCall( - MethodCall('TextInputClient.updateEditingState', [ - clientId, - editingState!.toFlutter(), - ]), - ), - _emptyCallback, - ); - } + EnginePlatformDispatcher.instance.invokeOnPlatformMessage( + 'flutter/textinput', + const JSONMethodCodec().encodeMethodCall( + MethodCall('TextInputClient.updateEditingState', [ + clientId, + editingState!.toFlutter(), + ]), + ), + _emptyCallback, + ); } /// Sends the 'TextInputClient.performAction' message to the framework. void performAction(int? clientId, String? inputAction) { - if (EnginePlatformDispatcher.instance._onPlatformMessage != null) { - EnginePlatformDispatcher.instance.invokeOnPlatformMessage( - 'flutter/textinput', - const JSONMethodCodec().encodeMethodCall( - MethodCall( - 'TextInputClient.performAction', - [clientId, inputAction], - ), + EnginePlatformDispatcher.instance.invokeOnPlatformMessage( + 'flutter/textinput', + const JSONMethodCodec().encodeMethodCall( + MethodCall( + 'TextInputClient.performAction', + [clientId, inputAction], ), - _emptyCallback, - ); - } + ), + _emptyCallback, + ); } /// Sends the 'TextInputClient.onConnectionClosed' message to the framework. void onConnectionClosed(int? clientId) { - if (EnginePlatformDispatcher.instance._onPlatformMessage != null) { - EnginePlatformDispatcher.instance.invokeOnPlatformMessage( - 'flutter/textinput', - const JSONMethodCodec().encodeMethodCall( - MethodCall( - 'TextInputClient.onConnectionClosed', - [clientId], - ), + EnginePlatformDispatcher.instance.invokeOnPlatformMessage( + 'flutter/textinput', + const JSONMethodCodec().encodeMethodCall( + MethodCall( + 'TextInputClient.onConnectionClosed', + [clientId], ), - _emptyCallback, - ); - } + ), + _emptyCallback, + ); } } diff --git a/lib/web_ui/lib/src/engine/util.dart b/lib/web_ui/lib/src/engine/util.dart index 0338a6f2d7ea3..14f93bea964a8 100644 --- a/lib/web_ui/lib/src/engine/util.dart +++ b/lib/web_ui/lib/src/engine/util.dart @@ -511,8 +511,7 @@ final ByteData? _fontChangeMessage = bool _fontChangeScheduled = false; FutureOr sendFontChangeMessage() async { - if (EnginePlatformDispatcher.instance._onPlatformMessage != null && - !_fontChangeScheduled) { + if (!_fontChangeScheduled) { _fontChangeScheduled = true; // Batch updates into next animationframe. html.window.requestAnimationFrame((num _) { @@ -520,7 +519,7 @@ FutureOr sendFontChangeMessage() async { EnginePlatformDispatcher.instance.invokeOnPlatformMessage( 'flutter/system', _fontChangeMessage, - (_) {}, + (_) {}, ); }); } diff --git a/lib/web_ui/lib/src/ui/channel_buffers.dart b/lib/web_ui/lib/src/ui/channel_buffers.dart index 9ecb59a4b72fa..e045e75864cc7 100644 --- a/lib/web_ui/lib/src/ui/channel_buffers.dart +++ b/lib/web_ui/lib/src/ui/channel_buffers.dart @@ -3,141 +3,243 @@ // found in the LICENSE file. // @dart = 2.10 + +// This is identical to ../../../../ui/channel_buffers.dart with the +// following exceptions: +// +// * All comments except this one are removed. +// * _invoke2 is replaced with engine.invoke2 +// * _printDebug is replaced with print in an assert. + part of ui; +typedef DrainChannelCallback = Future Function(ByteData? data, PlatformMessageResponseCallback callback); + +typedef ChannelCallback = void Function(ByteData? data, PlatformMessageResponseCallback callback); + +class _ChannelCallbackRecord { + _ChannelCallbackRecord(this.callback) : zone = Zone.current; + final ChannelCallback callback; + final Zone zone; + + void invoke(ByteData? dataArg, PlatformMessageResponseCallback callbackArg) { + engine.invoke2(callback, zone, dataArg, callbackArg); + } +} + class _StoredMessage { - _StoredMessage(this._data, this._callback); - final ByteData? _data; - ByteData? get data => _data; - final PlatformMessageResponseCallback _callback; - PlatformMessageResponseCallback get callback => _callback; + const _StoredMessage(this.data, this.callback); + + final ByteData? data; + + final PlatformMessageResponseCallback callback; } -class _RingBuffer { - final collection.ListQueue _queue; +class _Channel { + _Channel([ this._capacity = ChannelBuffers.kDefaultBufferSize ]) + : _queue = collection.ListQueue<_StoredMessage>(_capacity); + + final collection.ListQueue<_StoredMessage> _queue; - _RingBuffer(this._capacity) : _queue = collection.ListQueue(_capacity); int get length => _queue.length; - int _capacity; + + bool debugEnableDiscardWarnings = true; + int get capacity => _capacity; - bool get isEmpty => _queue.isEmpty; - Function(T)? _dropItemCallback; - set dropItemCallback(Function(T) callback) { - _dropItemCallback = callback; + int _capacity; + set capacity(int newSize) { + _capacity = newSize; + _dropOverflowMessages(newSize); } - bool push(T val) { + bool _draining = false; + + bool push(_StoredMessage message) { + if (!_draining && _channelCallbackRecord != null) { + assert(_queue.isEmpty); + _channelCallbackRecord!.invoke(message.data, message.callback); + return false; + } if (_capacity <= 0) { - return true; - } else { - final int overflowCount = _dropOverflowItems(_capacity - 1); - _queue.addLast(val); - return overflowCount > 0; + return debugEnableDiscardWarnings; } + final bool result = _dropOverflowMessages(_capacity - 1); + _queue.addLast(message); + return result; } - T? pop() { - return _queue.isEmpty ? null : _queue.removeFirst(); - } + _StoredMessage pop() => _queue.removeFirst(); - int _dropOverflowItems(int lengthLimit) { - int result = 0; + bool _dropOverflowMessages(int lengthLimit) { + bool result = false; while (_queue.length > lengthLimit) { - final T item = _queue.removeFirst(); - _dropItemCallback?.call(item); - result += 1; + final _StoredMessage message = _queue.removeFirst(); + message.callback(null); // send empty reply to the plugin side + result = true; } return result; } - int resize(int newSize) { - _capacity = newSize; - return _dropOverflowItems(newSize); + _ChannelCallbackRecord? _channelCallbackRecord; + + void setListener(ChannelCallback callback) { + final bool needDrain = _channelCallbackRecord == null; + _channelCallbackRecord = _ChannelCallbackRecord(callback); + if (needDrain && !_draining) + _drain(); + } + + void clearListener() { + _channelCallbackRecord = null; } -} -typedef DrainChannelCallback = Future Function(ByteData?, PlatformMessageResponseCallback); + void _drain() { + assert(!_draining); + _draining = true; + scheduleMicrotask(_drainStep); + } + + void _drainStep() { + assert(_draining); + if (_queue.isNotEmpty && _channelCallbackRecord != null) { + final _StoredMessage message = pop(); + _channelCallbackRecord!.invoke(message.data, message.callback); + scheduleMicrotask(_drainStep); + } else { + _draining = false; + } + } +} class ChannelBuffers { + ChannelBuffers(); + static const int kDefaultBufferSize = 1; static const String kControlChannelName = 'dev.flutter/channel-buffers'; - final Map?> _messages = - ?>{}; - _RingBuffer<_StoredMessage> _makeRingBuffer(int size) { - final _RingBuffer<_StoredMessage> result = _RingBuffer<_StoredMessage>(size); - result.dropItemCallback = _onDropItem; - return result; + final Map _channels = {}; + + void push(String name, ByteData? data, PlatformMessageResponseCallback callback) { + final _Channel channel = _channels.putIfAbsent(name, () => _Channel()); + if (channel.push(_StoredMessage(data, callback))) { + assert(() { + print( + 'A message on the $name channel was discarded before it could be handled.\n' + 'This happens when a plugin sends messages to the framework side before the ' + 'framework has had an opportunity to register a listener. See the ChannelBuffers ' + 'API documentation for details on how to configure the channel to expect more ' + 'messages, or to expect messages to get discarded:\n' + ' https://api.flutter.dev/flutter/dart-ui/ChannelBuffers-class.html' + ); + return true; + }()); + } } - void _onDropItem(_StoredMessage message) { - message.callback(null); + void setListener(String name, ChannelCallback callback) { + final _Channel channel = _channels.putIfAbsent(name, () => _Channel()); + channel.setListener(callback); } - bool push(String channel, ByteData? data, PlatformMessageResponseCallback callback) { - _RingBuffer<_StoredMessage>? queue = _messages[channel]; - if (queue == null) { - queue = _makeRingBuffer(kDefaultBufferSize); - _messages[channel] = queue; - } - final bool didOverflow = queue.push(_StoredMessage(data, callback)); - if (didOverflow) { - // TODO(aaclarke): Update this message to include instructions on how to resize - // the buffer once that is available to users and print in all engine builds - // after we verify that dropping messages isn't part of normal execution. - _debugPrintWarning('Overflow on channel: $channel. ' - 'Messages on this channel are being discarded in FIFO fashion. ' - 'The engine may not be running or you need to adjust ' - 'the buffer size if of the channel.'); - } - return didOverflow; - } - - _StoredMessage? _pop(String channel) { - final _RingBuffer<_StoredMessage>? queue = _messages[channel]; - final _StoredMessage? result = queue?.pop(); - return result; + void clearListener(String name) { + final _Channel? channel = _channels[name]; + if (channel != null) + channel.clearListener(); } - bool _isEmpty(String channel) { - final _RingBuffer<_StoredMessage>? queue = _messages[channel]; - return (queue == null) ? true : queue.isEmpty; + Future drain(String name, DrainChannelCallback callback) async { + final _Channel? channel = _channels[name]; + while (channel != null && !channel._queue.isEmpty) { + final _StoredMessage message = channel.pop(); + await callback(message.data, message.callback); + } } - void _resize(String channel, int newSize) { - _RingBuffer<_StoredMessage>? queue = _messages[channel]; - if (queue == null) { - queue = _makeRingBuffer(newSize); - _messages[channel] = queue; + void handleMessage(ByteData data) { + final Uint8List bytes = data.buffer.asUint8List(data.offsetInBytes, data.lengthInBytes); + if (bytes[0] == 0x07) { // 7 = value code for string + final int methodNameLength = bytes[1]; + if (methodNameLength >= 254) // lengths greater than 253 have more elaborate encoding + throw Exception('Unrecognized message sent to $kControlChannelName (method name too long)'); + int index = 2; // where we are in reading the bytes + final String methodName = utf8.decode(bytes.sublist(index, index + methodNameLength)); + index += methodNameLength; + switch (methodName) { + case 'resize': + if (bytes[index] != 0x0C) // 12 = value code for list + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and new capacity)'); + index += 1; + if (bytes[index] < 0x02) // We ignore extra arguments, in case we need to support them in the future, hence <2 rather than !=2. + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and new capacity)'); + index += 1; + if (bytes[index] != 0x07) // 7 = value code for string + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (first argument must be a string)'); + index += 1; + final int channelNameLength = bytes[index]; + if (channelNameLength >= 254) // lengths greater than 253 have more elaborate encoding + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (channel name must be less than 254 characters long)'); + index += 1; + final String channelName = utf8.decode(bytes.sublist(index, index + channelNameLength)); + index += channelNameLength; + if (bytes[index] != 0x03) // 3 = value code for uint32 + throw Exception('Invalid arguments for \'resize\' method sent to $kControlChannelName (second argument must be an integer in the range 0 to 2147483647)'); + index += 1; + resize(channelName, data.getUint32(index, Endian.host)); + break; + case 'overflow': + if (bytes[index] != 0x0C) // 12 = value code for list + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and flag state)'); + index += 1; + if (bytes[index] < 0x02) // We ignore extra arguments, in case we need to support them in the future, hence <2 rather than !=2. + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (arguments must be a two-element list, channel name and flag state)'); + index += 1; + if (bytes[index] != 0x07) // 7 = value code for string + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (first argument must be a string)'); + index += 1; + final int channelNameLength = bytes[index]; + if (channelNameLength >= 254) // lengths greater than 253 have more elaborate encoding + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (channel name must be less than 254 characters long)'); + index += 1; + final String channelName = utf8.decode(bytes.sublist(index, index + channelNameLength)); + index += channelNameLength; + if (bytes[index] != 0x01 && bytes[index] != 0x02) // 1 = value code for true, 2 = value code for false + throw Exception('Invalid arguments for \'overflow\' method sent to $kControlChannelName (second argument must be a boolean)'); + allowOverflow(channelName, bytes[index] == 0x01); + break; + default: + throw Exception('Unrecognized method \'$methodName\' sent to $kControlChannelName'); + } } else { - final int numberOfDroppedMessages = queue.resize(newSize); - if (numberOfDroppedMessages > 0) { - _debugPrintWarning('Dropping messages on channel "$channel" as a result of shrinking the buffer size.'); + final List parts = utf8.decode(bytes).split('\r'); + if (parts.length == 1 + /*arity=*/2 && parts[0] == 'resize') { + resize(parts[1], int.parse(parts[2])); + } else { + throw Exception('Unrecognized message $parts sent to $kControlChannelName.'); } } } - Future drain(String channel, DrainChannelCallback callback) async { - while (!_isEmpty(channel)) { - final _StoredMessage message = _pop(channel)!; - await callback(message.data, message.callback); + void resize(String name, int newSize) { + _Channel? channel = _channels[name]; + if (channel == null) { + channel = _Channel(newSize); + _channels[name] = channel; + } else { + channel.capacity = newSize; } } - String _getString(ByteData data) { - final ByteBuffer buffer = data.buffer; - final Uint8List list = buffer.asUint8List(data.offsetInBytes, data.lengthInBytes); - return utf8.decode(list); - } - - void handleMessage(ByteData data) { - final List command = _getString(data).split('\r'); - if (command.length == /*arity=*/2 + 1 && command[0] == 'resize') { - _resize(command[1], int.parse(command[2])); - } else { - throw Exception('Unrecognized command $command sent to $kControlChannelName.'); - } + void allowOverflow(String name, bool allowed) { + assert(() { + _Channel? channel = _channels[name]; + if (channel == null && allowed) { + channel = _Channel(); + _channels[name] = channel; + } + channel?.debugEnableDiscardWarnings = !allowed; + return true; + }()); } } diff --git a/lib/web_ui/lib/src/ui/natives.dart b/lib/web_ui/lib/src/ui/natives.dart index dbdb7a9144214..e4817afca7241 100644 --- a/lib/web_ui/lib/src/ui/natives.dart +++ b/lib/web_ui/lib/src/ui/natives.dart @@ -5,16 +5,6 @@ // @dart = 2.10 part of ui; -/// Prints a warning to the browser's debug console. -void _debugPrintWarning(String warning) { - if (engine.assertionsEnabled) { - // Use a lower log level message to reduce noise in release mode. - html.window.console.debug(warning); - return; - } - html.window.console.warn(warning); -} - List saveCompilationTrace() { if (engine.assertionsEnabled) { throw UnimplementedError('saveCompilationTrace is not implemented on the web.'); diff --git a/lib/web_ui/test/channel_buffers_test.dart b/lib/web_ui/test/channel_buffers_test.dart new file mode 100644 index 0000000000000..e853efbbfb910 --- /dev/null +++ b/lib/web_ui/test/channel_buffers_test.dart @@ -0,0 +1,358 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// @dart = 2.6 + +// This is identical to +// ../../../testing/dart/channel_buffers_test.dart except for: +// +// * The imports are a bit different. +// * The main method has been renamed testMain. +// * A new main method here bootstraps the web tests. + +import 'dart:async'; +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:ui/ui.dart' as ui; + +import 'package:test/bootstrap/browser.dart'; +import 'package:test/test.dart'; + +void main() { + internalBootstrapBrowserTest(() => testMain); +} + +ByteData _makeByteData(String str) { + final Uint8List list = utf8.encode(str) as Uint8List; + final ByteBuffer buffer = list is Uint8List ? list.buffer : Uint8List.fromList(list).buffer; + return ByteData.view(buffer); +} + +void _resize(ui.ChannelBuffers buffers, String name, int newSize) { + buffers.handleMessage(_makeByteData('resize\r$name\r$newSize')); +} + +void testMain() { + test('push drain', () async { + const String channel = 'foo'; + final ByteData data = _makeByteData('bar'); + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + buffers.push(channel, data, callback); + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + expect(drainedData, equals(data)); + expect(drainedCallback, equals(callback)); + return; + }); + }); + + test('drain is sync', () async { + const String channel = 'foo'; + final ByteData data = _makeByteData('message'); + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + buffers.push(channel, data, callback); + final List log = []; + final Completer completer = Completer(); + scheduleMicrotask(() { log.add('before drain, microtask'); }); + log.add('before drain'); + buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) async { + log.add('callback'); + completer.complete(); + }); + log.add('after drain, before await'); + await completer.future; + log.add('after await'); + expect(log, [ + 'before drain', + 'callback', + 'after drain, before await', + 'before drain, microtask', + 'after await' + ]); + }); + + test('push drain zero', () async { + const String channel = 'foo'; + final ByteData data = _makeByteData('bar'); + final + ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + _resize(buffers, channel, 0); + buffers.push(channel, data, callback); + bool didCall = false; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + didCall = true; + return; + }); + expect(didCall, equals(false)); + }); + + test('drain when empty', () async { + const String channel = 'foo'; + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + bool didCall = false; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + didCall = true; + return; + }); + expect(didCall, equals(false)); + }); + + test('overflow', () async { + const String channel = 'foo'; + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ByteData three = _makeByteData('three'); + final ByteData four = _makeByteData('four'); + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + _resize(buffers, channel, 3); + buffers.push(channel, one, callback); + buffers.push(channel, two, callback); + buffers.push(channel, three, callback); + buffers.push(channel, four, callback); + int counter = 0; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + switch (counter) { + case 0: + expect(drainedData, equals(two)); + expect(drainedCallback, equals(callback)); + break; + case 1: + expect(drainedData, equals(three)); + expect(drainedCallback, equals(callback)); + break; + case 2: + expect(drainedData, equals(four)); + expect(drainedCallback, equals(callback)); + break; + } + counter += 1; + return; + }); + expect(counter, equals(3)); + }); + + test('resize drop', () async { + const String channel = 'foo'; + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + _resize(buffers, channel, 100); + final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + buffers.push(channel, one, callback); + buffers.push(channel, two, callback); + _resize(buffers, channel, 1); + int counter = 0; + await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { + switch (counter) { + case 0: + expect(drainedData, equals(two)); + expect(drainedCallback, equals(callback)); + } + counter += 1; + return; + }); + expect(counter, equals(1)); + }); + + test('resize dropping calls callback', () async { + const String channel = 'foo'; + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + bool didCallCallback = false; + final ui.PlatformMessageResponseCallback oneCallback = (ByteData responseData) { + expect(responseData, isNull); + didCallCallback = true; + }; + final ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) { + throw TestFailure('wrong callback called'); + }; + _resize(buffers, channel, 100); + buffers.push(channel, one, oneCallback); + buffers.push(channel, two, twoCallback); + expect(didCallCallback, equals(false)); + _resize(buffers, channel, 1); + expect(didCallCallback, equals(true)); + }); + + test('overflow calls callback', () async { + const String channel = 'foo'; + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + bool didCallCallback = false; + final ui.PlatformMessageResponseCallback oneCallback = (ByteData responseData) { + expect(responseData, isNull); + didCallCallback = true; + }; + final ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) { + throw TestFailure('wrong callback called'); + }; + _resize(buffers, channel, 1); + buffers.push(channel, one, oneCallback); + buffers.push(channel, two, twoCallback); + expect(didCallCallback, equals(true)); + }); + + test('handle garbage', () async { + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + expect(() => buffers.handleMessage(_makeByteData('asdfasdf')), + throwsException); + }); + + test('handle resize garbage', () async { + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + expect(() => buffers.handleMessage(_makeByteData('resize\rfoo\rbar')), + throwsException); + }); + + test('ChannelBuffers.setListener', () async { + final List log = []; + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ByteData three = _makeByteData('three'); + final ByteData four = _makeByteData('four'); + final ByteData five = _makeByteData('five'); + final ByteData six = _makeByteData('six'); + final ByteData seven = _makeByteData('seven'); + buffers.push('a', one, (ByteData data) { }); + buffers.push('b', two, (ByteData data) { }); + buffers.push('a', three, (ByteData data) { }); + log.add('top'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a1: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-1'); + await null; + log.add('-2'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a2: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-3'); + await null; + log.add('-4'); + buffers.setListener('b', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('b: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-5'); + await null; // first microtask after setting listener drains the first message + await null; // second microtask ends the draining. + log.add('-6'); + buffers.push('b', four, (ByteData data) { }); + buffers.push('a', five, (ByteData data) { }); + log.add('-7'); + await null; + log.add('-8'); + buffers.clearListener('a'); + buffers.push('a', six, (ByteData data) { }); + buffers.push('b', seven, (ByteData data) { }); + await null; + log.add('-9'); + expect(log, [ + 'top', + '-1', + 'a1: three', + '-2', + '-3', + '-4', + '-5', + 'b: two', + '-6', + 'b: four', + 'a2: five', + '-7', + '-8', + 'b: seven', + '-9', + ]); + }); + + test('ChannelBuffers.clearListener', () async { + final List log = []; + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ByteData three = _makeByteData('three'); + final ByteData four = _makeByteData('four'); + buffers.handleMessage(_makeByteData('resize\ra\r10')); + buffers.push('a', one, (ByteData data) { }); + buffers.push('a', two, (ByteData data) { }); + buffers.push('a', three, (ByteData data) { }); + log.add('-1'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a1: ${utf8.decode(data.buffer.asUint8List())}'); + }); + await null; // handles one + log.add('-2'); + buffers.clearListener('a'); + await null; + log.add('-3'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a2: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-4'); + await null; + buffers.push('a', four, (ByteData data) { }); + log.add('-5'); + await null; + log.add('-6'); + await null; + log.add('-7'); + await null; + expect(log, [ + '-1', + 'a1: one', + '-2', + '-3', + '-4', + 'a2: two', + '-5', + 'a2: three', + '-6', + 'a2: four', + '-7', + ]); + }); + + test('ChannelBuffers.handleMessage for resize', () async { + final List log = []; + final ui.ChannelBuffers buffers = _TestChannelBuffers(log); + // Created as follows: + // print(StandardMethodCodec().encodeMethodCall(MethodCall('resize', ['abcdef', 12345])).buffer.asUint8List()); + // ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer. + buffers.handleMessage(ByteData.sublistView(Uint8List.fromList([255, 255, 255, 7, 6, 114, 101, 115, 105, 122, 101, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 3, 57, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 27)); + expect(log, const ['resize abcdef 12345']); + }); + + test('ChannelBuffers.handleMessage for overflow', () async { + final List log = []; + final ui.ChannelBuffers buffers = _TestChannelBuffers(log); + // Created as follows: + // print(StandardMethodCodec().encodeMethodCall(MethodCall('overflow', ['abcdef', false])).buffer.asUint8List()); + // ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer. + buffers.handleMessage(ByteData.sublistView(Uint8List.fromList([255, 255, 255, 7, 8, 111, 118, 101, 114, 102, 108, 111, 119, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 24)); + expect(log, const ['allowOverflow abcdef false']); + }); +} + +class _TestChannelBuffers extends ui.ChannelBuffers { + _TestChannelBuffers(this.log); + + final List log; + + @override + void resize(String name, int newSize) { + log.add('resize $name $newSize'); + } + + @override + void allowOverflow(String name, bool allowed) { + log.add('allowOverflow $name $allowed'); + } +} diff --git a/testing/dart/channel_buffers_test.dart b/testing/dart/channel_buffers_test.dart index a6d0152015a00..0c49036057c71 100644 --- a/testing/dart/channel_buffers_test.dart +++ b/testing/dart/channel_buffers_test.dart @@ -3,24 +3,27 @@ // found in the LICENSE file. // @dart = 2.6 + +// KEEP THIS SYNCHRONIZED WITH ../../lib/web_ui/test/channel_buffers_test.dart + +import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; import 'dart:ui' as ui; import 'package:test/test.dart'; -void main() { - - ByteData _makeByteData(String str) { - final Uint8List list = utf8.encode(str) as Uint8List; - final ByteBuffer buffer = list is Uint8List ? list.buffer : Uint8List.fromList(list).buffer; - return ByteData.view(buffer); - } +ByteData _makeByteData(String str) { + final Uint8List list = utf8.encode(str) as Uint8List; + final ByteBuffer buffer = list is Uint8List ? list.buffer : Uint8List.fromList(list).buffer; + return ByteData.view(buffer); +} - void _resize(ui.ChannelBuffers buffers, String name, int newSize) { - buffers.handleMessage(_makeByteData('resize\r$name\r$newSize')); - } +void _resize(ui.ChannelBuffers buffers, String name, int newSize) { + buffers.handleMessage(_makeByteData('resize\r$name\r$newSize')); +} +void main() { test('push drain', () async { const String channel = 'foo'; final ByteData data = _makeByteData('bar'); @@ -34,6 +37,32 @@ void main() { }); }); + test('drain is sync', () async { + const String channel = 'foo'; + final ByteData data = _makeByteData('message'); + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; + buffers.push(channel, data, callback); + final List log = []; + final Completer completer = Completer(); + scheduleMicrotask(() { log.add('before drain, microtask'); }); + log.add('before drain'); + buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) async { + log.add('callback'); + completer.complete(); + }); + log.add('after drain, before await'); + await completer.future; + log.add('after await'); + expect(log, [ + 'before drain', + 'callback', + 'after drain, before await', + 'before drain, microtask', + 'after await' + ]); + }); + test('push drain zero', () async { const String channel = 'foo'; final ByteData data = _makeByteData('bar'); @@ -50,7 +79,7 @@ void main() { expect(didCall, equals(false)); }); - test('empty', () async { + test('drain when empty', () async { const String channel = 'foo'; final ui.ChannelBuffers buffers = ui.ChannelBuffers(); bool didCall = false; @@ -70,16 +99,27 @@ void main() { final ui.ChannelBuffers buffers = ui.ChannelBuffers(); final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; _resize(buffers, channel, 3); - expect(buffers.push(channel, one, callback), equals(false)); - expect(buffers.push(channel, two, callback), equals(false)); - expect(buffers.push(channel, three, callback), equals(false)); - expect(buffers.push(channel, four, callback), equals(true)); + buffers.push(channel, one, callback); + buffers.push(channel, two, callback); + buffers.push(channel, three, callback); + buffers.push(channel, four, callback); int counter = 0; await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { - if (counter++ == 0) { - expect(drainedData, equals(two)); - expect(drainedCallback, equals(callback)); + switch (counter) { + case 0: + expect(drainedData, equals(two)); + expect(drainedCallback, equals(callback)); + break; + case 1: + expect(drainedData, equals(three)); + expect(drainedCallback, equals(callback)); + break; + case 2: + expect(drainedData, equals(four)); + expect(drainedCallback, equals(callback)); + break; } + counter += 1; return; }); expect(counter, equals(3)); @@ -92,15 +132,17 @@ void main() { final ui.ChannelBuffers buffers = ui.ChannelBuffers(); _resize(buffers, channel, 100); final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {}; - expect(buffers.push(channel, one, callback), equals(false)); - expect(buffers.push(channel, two, callback), equals(false)); + buffers.push(channel, one, callback); + buffers.push(channel, two, callback); _resize(buffers, channel, 1); int counter = 0; await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) { - if (counter++ == 0) { - expect(drainedData, equals(two)); - expect(drainedCallback, equals(callback)); + switch (counter) { + case 0: + expect(drainedData, equals(two)); + expect(drainedCallback, equals(callback)); } + counter += 1; return; }); expect(counter, equals(1)); @@ -113,12 +155,16 @@ void main() { final ui.ChannelBuffers buffers = ui.ChannelBuffers(); bool didCallCallback = false; final ui.PlatformMessageResponseCallback oneCallback = (ByteData responseData) { + expect(responseData, isNull); didCallCallback = true; }; - final ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) {}; + final ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) { + throw TestFailure('wrong callback called'); + }; _resize(buffers, channel, 100); - expect(buffers.push(channel, one, oneCallback), equals(false)); - expect(buffers.push(channel, two, twoCallback), equals(false)); + buffers.push(channel, one, oneCallback); + buffers.push(channel, two, twoCallback); + expect(didCallCallback, equals(false)); _resize(buffers, channel, 1); expect(didCallCallback, equals(true)); }); @@ -130,12 +176,15 @@ void main() { final ui.ChannelBuffers buffers = ui.ChannelBuffers(); bool didCallCallback = false; final ui.PlatformMessageResponseCallback oneCallback = (ByteData responseData) { + expect(responseData, isNull); didCallCallback = true; }; - final ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) {}; + final ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) { + throw TestFailure('wrong callback called'); + }; _resize(buffers, channel, 1); - expect(buffers.push(channel, one, oneCallback), equals(false)); - expect(buffers.push(channel, two, twoCallback), equals(true)); + buffers.push(channel, one, oneCallback); + buffers.push(channel, two, twoCallback); expect(didCallCallback, equals(true)); }); @@ -150,4 +199,149 @@ void main() { expect(() => buffers.handleMessage(_makeByteData('resize\rfoo\rbar')), throwsException); }); + + test('ChannelBuffers.setListener', () async { + final List log = []; + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ByteData three = _makeByteData('three'); + final ByteData four = _makeByteData('four'); + final ByteData five = _makeByteData('five'); + final ByteData six = _makeByteData('six'); + final ByteData seven = _makeByteData('seven'); + buffers.push('a', one, (ByteData data) { }); + buffers.push('b', two, (ByteData data) { }); + buffers.push('a', three, (ByteData data) { }); + log.add('top'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a1: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-1'); + await null; + log.add('-2'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a2: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-3'); + await null; + log.add('-4'); + buffers.setListener('b', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('b: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-5'); + await null; // first microtask after setting listener drains the first message + await null; // second microtask ends the draining. + log.add('-6'); + buffers.push('b', four, (ByteData data) { }); + buffers.push('a', five, (ByteData data) { }); + log.add('-7'); + await null; + log.add('-8'); + buffers.clearListener('a'); + buffers.push('a', six, (ByteData data) { }); + buffers.push('b', seven, (ByteData data) { }); + await null; + log.add('-9'); + expect(log, [ + 'top', + '-1', + 'a1: three', + '-2', + '-3', + '-4', + '-5', + 'b: two', + '-6', + 'b: four', + 'a2: five', + '-7', + '-8', + 'b: seven', + '-9', + ]); + }); + + test('ChannelBuffers.clearListener', () async { + final List log = []; + final ui.ChannelBuffers buffers = ui.ChannelBuffers(); + final ByteData one = _makeByteData('one'); + final ByteData two = _makeByteData('two'); + final ByteData three = _makeByteData('three'); + final ByteData four = _makeByteData('four'); + buffers.handleMessage(_makeByteData('resize\ra\r10')); + buffers.push('a', one, (ByteData data) { }); + buffers.push('a', two, (ByteData data) { }); + buffers.push('a', three, (ByteData data) { }); + log.add('-1'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a1: ${utf8.decode(data.buffer.asUint8List())}'); + }); + await null; // handles one + log.add('-2'); + buffers.clearListener('a'); + await null; + log.add('-3'); + buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) { + log.add('a2: ${utf8.decode(data.buffer.asUint8List())}'); + }); + log.add('-4'); + await null; + buffers.push('a', four, (ByteData data) { }); + log.add('-5'); + await null; + log.add('-6'); + await null; + log.add('-7'); + await null; + expect(log, [ + '-1', + 'a1: one', + '-2', + '-3', + '-4', + 'a2: two', + '-5', + 'a2: three', + '-6', + 'a2: four', + '-7', + ]); + }); + + test('ChannelBuffers.handleMessage for resize', () async { + final List log = []; + final ui.ChannelBuffers buffers = _TestChannelBuffers(log); + // Created as follows: + // print(StandardMethodCodec().encodeMethodCall(MethodCall('resize', ['abcdef', 12345])).buffer.asUint8List()); + // ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer. + buffers.handleMessage(ByteData.sublistView(Uint8List.fromList([255, 255, 255, 7, 6, 114, 101, 115, 105, 122, 101, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 3, 57, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 27)); + expect(log, const ['resize abcdef 12345']); + }); + + test('ChannelBuffers.handleMessage for overflow', () async { + final List log = []; + final ui.ChannelBuffers buffers = _TestChannelBuffers(log); + // Created as follows: + // print(StandardMethodCodec().encodeMethodCall(MethodCall('overflow', ['abcdef', false])).buffer.asUint8List()); + // ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer. + buffers.handleMessage(ByteData.sublistView(Uint8List.fromList([255, 255, 255, 7, 8, 111, 118, 101, 114, 102, 108, 111, 119, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 24)); + expect(log, const ['allowOverflow abcdef false']); + }); +} + +class _TestChannelBuffers extends ui.ChannelBuffers { + _TestChannelBuffers(this.log); + + final List log; + + @override + void resize(String name, int newSize) { + log.add('resize $name $newSize'); + } + + @override + void allowOverflow(String name, bool allowed) { + log.add('allowOverflow $name $allowed'); + } }