From c5354c0912368d5af4e3f90e8b650b086e5383ca Mon Sep 17 00:00:00 2001 From: Diego Santo Date: Wed, 5 Nov 2025 21:23:20 -0300 Subject: [PATCH] feat: enable config to set channel to private - insert config to get replay from broadcast --- Realtime/Broadcast/BroadcastOptions.cs | 30 ++- Realtime/Channel/ChannelOptions.cs | 52 ++++- Realtime/Channel/JoinPush.cs | 8 +- Realtime/Client.cs | 23 ++ Realtime/Interfaces/IRealtimeChannel.cs | 10 + Realtime/Interfaces/IRealtimeClient.cs | 10 + Realtime/Models/BaseBroadcast.cs | 16 ++ Realtime/RealtimeChannel.cs | 29 ++- RealtimeTests/ChannelBroadcastTests.cs | 244 ++++++++++++++++++-- RealtimeTests/Helpers.cs | 21 +- supabase/migrations/20250224164421_init.sql | 33 ++- 11 files changed, 439 insertions(+), 37 deletions(-) diff --git a/Realtime/Broadcast/BroadcastOptions.cs b/Realtime/Broadcast/BroadcastOptions.cs index 1114de4..53fc358 100644 --- a/Realtime/Broadcast/BroadcastOptions.cs +++ b/Realtime/Broadcast/BroadcastOptions.cs @@ -3,7 +3,7 @@ namespace Supabase.Realtime.Broadcast; /// -/// Options +/// Options /// public class BroadcastOptions { @@ -19,6 +19,32 @@ public class BroadcastOptions [JsonProperty("ack")] public bool BroadcastAck { get; set; } = false; + /// + /// replay option instructs server to replay broadcast messages + /// + [JsonProperty("replay", NullValueHandling = NullValueHandling.Ignore)] + public ReplayOptions? Replay { get; set; } + + /// + /// Options for replaying events in broadcast configurations. + /// + public class ReplayOptions + { + /// + /// Specifies the starting point in time, in milliseconds since the Unix epoch, + /// from which events should be replayed in the broadcast configuration. + /// + [JsonProperty("since")] + public long Since { get; set; } + + /// + /// Specifies the maximum number of events to be replayed during broadcast. + /// When set to null, there is no limit to the number of events replayed. + /// + [JsonProperty("limit", NullValueHandling = NullValueHandling.Ignore)] + public int? Limit { get; set; } + } + /// /// Initializes broadcast options /// @@ -29,4 +55,4 @@ public BroadcastOptions(bool broadcastSelf = false, bool broadcastAck = false) BroadcastSelf = broadcastSelf; BroadcastAck = broadcastAck; } -} \ No newline at end of file +} diff --git a/Realtime/Channel/ChannelOptions.cs b/Realtime/Channel/ChannelOptions.cs index 48868c1..f621bdb 100644 --- a/Realtime/Channel/ChannelOptions.cs +++ b/Realtime/Channel/ChannelOptions.cs @@ -1,12 +1,16 @@ -using Newtonsoft.Json; -using System; +using System; using System.Collections.Generic; +using Newtonsoft.Json; namespace Supabase.Realtime.Channel; /// -/// Channel Options +/// Represents configuration options for a Realtime channel. /// +/// +/// This class contains all the necessary configuration options for establishing and maintaining +/// a Realtime channel connection, including authentication, parameters, and serialization settings. +/// public class ChannelOptions { /// @@ -29,16 +33,48 @@ public class ChannelOptions /// public JsonSerializerSettings SerializerSettings { get; } + /// + /// Gets a value indicating whether the channel is private. + /// + /// + /// true if the channel is private; otherwise, false. + /// + public bool IsPrivate { get; } = false; + + /// + /// The Channel Options (typically only called from within the ) + /// + /// The client configuration options. + /// A function that returns the current access token. + /// The JSON serializer settings to be used for message serialization. + public ChannelOptions( + ClientOptions clientOptions, + Func retrieveAccessToken, + JsonSerializerSettings serializerSettings + ) + { + ClientOptions = clientOptions; + SerializerSettings = serializerSettings; + RetrieveAccessToken = retrieveAccessToken; + } + /// /// The Channel Options (typically only called from within the ) /// - /// - /// - /// - public ChannelOptions(ClientOptions clientOptions, Func retrieveAccessToken, JsonSerializerSettings serializerSettings) + /// The client configuration options. + /// A function that returns the current access token. + /// The JSON serializer settings to be used for message serialization. + /// A value indicating whether the channel is private. + public ChannelOptions( + ClientOptions clientOptions, + Func retrieveAccessToken, + JsonSerializerSettings serializerSettings, + bool isPrivate + ) { ClientOptions = clientOptions; SerializerSettings = serializerSettings; RetrieveAccessToken = retrieveAccessToken; + IsPrivate = isPrivate; } -} \ No newline at end of file +} diff --git a/Realtime/Channel/JoinPush.cs b/Realtime/Channel/JoinPush.cs index d24d8a3..7401790 100644 --- a/Realtime/Channel/JoinPush.cs +++ b/Realtime/Channel/JoinPush.cs @@ -11,13 +11,14 @@ internal class JoinPush [JsonProperty("config")] public JoinPushConfig Config { get; private set; } - public JoinPush(BroadcastOptions? broadcastOptions = null, PresenceOptions? presenceOptions = null, List? postgresChangesOptions = null) + public JoinPush(BroadcastOptions? broadcastOptions = null, PresenceOptions? presenceOptions = null, List? postgresChangesOptions = null, bool? isPrivate = null) { Config = new JoinPushConfig { Broadcast = broadcastOptions, Presence = presenceOptions, - PostgresChanges = postgresChangesOptions ?? new List() + PostgresChanges = postgresChangesOptions ?? new List(), + IsPrivate = isPrivate }; } @@ -31,5 +32,8 @@ internal class JoinPushConfig [JsonProperty("postgres_changes", NullValueHandling = NullValueHandling.Ignore)] public List PostgresChanges { get; set; } = new List { }; + + [JsonProperty("private", NullValueHandling = NullValueHandling.Ignore)] + public bool? IsPrivate { get; set; } } } \ No newline at end of file diff --git a/Realtime/Client.cs b/Realtime/Client.cs index 1e54549..ca9c6ed 100644 --- a/Realtime/Client.cs +++ b/Realtime/Client.cs @@ -360,6 +360,29 @@ public RealtimeChannel Channel(string channelName) return subscription; } + /// + /// Adds a RealtimeChannel subscription with custom options - if a subscription exists with the same signature, the existing subscription will be returned. + /// + /// The name of the Channel to join + /// Custom channel options for configuring the subscription + /// A RealtimeChannel instance representing the subscription + /// Thrown when Socket is null, indicating Connect() was not called + public RealtimeChannel Channel(string channelName, ChannelOptions options) + { + var topic = $"realtime:{channelName}"; + + if (_subscriptions.TryGetValue(topic, out var channel)) + return channel; + + if (Socket == null) + throw new Exception("Socket must exist, was `Connect` called?"); + + var subs = new RealtimeChannel(Socket!, topic, options); + _subscriptions.Add(topic, subs); + + return subs; + } + /// /// Adds a RealtimeChannel subscription - if a subscription exists with the same signature, the existing subscription will be returned. /// diff --git a/Realtime/Interfaces/IRealtimeChannel.cs b/Realtime/Interfaces/IRealtimeChannel.cs index f6d5513..896e70a 100644 --- a/Realtime/Interfaces/IRealtimeChannel.cs +++ b/Realtime/Interfaces/IRealtimeChannel.cs @@ -215,6 +215,16 @@ public interface IRealtimeChannel RealtimeBroadcast Register(bool broadcastSelf = false, bool broadcastAck = false) where TBroadcastResponse : BaseBroadcast; + + /// + /// Registers the channel with the specified configuration options. + /// + /// The configuration options for the broadcast registration. + /// The type of the broadcast response, which must inherit from . + /// A instance for managing the broadcast. + public RealtimeBroadcast Register(BroadcastOptions options) + where TBroadcastResponse : BaseBroadcast; + /// /// Register presence options, must be called to use , and prior to /// diff --git a/Realtime/Interfaces/IRealtimeClient.cs b/Realtime/Interfaces/IRealtimeClient.cs index a8f9cf7..c78cde5 100644 --- a/Realtime/Interfaces/IRealtimeClient.cs +++ b/Realtime/Interfaces/IRealtimeClient.cs @@ -5,6 +5,7 @@ using System.Net.WebSockets; using System.Threading.Tasks; using Supabase.Core.Interfaces; +using Supabase.Realtime.Channel; using Supabase.Realtime.Exceptions; using static Supabase.Realtime.Constants; @@ -85,6 +86,15 @@ public interface IRealtimeClient: IGettableHeaders /// TChannel Channel(string channelName); + /// + /// Adds a RealtimeChannel subscription with custom options - if a subscription exists with the same signature, the existing subscription will be returned. + /// + /// The name of the Channel to join + /// Custom channel options for configuring the subscription + /// A RealtimeChannel instance representing the subscription + /// Thrown when Socket is null, indicating Connect() was not called + TChannel Channel(string channelName, ChannelOptions options); + /// /// Shorthand initialization of a channel with postgres_changes options already set. /// diff --git a/Realtime/Models/BaseBroadcast.cs b/Realtime/Models/BaseBroadcast.cs index 62b0013..ed24cad 100644 --- a/Realtime/Models/BaseBroadcast.cs +++ b/Realtime/Models/BaseBroadcast.cs @@ -32,4 +32,20 @@ public class BaseBroadcast /// [JsonProperty("payload")] public Dictionary? Payload { get; set; } + + /// + /// Additional metadata associated with a broadcast event. + /// + [JsonProperty("meta", NullValueHandling = NullValueHandling.Ignore)] + public Meta? Meta { get; set; } +} + + +public class Meta +{ + [JsonProperty("id")] + public string Id { get; set; } + + [JsonProperty("replayed")] + public bool Replayed { get; set; } } \ No newline at end of file diff --git a/Realtime/RealtimeChannel.cs b/Realtime/RealtimeChannel.cs index c38ea8f..b981667 100644 --- a/Realtime/RealtimeChannel.cs +++ b/Realtime/RealtimeChannel.cs @@ -218,6 +218,33 @@ public RealtimeBroadcast Register(bool b return instance; } + /// + /// Registers the channel for broadcast with the specified options. + /// + /// The type of the broadcast response, which must inherit from . + /// The broadcast options to configure the channel's broadcast behavior. + /// Returns an instance of initialized with the specified broadcast options. + /// Thrown if the method is called multiple times for the same channel. + public RealtimeBroadcast Register(BroadcastOptions options) where TBroadcastResponse : BaseBroadcast + { + if (_broadcast != null) + throw new InvalidOperationException( + "Register can only be called with broadcast options for a channel once."); + + if (!Options.IsPrivate && options.Replay != null) + throw new InvalidOperationException($"tried to use replay on public channel '{Topic}'. It must be a private channel."); + + BroadcastOptions = options; + + var instance = + new RealtimeBroadcast(this, BroadcastOptions, Options.SerializerSettings); + _broadcast = instance; + + BroadcastHandler = (_, response) => _broadcast.TriggerReceived(response); + + return instance; + } + /// /// Registers a instance - allowing presence responses to be parsed and state to be tracked. /// @@ -592,7 +619,7 @@ internal void Enqueue(Push push) /// /// private Push GenerateJoinPush() => new(Socket, this, ChannelEventJoin, - payload: new JoinPush(BroadcastOptions, PresenceOptions, PostgresChangesOptions)); + payload: new JoinPush(BroadcastOptions, PresenceOptions, PostgresChangesOptions, Options.IsPrivate)); /// /// Generates an auth push. diff --git a/RealtimeTests/ChannelBroadcastTests.cs b/RealtimeTests/ChannelBroadcastTests.cs index aa8b3b6..8535cc2 100644 --- a/RealtimeTests/ChannelBroadcastTests.cs +++ b/RealtimeTests/ChannelBroadcastTests.cs @@ -1,10 +1,13 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using Newtonsoft.Json; -using Supabase.Postgrest.Interfaces; using RealtimeTests.Models; +using Supabase.Postgrest.Interfaces; using Supabase.Realtime; +using Supabase.Realtime.Broadcast; +using Supabase.Realtime.Channel; using Supabase.Realtime.Interfaces; using Supabase.Realtime.Models; using Supabase.Realtime.PostgresChanges; @@ -14,7 +17,8 @@ namespace RealtimeTests; public class BroadcastExample : BaseBroadcast { - [JsonProperty("userId")] public string? UserId { get; set; } + [JsonProperty("userId")] + public string? UserId { get; set; } } [TestClass] @@ -48,23 +52,27 @@ public async Task ClientCanListenForBroadcast() var channel1 = _socketClient!.Channel("online-users"); var broadcast1 = channel1.Register(true, true); - broadcast1.AddBroadcastEventHandler((_, _) => - { - var broadcast = broadcast1.Current(); - if (broadcast?.UserId != guid1 && broadcast?.Event == "user") - tsc.TrySetResult(true); - }); + broadcast1.AddBroadcastEventHandler( + (_, _) => + { + var broadcast = broadcast1.Current(); + if (broadcast?.UserId != guid1 && broadcast?.Event == "user") + tsc.TrySetResult(true); + } + ); var client2 = Helpers.SocketClient(); await client2.ConnectAsync(); var channel2 = client2.Channel("online-users"); var broadcast2 = channel2.Register(true, true); - broadcast2.AddBroadcastEventHandler((_, _) => - { - var broadcast = broadcast2.Current(); - if (broadcast?.UserId != guid2 && broadcast?.Event == "user") - tsc2.TrySetResult(true); - }); + broadcast2.AddBroadcastEventHandler( + (_, _) => + { + var broadcast = broadcast2.Current(); + if (broadcast?.UserId != guid2 && broadcast?.Event == "user") + tsc2.TrySetResult(true); + } + ); await channel1.Subscribe(); await channel2.Subscribe(); @@ -75,6 +83,195 @@ public async Task ClientCanListenForBroadcast() await Task.WhenAll(new[] { tsc.Task, tsc2.Task }); } + [TestMethod("Channel: Can listen for broadcast")] + public async Task ClientCanListenForBroadcastPrivate() + { + var tsc = new TaskCompletionSource(); + var tsc2 = new TaskCompletionSource(); + + var guid1 = Guid.NewGuid().ToString(); + var guid2 = Guid.NewGuid().ToString(); + + var client1 = Helpers.PrivateSocketClient(); + await client1.ConnectAsync(); + var options1 = new ChannelOptions( + client1.Options, + () => Helpers.ApiKey, + new JsonSerializerSettings(), + true + ); + var channel1 = client1.Channel("online-users", options1); + var broadcast1 = channel1.Register(true, true); + broadcast1.AddBroadcastEventHandler( + (_, _) => + { + var broadcast = broadcast1.Current(); + if (broadcast?.UserId != guid1 && broadcast?.Event == "user") + tsc.TrySetResult(true); + } + ); + + var client2 = Helpers.PrivateSocketClient(); + await client2.ConnectAsync(); + var options2 = new ChannelOptions( + _socketClient!.Options, + () => Helpers.ApiKey, + new JsonSerializerSettings(), + true + ); + var channel2 = client2.Channel("online-users", options2); + var broadcast2 = channel2.Register(true, true); + broadcast2.AddBroadcastEventHandler( + (_, _) => + { + var broadcast = broadcast2.Current(); + if (broadcast?.UserId != guid2 && broadcast?.Event == "user") + tsc2.TrySetResult(true); + } + ); + + await channel1.Subscribe(); + await channel2.Subscribe(); + + await broadcast1.Send("user", new BroadcastExample { UserId = guid1 }); + await broadcast2.Send("user", new BroadcastExample { UserId = guid2 }); + + await Task.WhenAll(new[] { tsc.Task, tsc2.Task }); + } + + [TestMethod("Channel: Cannot listen for private broadcast")] + public async Task ClientCannotListenForBroadcastPrivate() + { + var tsc = new TaskCompletionSource(); + var tsc2 = new TaskCompletionSource(); + + var guid1 = Guid.NewGuid().ToString(); + var guid2 = Guid.NewGuid().ToString(); + + var channel1 = _socketClient!.Channel("online-users"); + var broadcast1 = channel1.Register(true, true); + broadcast1.AddBroadcastEventHandler( + (_, broadcast) => + { + if (broadcast is not BroadcastExample broad1) + { + tsc.TrySetResult(true); + return; + } + + if (broad1.UserId == guid1 && broad1.Event == "user1") + tsc.TrySetResult(true); + } + ); + + var client2 = Helpers.PrivateSocketClient(); + await client2.ConnectAsync(); + var options = new ChannelOptions( + client2.Options, + () => Helpers.ApiKey, + new JsonSerializerSettings(), + true + ); + var channel2 = client2.Channel("online-users", options); + var broadcast2 = channel2.Register(true, true); + broadcast2.AddBroadcastEventHandler( + (sender, broad) => + { + if (broad is not BroadcastExample broadcast) + { + tsc2.TrySetResult(false); + return; + } + + if (broadcast.UserId == guid2 && broadcast.Event == "user2") + tsc2.TrySetResult(true); + } + ); + + await channel1.Subscribe(); + await channel2.Subscribe(); + + await broadcast1.Send("user1", new BroadcastExample { UserId = guid1 }); + await broadcast2.Send("user2", new BroadcastExample { UserId = guid2 }); + + await Task.WhenAll(new[] { tsc.Task, tsc2.Task }); + } + + [TestMethod("Channel: Can listen history for private broadcast")] + public async Task ClientCanListenHistoryForBroadcastPrivate() + { + var send = new Dictionary + { + { "event", "user" }, + { "topic", "online-users" }, + { "private", true } + }; + await _restClient!.Rpc("send", send); + + var tsc = new TaskCompletionSource(); + + var client1 = Helpers.PrivateSocketClient(); + await client1.ConnectAsync(); + var options1 = new ChannelOptions( + client1.Options, + () => null, + new JsonSerializerSettings(), + true + ); + var broadcastOptions = new BroadcastOptions + { + BroadcastAck = true, + BroadcastSelf = true, + Replay = new BroadcastOptions.ReplayOptions + { + Limit = 10, + Since = DateTimeOffset.UtcNow.AddDays(-3).ToUnixTimeMilliseconds() + } + }; + var channel1 = client1.Channel("online-users", options1); + var broadcast1 = channel1.Register(broadcastOptions); + broadcast1.AddBroadcastEventHandler( + (_, _) => + { + + var broadcast = broadcast1.Current(); + if (broadcast is { Event: "user", Meta.Replayed: true }) + tsc.TrySetResult(true); + } + ); + + await channel1.Subscribe(); + + await Task.WhenAll(tsc.Task); + } + + [TestMethod("Channel: Cannot listen broadcast replay on public channel")] + public async Task ClientCannotListenForBroadcastReplay() + { + var client1 = Helpers.PrivateSocketClient(); + await client1.ConnectAsync(); + var options1 = new ChannelOptions( + client1.Options, + () => null, + new JsonSerializerSettings(), + false + ); + var broadcastOptions = new BroadcastOptions + { + BroadcastAck = true, + BroadcastSelf = true, + Replay = new BroadcastOptions.ReplayOptions + { + Limit = 10, + Since = DateTimeOffset.UtcNow.AddDays(-3).ToUnixTimeMilliseconds() + } + }; + var channel1 = client1.Channel("online-users", options1); + var action = () => channel1.Register(broadcastOptions); + + Assert.ThrowsException(action); + } + [TestMethod("Channel: Payload returns a modeled response (if possible)")] public async Task ChannelPayloadReturnsModel() { @@ -82,17 +279,22 @@ public async Task ChannelPayloadReturnsModel() var channel = _socketClient!.Channel("example"); channel.Register(new PostgresChangesOptions("public", "*")); - channel.AddPostgresChangeHandler(ListenType.Inserts, (_, changes) => - { - var model = changes.Model(); - tsc.SetResult(model != null); - }); + channel.AddPostgresChangeHandler( + ListenType.Inserts, + (_, changes) => + { + var model = changes.Model(); + tsc.SetResult(model != null); + } + ); await channel.Subscribe(); - await _restClient!.Table().Insert(new Todo { UserId = 1, Details = "Client Models a response? ✅" }); + await _restClient! + .Table() + .Insert(new Todo { UserId = 1, Details = "Client Models a response? ✅" }); var check = await tsc.Task; Assert.IsTrue(check); } -} \ No newline at end of file +} diff --git a/RealtimeTests/Helpers.cs b/RealtimeTests/Helpers.cs index ffd05f8..3b16077 100644 --- a/RealtimeTests/Helpers.cs +++ b/RealtimeTests/Helpers.cs @@ -1,4 +1,5 @@ -using System.Diagnostics; +using System.Collections.Generic; +using System.Diagnostics; using Supabase.Realtime; using Supabase.Realtime.Socket; using Client = Supabase.Realtime.Client; @@ -7,7 +8,8 @@ namespace RealtimeTests; internal static class Helpers { - private const string ApiKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0"; + public const string ApiKeyAnon = "sb_publishable_ACJWlzQHlZjBrEguHvfOxg_3BJgxAaH"; + public const string ApiKey = "sb_secret_N7UND0UgjKTVK-Uodkm0Hg_xSvEMPvz"; private const string SocketEndpoint = "ws://127.0.0.1:54321/realtime/v1"; private const string RestEndpoint = "http://localhost:54321/rest/v1"; @@ -15,6 +17,21 @@ internal static class Helpers public static Supabase.Postgrest.Client RestClient() => new(RestEndpoint, new Supabase.Postgrest.ClientOptions()); public static Client SocketClient() + { + var client = new Client(SocketEndpoint, new ClientOptions + { + Parameters = new SocketOptionsParameters + { + ApiKey = ApiKeyAnon + } + }); + + client.AddDebugHandler((_, message, _) => Debug.WriteLine(message)); + + return client; + } + + public static Client PrivateSocketClient() { var client = new Client(SocketEndpoint, new ClientOptions { diff --git a/supabase/migrations/20250224164421_init.sql b/supabase/migrations/20250224164421_init.sql index 718e09b..08fc4a1 100644 --- a/supabase/migrations/20250224164421_init.sql +++ b/supabase/migrations/20250224164421_init.sql @@ -182,4 +182,35 @@ SELECT status from users WHERE username = name_param; $$ - LANGUAGE SQL IMMUTABLE; \ No newline at end of file + LANGUAGE SQL IMMUTABLE; + + +CREATE OR REPLACE FUNCTION public.send( + event text, + topic text, + private boolean +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN +BEGIN + -- Set the topic configuration +EXECUTE format('SET LOCAL realtime.topic TO %L', topic); + +-- Attempt to insert the message +INSERT INTO realtime.messages (payload, event, topic, private, extension) +VALUES (null, event, topic, private, 'broadcast'); +EXCEPTION + WHEN OTHERS THEN + -- Capture and notify the error + RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM; +END; +END; +$$; + +CREATE POLICY messages_insert_all +ON realtime.messages +FOR INSERT +TO PUBLIC +WITH CHECK (true);