Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
import io.flutter.plugin.common.BinaryMessenger;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -33,22 +34,37 @@ class DartMessenger implements BinaryMessenger, PlatformMessageHandler {

@NonNull private final FlutterJNI flutterJNI;

@NonNull private final ConcurrentHashMap<String, HandlerInfo> messageHandlers;
/**
* Maps a channel name to an object that contains the task queue and the handler associated with
* the channel.
*
* <p>Reads and writes to this map must lock {@code handlersLock}.
*/
@NonNull private final Map<String, HandlerInfo> messageHandlers = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of two Map objects, could we jus tkeep one ConcurrentHashMap and either create a wrapper object for the buffered messages or add them to HandlerInfo?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wrapper object would still need to represent two different states. One with a Handler, and one without a handler, but with buffered messages. FWIW, having two separate maps provides a more transparent separation of concerns.


/**
* Maps a channel name to an object that holds information about the incoming Dart message.
*
* <p>Reads and writes to this map must lock {@code handlersLock}.
*/
@NonNull private final Map<String, List<BufferedMessageInfo>> bufferedMessages = new HashMap<>();

@NonNull private final Object handlersLock = new Object();
private boolean enableBufferingIncomingMessages = false;

@NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies;
@NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies = new HashMap<>();
private int nextReplyId = 1;

@NonNull private final DartMessengerTaskQueue platformTaskQueue = new PlatformTaskQueue();

@NonNull private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues;
@NonNull
private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues =
new WeakHashMap<TaskQueue, DartMessengerTaskQueue>();

@NonNull private TaskQueueFactory taskQueueFactory;

DartMessenger(@NonNull FlutterJNI flutterJNI, @NonNull TaskQueueFactory taskQueueFactory) {
this.flutterJNI = flutterJNI;
this.messageHandlers = new ConcurrentHashMap<>();
this.pendingReplies = new HashMap<>();
this.createdTaskQueues = new WeakHashMap<TaskQueue, DartMessengerTaskQueue>();
this.taskQueueFactory = taskQueueFactory;
}

Expand All @@ -72,6 +88,10 @@ public DartMessengerTaskQueue makeBackgroundTaskQueue() {
}
}

/**
* Holds information about a platform handler, such as the task queue that processes messages from
* Dart.
*/
private static class HandlerInfo {
@NonNull public final BinaryMessenger.BinaryMessageHandler handler;
@Nullable public final DartMessengerTaskQueue taskQueue;
Expand All @@ -84,6 +104,22 @@ private static class HandlerInfo {
}
}

/**
* Holds information that allows to dispatch a Dart message to a platform handler when it becomes
* available.
*/
private static class BufferedMessageInfo {
@NonNull public final ByteBuffer message;
int replyId;
long messageData;

BufferedMessageInfo(@NonNull ByteBuffer message, int replyId, long messageData) {
this.message = message;
this.replyId = replyId;
this.messageData = messageData;
}
}

