diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs index 3b6d8dc..f47228f 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs @@ -30,6 +30,9 @@ public class RequestDetails public class Remote { + + private const int STREAMING_POST_TIMEOUT_MS = 30_000; + private readonly HttpClient httpClient; protected IPowerSyncBackendConnector connector; @@ -148,12 +151,22 @@ public async Task Get(string path, Dictionary? headers = n using var reader = new StreamReader(stream, Encoding.UTF8); string? line; + using var timeoutCts = new CancellationTokenSource(); + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken, timeoutCts.Token); + + linkedCts.Token.Register(() => + { + stream.Close(); + }); + while ((line = await reader.ReadLineAsync()) != null) { + timeoutCts.CancelAfter(TimeSpan.FromMilliseconds(STREAMING_POST_TIMEOUT_MS)); yield return ParseStreamingSyncLine(JObject.Parse(line)); } } + public static StreamingSyncLine? ParseStreamingSyncLine(JObject json) { // Determine the type based on available keys diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index d4cb85a..c2c56fb 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -1,5 +1,6 @@ namespace PowerSync.Common.Client.Sync.Stream; +using System.Net.Sockets; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -285,7 +286,14 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio } catch (Exception ex) { - logger.LogError("Caught exception in streaming sync: {message}", ex.Message); + var exMessage = ex.Message; + if (ex.InnerException != null && (ex.InnerException is ObjectDisposedException || ex.InnerException is SocketException)) + { + exMessage = "Stream closed or timed out -" + ex.InnerException.Message; + } + + + logger.LogError("Caught exception in streaming sync: {message}", exMessage); // Either: // - A network request failed with a failed connection or not OKAY response code.