diff --git a/Realtime/Broadcast/BroadcastOptions.cs b/Realtime/Broadcast/BroadcastOptions.cs
index 1114de4..53fc358 100644
--- a/Realtime/Broadcast/BroadcastOptions.cs
+++ b/Realtime/Broadcast/BroadcastOptions.cs
@@ -3,7 +3,7 @@
namespace Supabase.Realtime.Broadcast;
///
-/// Options
+/// Options
///
public class BroadcastOptions
{
@@ -19,6 +19,32 @@ public class BroadcastOptions
[JsonProperty("ack")]
public bool BroadcastAck { get; set; } = false;
+ ///
+ /// replay option instructs server to replay broadcast messages
+ ///
+ [JsonProperty("replay", NullValueHandling = NullValueHandling.Ignore)]
+ public ReplayOptions? Replay { get; set; }
+
+ ///
+ /// Options for replaying events in broadcast configurations.
+ ///
+ public class ReplayOptions
+ {
+ ///
+ /// Specifies the starting point in time, in milliseconds since the Unix epoch,
+ /// from which events should be replayed in the broadcast configuration.
+ ///
+ [JsonProperty("since")]
+ public long Since { get; set; }
+
+ ///
+ /// Specifies the maximum number of events to be replayed during broadcast.
+ /// When set to null, there is no limit to the number of events replayed.
+ ///
+ [JsonProperty("limit", NullValueHandling = NullValueHandling.Ignore)]
+ public int? Limit { get; set; }
+ }
+
///
/// Initializes broadcast options
///
@@ -29,4 +55,4 @@ public BroadcastOptions(bool broadcastSelf = false, bool broadcastAck = false)
BroadcastSelf = broadcastSelf;
BroadcastAck = broadcastAck;
}
-}
\ No newline at end of file
+}
diff --git a/Realtime/Channel/ChannelOptions.cs b/Realtime/Channel/ChannelOptions.cs
index 48868c1..f621bdb 100644
--- a/Realtime/Channel/ChannelOptions.cs
+++ b/Realtime/Channel/ChannelOptions.cs
@@ -1,12 +1,16 @@
-using Newtonsoft.Json;
-using System;
+using System;
using System.Collections.Generic;
+using Newtonsoft.Json;
namespace Supabase.Realtime.Channel;
///
-/// Channel Options
+/// Represents configuration options for a Realtime channel.
///
+///
+/// This class contains all the necessary configuration options for establishing and maintaining
+/// a Realtime channel connection, including authentication, parameters, and serialization settings.
+///
public class ChannelOptions
{
///
@@ -29,16 +33,48 @@ public class ChannelOptions
///
public JsonSerializerSettings SerializerSettings { get; }
+ ///
+ /// Gets a value indicating whether the channel is private.
+ ///
+ ///
+ /// true if the channel is private; otherwise, false.
+ ///
+ public bool IsPrivate { get; } = false;
+
+ ///
+ /// The Channel Options (typically only called from within the )
+ ///
+ /// The client configuration options.
+ /// A function that returns the current access token.
+ /// The JSON serializer settings to be used for message serialization.
+ public ChannelOptions(
+ ClientOptions clientOptions,
+ Func retrieveAccessToken,
+ JsonSerializerSettings serializerSettings
+ )
+ {
+ ClientOptions = clientOptions;
+ SerializerSettings = serializerSettings;
+ RetrieveAccessToken = retrieveAccessToken;
+ }
+
///
/// The Channel Options (typically only called from within the )
///
- ///
- ///
- ///
- public ChannelOptions(ClientOptions clientOptions, Func retrieveAccessToken, JsonSerializerSettings serializerSettings)
+ /// The client configuration options.
+ /// A function that returns the current access token.
+ /// The JSON serializer settings to be used for message serialization.
+ /// A value indicating whether the channel is private.
+ public ChannelOptions(
+ ClientOptions clientOptions,
+ Func retrieveAccessToken,
+ JsonSerializerSettings serializerSettings,
+ bool isPrivate
+ )
{
ClientOptions = clientOptions;
SerializerSettings = serializerSettings;
RetrieveAccessToken = retrieveAccessToken;
+ IsPrivate = isPrivate;
}
-}
\ No newline at end of file
+}
diff --git a/Realtime/Channel/JoinPush.cs b/Realtime/Channel/JoinPush.cs
index d24d8a3..7401790 100644
--- a/Realtime/Channel/JoinPush.cs
+++ b/Realtime/Channel/JoinPush.cs
@@ -11,13 +11,14 @@ internal class JoinPush
[JsonProperty("config")]
public JoinPushConfig Config { get; private set; }
- public JoinPush(BroadcastOptions? broadcastOptions = null, PresenceOptions? presenceOptions = null, List? postgresChangesOptions = null)
+ public JoinPush(BroadcastOptions? broadcastOptions = null, PresenceOptions? presenceOptions = null, List? postgresChangesOptions = null, bool? isPrivate = null)
{
Config = new JoinPushConfig
{
Broadcast = broadcastOptions,
Presence = presenceOptions,
- PostgresChanges = postgresChangesOptions ?? new List()
+ PostgresChanges = postgresChangesOptions ?? new List(),
+ IsPrivate = isPrivate
};
}
@@ -31,5 +32,8 @@ internal class JoinPushConfig
[JsonProperty("postgres_changes", NullValueHandling = NullValueHandling.Ignore)]
public List PostgresChanges { get; set; } = new List { };
+
+ [JsonProperty("private", NullValueHandling = NullValueHandling.Ignore)]
+ public bool? IsPrivate { get; set; }
}
}
\ No newline at end of file
diff --git a/Realtime/Client.cs b/Realtime/Client.cs
index 1e54549..ca9c6ed 100644
--- a/Realtime/Client.cs
+++ b/Realtime/Client.cs
@@ -360,6 +360,29 @@ public RealtimeChannel Channel(string channelName)
return subscription;
}
+ ///
+ /// Adds a RealtimeChannel subscription with custom options - if a subscription exists with the same signature, the existing subscription will be returned.
+ ///
+ /// The name of the Channel to join
+ /// Custom channel options for configuring the subscription
+ /// A RealtimeChannel instance representing the subscription
+ /// Thrown when Socket is null, indicating Connect() was not called
+ public RealtimeChannel Channel(string channelName, ChannelOptions options)
+ {
+ var topic = $"realtime:{channelName}";
+
+ if (_subscriptions.TryGetValue(topic, out var channel))
+ return channel;
+
+ if (Socket == null)
+ throw new Exception("Socket must exist, was `Connect` called?");
+
+ var subs = new RealtimeChannel(Socket!, topic, options);
+ _subscriptions.Add(topic, subs);
+
+ return subs;
+ }
+
///
/// Adds a RealtimeChannel subscription - if a subscription exists with the same signature, the existing subscription will be returned.
///
diff --git a/Realtime/Interfaces/IRealtimeChannel.cs b/Realtime/Interfaces/IRealtimeChannel.cs
index f6d5513..896e70a 100644
--- a/Realtime/Interfaces/IRealtimeChannel.cs
+++ b/Realtime/Interfaces/IRealtimeChannel.cs
@@ -215,6 +215,16 @@ public interface IRealtimeChannel
RealtimeBroadcast Register(bool broadcastSelf = false,
bool broadcastAck = false) where TBroadcastResponse : BaseBroadcast;
+
+ ///
+ /// Registers the channel with the specified configuration options.
+ ///
+ /// The configuration options for the broadcast registration.
+ /// The type of the broadcast response, which must inherit from .
+ /// A instance for managing the broadcast.
+ public RealtimeBroadcast Register(BroadcastOptions options)
+ where TBroadcastResponse : BaseBroadcast;
+
///
/// Register presence options, must be called to use , and prior to
///
diff --git a/Realtime/Interfaces/IRealtimeClient.cs b/Realtime/Interfaces/IRealtimeClient.cs
index a8f9cf7..c78cde5 100644
--- a/Realtime/Interfaces/IRealtimeClient.cs
+++ b/Realtime/Interfaces/IRealtimeClient.cs
@@ -5,6 +5,7 @@
using System.Net.WebSockets;
using System.Threading.Tasks;
using Supabase.Core.Interfaces;
+using Supabase.Realtime.Channel;
using Supabase.Realtime.Exceptions;
using static Supabase.Realtime.Constants;
@@ -85,6 +86,15 @@ public interface IRealtimeClient: IGettableHeaders
///
TChannel Channel(string channelName);
+ ///
+ /// Adds a RealtimeChannel subscription with custom options - if a subscription exists with the same signature, the existing subscription will be returned.
+ ///
+ /// The name of the Channel to join
+ /// Custom channel options for configuring the subscription
+ /// A RealtimeChannel instance representing the subscription
+ /// Thrown when Socket is null, indicating Connect() was not called
+ TChannel Channel(string channelName, ChannelOptions options);
+
///
/// Shorthand initialization of a channel with postgres_changes options already set.
///
diff --git a/Realtime/Models/BaseBroadcast.cs b/Realtime/Models/BaseBroadcast.cs
index 62b0013..ed24cad 100644
--- a/Realtime/Models/BaseBroadcast.cs
+++ b/Realtime/Models/BaseBroadcast.cs
@@ -32,4 +32,20 @@ public class BaseBroadcast
///
[JsonProperty("payload")]
public Dictionary? Payload { get; set; }
+
+ ///
+ /// Additional metadata associated with a broadcast event.
+ ///
+ [JsonProperty("meta", NullValueHandling = NullValueHandling.Ignore)]
+ public Meta? Meta { get; set; }
+}
+
+
+public class Meta
+{
+ [JsonProperty("id")]
+ public string Id { get; set; }
+
+ [JsonProperty("replayed")]
+ public bool Replayed { get; set; }
}
\ No newline at end of file
diff --git a/Realtime/RealtimeChannel.cs b/Realtime/RealtimeChannel.cs
index c38ea8f..b981667 100644
--- a/Realtime/RealtimeChannel.cs
+++ b/Realtime/RealtimeChannel.cs
@@ -218,6 +218,33 @@ public RealtimeBroadcast Register(bool b
return instance;
}
+ ///
+ /// Registers the channel for broadcast with the specified options.
+ ///
+ /// The type of the broadcast response, which must inherit from .
+ /// The broadcast options to configure the channel's broadcast behavior.
+ /// Returns an instance of initialized with the specified broadcast options.
+ /// Thrown if the method is called multiple times for the same channel.
+ public RealtimeBroadcast Register(BroadcastOptions options) where TBroadcastResponse : BaseBroadcast
+ {
+ if (_broadcast != null)
+ throw new InvalidOperationException(
+ "Register can only be called with broadcast options for a channel once.");
+
+ if (!Options.IsPrivate && options.Replay != null)
+ throw new InvalidOperationException($"tried to use replay on public channel '{Topic}'. It must be a private channel.");
+
+ BroadcastOptions = options;
+
+ var instance =
+ new RealtimeBroadcast(this, BroadcastOptions, Options.SerializerSettings);
+ _broadcast = instance;
+
+ BroadcastHandler = (_, response) => _broadcast.TriggerReceived(response);
+
+ return instance;
+ }
+
///
/// Registers a instance - allowing presence responses to be parsed and state to be tracked.
///
@@ -592,7 +619,7 @@ internal void Enqueue(Push push)
///
///
private Push GenerateJoinPush() => new(Socket, this, ChannelEventJoin,
- payload: new JoinPush(BroadcastOptions, PresenceOptions, PostgresChangesOptions));
+ payload: new JoinPush(BroadcastOptions, PresenceOptions, PostgresChangesOptions, Options.IsPrivate));
///
/// Generates an auth push.
diff --git a/RealtimeTests/ChannelBroadcastTests.cs b/RealtimeTests/ChannelBroadcastTests.cs
index aa8b3b6..8535cc2 100644
--- a/RealtimeTests/ChannelBroadcastTests.cs
+++ b/RealtimeTests/ChannelBroadcastTests.cs
@@ -1,10 +1,13 @@
using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
-using Supabase.Postgrest.Interfaces;
using RealtimeTests.Models;
+using Supabase.Postgrest.Interfaces;
using Supabase.Realtime;
+using Supabase.Realtime.Broadcast;
+using Supabase.Realtime.Channel;
using Supabase.Realtime.Interfaces;
using Supabase.Realtime.Models;
using Supabase.Realtime.PostgresChanges;
@@ -14,7 +17,8 @@ namespace RealtimeTests;
public class BroadcastExample : BaseBroadcast
{
- [JsonProperty("userId")] public string? UserId { get; set; }
+ [JsonProperty("userId")]
+ public string? UserId { get; set; }
}
[TestClass]
@@ -48,23 +52,27 @@ public async Task ClientCanListenForBroadcast()
var channel1 = _socketClient!.Channel("online-users");
var broadcast1 = channel1.Register(true, true);
- broadcast1.AddBroadcastEventHandler((_, _) =>
- {
- var broadcast = broadcast1.Current();
- if (broadcast?.UserId != guid1 && broadcast?.Event == "user")
- tsc.TrySetResult(true);
- });
+ broadcast1.AddBroadcastEventHandler(
+ (_, _) =>
+ {
+ var broadcast = broadcast1.Current();
+ if (broadcast?.UserId != guid1 && broadcast?.Event == "user")
+ tsc.TrySetResult(true);
+ }
+ );
var client2 = Helpers.SocketClient();
await client2.ConnectAsync();
var channel2 = client2.Channel("online-users");
var broadcast2 = channel2.Register(true, true);
- broadcast2.AddBroadcastEventHandler((_, _) =>
- {
- var broadcast = broadcast2.Current();
- if (broadcast?.UserId != guid2 && broadcast?.Event == "user")
- tsc2.TrySetResult(true);
- });
+ broadcast2.AddBroadcastEventHandler(
+ (_, _) =>
+ {
+ var broadcast = broadcast2.Current();
+ if (broadcast?.UserId != guid2 && broadcast?.Event == "user")
+ tsc2.TrySetResult(true);
+ }
+ );
await channel1.Subscribe();
await channel2.Subscribe();
@@ -75,6 +83,195 @@ public async Task ClientCanListenForBroadcast()
await Task.WhenAll(new[] { tsc.Task, tsc2.Task });
}
+ [TestMethod("Channel: Can listen for broadcast")]
+ public async Task ClientCanListenForBroadcastPrivate()
+ {
+ var tsc = new TaskCompletionSource();
+ var tsc2 = new TaskCompletionSource();
+
+ var guid1 = Guid.NewGuid().ToString();
+ var guid2 = Guid.NewGuid().ToString();
+
+ var client1 = Helpers.PrivateSocketClient();
+ await client1.ConnectAsync();
+ var options1 = new ChannelOptions(
+ client1.Options,
+ () => Helpers.ApiKey,
+ new JsonSerializerSettings(),
+ true
+ );
+ var channel1 = client1.Channel("online-users", options1);
+ var broadcast1 = channel1.Register(true, true);
+ broadcast1.AddBroadcastEventHandler(
+ (_, _) =>
+ {
+ var broadcast = broadcast1.Current();
+ if (broadcast?.UserId != guid1 && broadcast?.Event == "user")
+ tsc.TrySetResult(true);
+ }
+ );
+
+ var client2 = Helpers.PrivateSocketClient();
+ await client2.ConnectAsync();
+ var options2 = new ChannelOptions(
+ _socketClient!.Options,
+ () => Helpers.ApiKey,
+ new JsonSerializerSettings(),
+ true
+ );
+ var channel2 = client2.Channel("online-users", options2);
+ var broadcast2 = channel2.Register(true, true);
+ broadcast2.AddBroadcastEventHandler(
+ (_, _) =>
+ {
+ var broadcast = broadcast2.Current();
+ if (broadcast?.UserId != guid2 && broadcast?.Event == "user")
+ tsc2.TrySetResult(true);
+ }
+ );
+
+ await channel1.Subscribe();
+ await channel2.Subscribe();
+
+ await broadcast1.Send("user", new BroadcastExample { UserId = guid1 });
+ await broadcast2.Send("user", new BroadcastExample { UserId = guid2 });
+
+ await Task.WhenAll(new[] { tsc.Task, tsc2.Task });
+ }
+
+ [TestMethod("Channel: Cannot listen for private broadcast")]
+ public async Task ClientCannotListenForBroadcastPrivate()
+ {
+ var tsc = new TaskCompletionSource();
+ var tsc2 = new TaskCompletionSource();
+
+ var guid1 = Guid.NewGuid().ToString();
+ var guid2 = Guid.NewGuid().ToString();
+
+ var channel1 = _socketClient!.Channel("online-users");
+ var broadcast1 = channel1.Register(true, true);
+ broadcast1.AddBroadcastEventHandler(
+ (_, broadcast) =>
+ {
+ if (broadcast is not BroadcastExample broad1)
+ {
+ tsc.TrySetResult(true);
+ return;
+ }
+
+ if (broad1.UserId == guid1 && broad1.Event == "user1")
+ tsc.TrySetResult(true);
+ }
+ );
+
+ var client2 = Helpers.PrivateSocketClient();
+ await client2.ConnectAsync();
+ var options = new ChannelOptions(
+ client2.Options,
+ () => Helpers.ApiKey,
+ new JsonSerializerSettings(),
+ true
+ );
+ var channel2 = client2.Channel("online-users", options);
+ var broadcast2 = channel2.Register(true, true);
+ broadcast2.AddBroadcastEventHandler(
+ (sender, broad) =>
+ {
+ if (broad is not BroadcastExample broadcast)
+ {
+ tsc2.TrySetResult(false);
+ return;
+ }
+
+ if (broadcast.UserId == guid2 && broadcast.Event == "user2")
+ tsc2.TrySetResult(true);
+ }
+ );
+
+ await channel1.Subscribe();
+ await channel2.Subscribe();
+
+ await broadcast1.Send("user1", new BroadcastExample { UserId = guid1 });
+ await broadcast2.Send("user2", new BroadcastExample { UserId = guid2 });
+
+ await Task.WhenAll(new[] { tsc.Task, tsc2.Task });
+ }
+
+ [TestMethod("Channel: Can listen history for private broadcast")]
+ public async Task ClientCanListenHistoryForBroadcastPrivate()
+ {
+ var send = new Dictionary
+ {
+ { "event", "user" },
+ { "topic", "online-users" },
+ { "private", true }
+ };
+ await _restClient!.Rpc("send", send);
+
+ var tsc = new TaskCompletionSource();
+
+ var client1 = Helpers.PrivateSocketClient();
+ await client1.ConnectAsync();
+ var options1 = new ChannelOptions(
+ client1.Options,
+ () => null,
+ new JsonSerializerSettings(),
+ true
+ );
+ var broadcastOptions = new BroadcastOptions
+ {
+ BroadcastAck = true,
+ BroadcastSelf = true,
+ Replay = new BroadcastOptions.ReplayOptions
+ {
+ Limit = 10,
+ Since = DateTimeOffset.UtcNow.AddDays(-3).ToUnixTimeMilliseconds()
+ }
+ };
+ var channel1 = client1.Channel("online-users", options1);
+ var broadcast1 = channel1.Register(broadcastOptions);
+ broadcast1.AddBroadcastEventHandler(
+ (_, _) =>
+ {
+
+ var broadcast = broadcast1.Current();
+ if (broadcast is { Event: "user", Meta.Replayed: true })
+ tsc.TrySetResult(true);
+ }
+ );
+
+ await channel1.Subscribe();
+
+ await Task.WhenAll(tsc.Task);
+ }
+
+ [TestMethod("Channel: Cannot listen broadcast replay on public channel")]
+ public async Task ClientCannotListenForBroadcastReplay()
+ {
+ var client1 = Helpers.PrivateSocketClient();
+ await client1.ConnectAsync();
+ var options1 = new ChannelOptions(
+ client1.Options,
+ () => null,
+ new JsonSerializerSettings(),
+ false
+ );
+ var broadcastOptions = new BroadcastOptions
+ {
+ BroadcastAck = true,
+ BroadcastSelf = true,
+ Replay = new BroadcastOptions.ReplayOptions
+ {
+ Limit = 10,
+ Since = DateTimeOffset.UtcNow.AddDays(-3).ToUnixTimeMilliseconds()
+ }
+ };
+ var channel1 = client1.Channel("online-users", options1);
+ var action = () => channel1.Register(broadcastOptions);
+
+ Assert.ThrowsException(action);
+ }
+
[TestMethod("Channel: Payload returns a modeled response (if possible)")]
public async Task ChannelPayloadReturnsModel()
{
@@ -82,17 +279,22 @@ public async Task ChannelPayloadReturnsModel()
var channel = _socketClient!.Channel("example");
channel.Register(new PostgresChangesOptions("public", "*"));
- channel.AddPostgresChangeHandler(ListenType.Inserts, (_, changes) =>
- {
- var model = changes.Model();
- tsc.SetResult(model != null);
- });
+ channel.AddPostgresChangeHandler(
+ ListenType.Inserts,
+ (_, changes) =>
+ {
+ var model = changes.Model();
+ tsc.SetResult(model != null);
+ }
+ );
await channel.Subscribe();
- await _restClient!.Table().Insert(new Todo { UserId = 1, Details = "Client Models a response? ✅" });
+ await _restClient!
+ .Table()
+ .Insert(new Todo { UserId = 1, Details = "Client Models a response? ✅" });
var check = await tsc.Task;
Assert.IsTrue(check);
}
-}
\ No newline at end of file
+}
diff --git a/RealtimeTests/Helpers.cs b/RealtimeTests/Helpers.cs
index ffd05f8..3b16077 100644
--- a/RealtimeTests/Helpers.cs
+++ b/RealtimeTests/Helpers.cs
@@ -1,4 +1,5 @@
-using System.Diagnostics;
+using System.Collections.Generic;
+using System.Diagnostics;
using Supabase.Realtime;
using Supabase.Realtime.Socket;
using Client = Supabase.Realtime.Client;
@@ -7,7 +8,8 @@ namespace RealtimeTests;
internal static class Helpers
{
- private const string ApiKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0";
+ public const string ApiKeyAnon = "sb_publishable_ACJWlzQHlZjBrEguHvfOxg_3BJgxAaH";
+ public const string ApiKey = "sb_secret_N7UND0UgjKTVK-Uodkm0Hg_xSvEMPvz";
private const string SocketEndpoint = "ws://127.0.0.1:54321/realtime/v1";
private const string RestEndpoint = "http://localhost:54321/rest/v1";
@@ -15,6 +17,21 @@ internal static class Helpers
public static Supabase.Postgrest.Client RestClient() => new(RestEndpoint, new Supabase.Postgrest.ClientOptions());
public static Client SocketClient()
+ {
+ var client = new Client(SocketEndpoint, new ClientOptions
+ {
+ Parameters = new SocketOptionsParameters
+ {
+ ApiKey = ApiKeyAnon
+ }
+ });
+
+ client.AddDebugHandler((_, message, _) => Debug.WriteLine(message));
+
+ return client;
+ }
+
+ public static Client PrivateSocketClient()
{
var client = new Client(SocketEndpoint, new ClientOptions
{
diff --git a/supabase/migrations/20250224164421_init.sql b/supabase/migrations/20250224164421_init.sql
index 718e09b..08fc4a1 100644
--- a/supabase/migrations/20250224164421_init.sql
+++ b/supabase/migrations/20250224164421_init.sql
@@ -182,4 +182,35 @@ SELECT status
from users
WHERE username = name_param;
$$
- LANGUAGE SQL IMMUTABLE;
\ No newline at end of file
+ LANGUAGE SQL IMMUTABLE;
+
+
+CREATE OR REPLACE FUNCTION public.send(
+ event text,
+ topic text,
+ private boolean
+)
+RETURNS void
+LANGUAGE plpgsql
+AS $$
+BEGIN
+BEGIN
+ -- Set the topic configuration
+EXECUTE format('SET LOCAL realtime.topic TO %L', topic);
+
+-- Attempt to insert the message
+INSERT INTO realtime.messages (payload, event, topic, private, extension)
+VALUES (null, event, topic, private, 'broadcast');
+EXCEPTION
+ WHEN OTHERS THEN
+ -- Capture and notify the error
+ RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;
+END;
+END;
+$$;
+
+CREATE POLICY messages_insert_all
+ON realtime.messages
+FOR INSERT
+TO PUBLIC
+WITH CHECK (true);