private static class DefaultTaskQueue implements DartMessengerTaskQueue {
@NonNull private final ExecutorService executor;

Expand Down Expand Up @@ -124,21 +160,47 @@ public void setMessageHandler(
@Nullable TaskQueue taskQueue) {
if (handler == null) {
Log.v(TAG, "Removing handler for channel '" + channel + "'");
messageHandlers.remove(channel);
} else {
DartMessengerTaskQueue dartMessengerTaskQueue = null;
if (taskQueue != null) {
dartMessengerTaskQueue = createdTaskQueues.get(taskQueue);
if (dartMessengerTaskQueue == null) {
throw new IllegalArgumentException(
"Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue).");
}
synchronized (handlersLock) {
messageHandlers.remove(channel);
}
return;
}
DartMessengerTaskQueue dartMessengerTaskQueue = null;
if (taskQueue != null) {
dartMessengerTaskQueue = createdTaskQueues.get(taskQueue);
if (dartMessengerTaskQueue == null) {
throw new IllegalArgumentException(
"Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue).");
}
Log.v(TAG, "Setting handler for channel '" + channel + "'");
}
Log.v(TAG, "Setting handler for channel '" + channel + "'");

LinkedList<BufferedMessageInfo> list;
synchronized (handlersLock) {
messageHandlers.put(channel, new HandlerInfo(handler, dartMessengerTaskQueue));
if (!bufferedMessages.containsKey(channel)) {
return;
}
list = (LinkedList) bufferedMessages.get(channel);
bufferedMessages.remove(channel);
}
for (BufferedMessageInfo info : list) {
dispatchMessageToQueue(
channel, messageHandlers.get(channel), info.message, info.replyId, info.messageData);
}
}

/**
* Enables the ability to queue tasks when messages are received from Dart.
*
* <p>This is useful when there are pending channel handler registrations. For example, Dart may
* be initialized concurrently, and prior to the registration of the channel handlers. This
* implies that Dart may start sending messages while plugins are being registered.
*/
public void enableBufferingIncomingMessages() {
enableBufferingIncomingMessages = true;
}

@Override
@UiThread
public void send(@NonNull String channel, @NonNull ByteBuffer message) {
Expand Down Expand Up @@ -188,25 +250,21 @@ private void invokeHandler(
}
}

@Override
public void handleMessageFromDart(
@NonNull final String channel,
private void dispatchMessageToQueue(
@NonNull String channel,
@Nullable HandlerInfo handlerInfo,
@Nullable ByteBuffer message,
final int replyId,
int replyId,
long messageData) {
// Called from the ui thread.
Log.v(TAG, "Received message from Dart over channel '" + channel + "'");
@Nullable final HandlerInfo handlerInfo = messageHandlers.get(channel);
@Nullable
final DartMessengerTaskQueue taskQueue = (handlerInfo != null) ? handlerInfo.taskQueue : null;
Runnable myRunnable =
() -> {
Trace.beginSection("DartMessenger#handleMessageFromDart on " + channel);
try {
invokeHandler(handlerInfo, message, replyId);
if (message != null && message.isDirect()) {
// This ensures that if a user retains an instance to the ByteBuffer and it happens to
// be direct they will get a deterministic error.
// This ensures that if a user retains an instance to the ByteBuffer and it
// happens to be direct they will get a deterministic error.
message.limit(0);
}
} finally {
Expand All @@ -215,12 +273,43 @@ public void handleMessageFromDart(
Trace.endSection();
}
};
@NonNull
final DartMessengerTaskQueue nonnullTaskQueue =
taskQueue == null ? platformTaskQueue : taskQueue;
nonnullTaskQueue.dispatch(myRunnable);
}

@Override
public void handleMessageFromDart(
@NonNull String channel, @Nullable ByteBuffer message, int replyId, long messageData) {
// Called from the ui thread.
Log.v(TAG, "Received message from Dart over channel '" + channel + "'");

HandlerInfo handlerInfo;
boolean messageDeferred;
synchronized (handlersLock) {
handlerInfo = messageHandlers.get(channel);
messageDeferred = (enableBufferingIncomingMessages && handlerInfo == null);
if (messageDeferred) {
// The channel is not defined when the Dart VM sends a message before the channels are
// registered.
//
// This is possible if the Dart VM starts before channel registration, and if the thread
// that registers the channels is busy or slow at registering the channel handlers.
//
// In such cases, the task dispatchers are queued, and processed when the channel is
// defined.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this also happen if the handler gets unregistered?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is right. In this case, enableBufferingIncomingMessages should be set to false. That makes sense to add.

if (!bufferedMessages.containsKey(channel)) {
bufferedMessages.put(channel, new LinkedList<>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I implemented Channel Buffers I got a lot of flak about having unbounded buffering of platform channels. The resolution we ultimately decided upon was a RingBuffer of size 1 with control messages that can be used to adjust the size. The concern was that incorrect usage could blow up memory.

I thought it was overly cautious. Maybe we should have a RingBuffer at least so it isn't completely unbounded.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the suggestion to override messages? How is that desired in an app that runs into this situation?

Copy link
Member

@gaaclarke gaaclarke Nov 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A RingBuffer drops messages after its capacity is exhausted, dropping messages isn't a desired behavior but also queueing up infinite messages isn't desired.

Messages that write data can have 2 different semantics, x = y or x = f(x, y). In the first case dropping messages isn't actually an error. The only thing that matters is the last operation. In the latter case, it is a problem. It impossible to know which write semantics the channel has without the user declaring it.

So, in summary, a RingBuffer doesn't always drop messages and when it does it isn't necessarily a logical error. If I were king of Flutter I'd say just chose some reasonable upper bounds for the RingBuffer, like 1024. I can tell you though that we've had this discussion before as a team and the resolution was that the RingBuffer size should be 1 (with an option to change it).

attn @Hixie who was involved in deciding how Channel Buffers should work.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a RingBuffer is preferred, I'm happy to change it. I wasn't aware that it's ok to drop some messages. e.g. could that yield to invalid state in a way that is really hard to debug?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the plan that buffering messages only happens for a limited amount of time via enableBufferingIncomingMessages? It doesn't appear to be doing that but if that's the case it isn't as bad since you only have a limited amount of time to queue up infinite messages.

What is the plan for messages that are sent, buffered, but no handler is ever registered for them? Are you going to hold onto them indefinitely? In the past they would have gotten an exception that maybe was ignored but now will be held onto. Maybe once we turn off the time frame where messages are buffered all of those messages without handlers could be cleared out to match the previous behavior.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the plan that buffering messages only happens for a limited amount of time via enableBufferingIncomingMessages?

right on

What is the plan for messages that are sent, buffered, but no handler is ever registered for them?

Added some logic to address this in disableBufferingIncomingMessages

}
List<BufferedMessageInfo> buffer = bufferedMessages.get(channel);
buffer.add(new BufferedMessageInfo(message, replyId, messageData));
}
}
if (!messageDeferred) {
dispatchMessageToQueue(channel, handlerInfo, message, replyId, messageData);
}
}

@Override
public void handlePlatformMessageResponse(int replyId, @Nullable ByteBuffer reply) {
Log.v(TAG, "Received message reply from Dart.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package io.flutter.embedding.engine.dart;

import static android.os.Looper.getMainLooper;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertNotNull;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.robolectric.Shadows.shadowOf;

import io.flutter.embedding.engine.FlutterJNI;
import io.flutter.embedding.engine.dart.DartMessenger.DartMessengerTaskQueue;
Expand All @@ -16,6 +21,7 @@
import java.nio.ByteBuffer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;
Expand Down Expand Up @@ -155,16 +161,16 @@ public void replyIdIncrementsOnNullReply() {
public void cleansUpMessageData() throws InterruptedException {
final FlutterJNI fakeFlutterJni = mock(FlutterJNI.class);
final DartMessenger messenger = new DartMessenger(fakeFlutterJni, () -> synchronousTaskQueue);
BinaryMessenger.TaskQueue taskQueue = messenger.makeBackgroundTaskQueue();
String channel = "foobar";
final BinaryMessenger.TaskQueue taskQueue = messenger.makeBackgroundTaskQueue();
final String channel = "foobar";
BinaryMessenger.BinaryMessageHandler handler =
(ByteBuffer message, BinaryMessenger.BinaryReply reply) -> {
reply.reply(null);
};
messenger.setMessageHandler(channel, handler, taskQueue);
final ByteBuffer message = ByteBuffer.allocateDirect(4 * 2);
int replyId = 1;
long messageData = 1234;
final int replyId = 1;
final long messageData = 1234;
messenger.handleMessageFromDart(channel, message, replyId, messageData);
verify(fakeFlutterJni).cleanupMessageData(eq(messageData));
}
Expand All @@ -173,17 +179,63 @@ public void cleansUpMessageData() throws InterruptedException {
public void cleansUpMessageDataOnError() throws InterruptedException {
final FlutterJNI fakeFlutterJni = mock(FlutterJNI.class);
final DartMessenger messenger = new DartMessenger(fakeFlutterJni, () -> synchronousTaskQueue);
BinaryMessenger.TaskQueue taskQueue = messenger.makeBackgroundTaskQueue();
String channel = "foobar";
final BinaryMessenger.TaskQueue taskQueue = messenger.makeBackgroundTaskQueue();
final String channel = "foobar";
BinaryMessenger.BinaryMessageHandler handler =
(ByteBuffer message, BinaryMessenger.BinaryReply reply) -> {
throw new RuntimeException("hello");
};
messenger.setMessageHandler(channel, handler, taskQueue);
final ByteBuffer message = ByteBuffer.allocateDirect(4 * 2);
int replyId = 1;
long messageData = 1234;
final int replyId = 1;
final long messageData = 1234;

messenger.handleMessageFromDart(channel, message, replyId, messageData);
verify(fakeFlutterJni).cleanupMessageData(eq(messageData));
}

@Test
public void emptyResponseWhenHandlerIsNotSet() throws InterruptedException {
final FlutterJNI fakeFlutterJni = mock(FlutterJNI.class);
final DartMessenger messenger = new DartMessenger(fakeFlutterJni, () -> synchronousTaskQueue);
final String channel = "foobar";
final ByteBuffer message = ByteBuffer.allocateDirect(4 * 2);
final int replyId = 1;
final long messageData = 1234;

messenger.handleMessageFromDart(channel, message, replyId, messageData);
shadowOf(getMainLooper()).idle();
verify(fakeFlutterJni).invokePlatformMessageEmptyResponseCallback(replyId);
}

@Test
public void buffersResponseWhenHandlerIsNotSet() throws InterruptedException {
final FlutterJNI fakeFlutterJni = mock(FlutterJNI.class);
final DartMessenger messenger = new DartMessenger(fakeFlutterJni, () -> synchronousTaskQueue);
final BinaryMessenger.TaskQueue taskQueue = messenger.makeBackgroundTaskQueue();
final String channel = "foobar";
final ByteBuffer message = ByteBuffer.allocateDirect(4 * 2);
final int replyId = 1;
final long messageData = 1234;

messenger.enableBufferingIncomingMessages();
messenger.handleMessageFromDart(channel, message, replyId, messageData);

shadowOf(getMainLooper()).idle();
verify(fakeFlutterJni, never()).invokePlatformMessageEmptyResponseCallback(eq(replyId));

final BinaryMessenger.BinaryMessageHandler handler =
(ByteBuffer msg, BinaryMessenger.BinaryReply reply) -> {
reply.reply(ByteBuffer.wrap("done".getBytes()));
};
messenger.setMessageHandler(channel, handler, taskQueue);

shadowOf(getMainLooper()).idle();
verify(fakeFlutterJni, never()).invokePlatformMessageEmptyResponseCallback(eq(replyId));

final ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
verify(fakeFlutterJni)
.invokePlatformMessageResponseCallback(anyInt(), response.capture(), anyInt());
assertArrayEquals("done".getBytes(), response.getValue().array());
}
}