Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ class EventChannel {
// Registers a stream handler on this channel.
// If no handler has been registered, any incoming stream setup requests will
// be handled silently by providing an empty stream.
//
// Note that the EventChannel does not own the handler and will not
// unregister it on destruction. The caller is responsible for unregistering
// the handler if it should no longer be called.
void SetStreamHandler(std::unique_ptr<StreamHandler<T>> handler) {
if (!handler) {
messenger_->SetMessageHandler(name_, nullptr);
is_listening_ = false;
return;
}

Expand All @@ -61,69 +64,75 @@ class EventChannel {
const MethodCodec<T>* codec = codec_;
const std::string channel_name = name_;
const BinaryMessenger* messenger = messenger_;
BinaryMessageHandler binary_handler = [shared_handler, codec, channel_name,
messenger,
this](const uint8_t* message,
const size_t message_size,
BinaryReply reply) {
constexpr char kOnListenMethod[] = "listen";
constexpr char kOnCancelMethod[] = "cancel";

std::unique_ptr<MethodCall<T>> method_call =
codec->DecodeMethodCall(message, message_size);
if (!method_call) {
std::cerr << "Unable to construct method call from message on channel: "
<< channel_name << std::endl;
reply(nullptr, 0);
return;
}

const std::string& method = method_call->method_name();
if (method.compare(kOnListenMethod) == 0) {
if (is_listening_) {
std::unique_ptr<StreamHandlerError<T>> error =
shared_handler->OnCancel(nullptr);
if (error) {
std::cerr << "Failed to cancel existing stream: "
<< (error->error_code) << ", " << (error->error_message)
<< ", " << (error->error_details);
BinaryMessageHandler binary_handler =
[shared_handler, codec, channel_name, messenger,
// Mutable state to track the handler's listening status.
is_listening = bool(false)](const uint8_t* message,
const size_t message_size,
BinaryReply reply) mutable {
constexpr char kOnListenMethod[] = "listen";
constexpr char kOnCancelMethod[] = "cancel";

std::unique_ptr<MethodCall<T>> method_call =
codec->DecodeMethodCall(message, message_size);
if (!method_call) {
std::cerr
<< "Unable to construct method call from message on channel: "
<< channel_name << std::endl;
reply(nullptr, 0);
return;
}
}
is_listening_ = true;

std::unique_ptr<std::vector<uint8_t>> result;
auto sink = std::make_unique<EventSinkImplementation>(
messenger, channel_name, codec);
std::unique_ptr<StreamHandlerError<T>> error =
shared_handler->OnListen(method_call->arguments(), std::move(sink));
if (error) {
result = codec->EncodeErrorEnvelope(
error->error_code, error->error_message, error->error_details);
} else {
result = codec->EncodeSuccessEnvelope();
}
reply(result->data(), result->size());
} else if (method.compare(kOnCancelMethod) == 0) {
std::unique_ptr<std::vector<uint8_t>> result;
if (is_listening_) {
std::unique_ptr<StreamHandlerError<T>> error =
shared_handler->OnCancel(method_call->arguments());
if (error) {
result = codec->EncodeErrorEnvelope(
error->error_code, error->error_message, error->error_details);

const std::string& method = method_call->method_name();
if (method.compare(kOnListenMethod) == 0) {
if (is_listening) {
std::unique_ptr<StreamHandlerError<T>> error =
shared_handler->OnCancel(nullptr);
if (error) {
std::cerr << "Failed to cancel existing stream: "
<< (error->error_code) << ", "
<< (error->error_message) << ", "
<< (error->error_details);
}
}
is_listening = true;

std::unique_ptr<std::vector<uint8_t>> result;
auto sink = std::make_unique<EventSinkImplementation>(
messenger, channel_name, codec);
std::unique_ptr<StreamHandlerError<T>> error =
shared_handler->OnListen(method_call->arguments(),
std::move(sink));
if (error) {
result = codec->EncodeErrorEnvelope(error->error_code,
error->error_message,
error->error_details);
} else {
result = codec->EncodeSuccessEnvelope();
}
reply(result->data(), result->size());
} else if (method.compare(kOnCancelMethod) == 0) {
std::unique_ptr<std::vector<uint8_t>> result;
if (is_listening) {
std::unique_ptr<StreamHandlerError<T>> error =
shared_handler->OnCancel(method_call->arguments());
if (error) {
result = codec->EncodeErrorEnvelope(error->error_code,
error->error_message,
error->error_details);
} else {
result = codec->EncodeSuccessEnvelope();
}
is_listening = false;
} else {
result = codec->EncodeErrorEnvelope(
"error", "No active stream to cancel", nullptr);
}
reply(result->data(), result->size());
} else {
result = codec->EncodeSuccessEnvelope();
reply(nullptr, 0);
}
is_listening_ = false;
} else {
result = codec->EncodeErrorEnvelope(
"error", "No active stream to cancel", nullptr);
}
reply(result->data(), result->size());
} else {
reply(nullptr, 0);
}
};
};
messenger_->SetMessageHandler(name_, std::move(binary_handler));
}

Expand Down Expand Up @@ -165,7 +174,6 @@ class EventChannel {
BinaryMessenger* messenger_;
const std::string name_;
const MethodCodec<T>* codec_;
bool is_listening_ = false;
};

} // namespace flutter
Expand Down