diff --git a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java index da6408c8..029d30a2 100644 --- a/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java +++ b/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java @@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory; import java.nio.charset.Charset; +import java.time.Instant; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -118,11 +119,21 @@ else if (value.hasLastStreamPosition()) else if (value.hasLastAllStreamPosition()) { Shared.AllStreamPosition position = value.getLastAllStreamPosition(); consumer.onLastAllStreamPosition(position.getCommitPosition(), position.getPreparePosition()); - } else if (value.hasCaughtUp()) - consumer.onCaughtUp(); - else if (value.hasFellBehind()) - consumer.onFellBehind(); - else { + } else if (value.hasCaughtUp()) { + StreamsOuterClass.ReadResp.CaughtUp caughtUp = value.getCaughtUp(); + Instant timestamp = Instant.ofEpochSecond(caughtUp.getTimestamp().getSeconds(), caughtUp.getTimestamp().getNanos()); + Long streamRevision = caughtUp.hasStreamRevision() ? caughtUp.getStreamRevision() : null; + Position position = caughtUp.hasPosition() ? new Position(caughtUp.getPosition().getCommitPosition(), caughtUp.getPosition().getPreparePosition()) : null; + + consumer.onCaughtUp(timestamp, streamRevision, position); + } else if (value.hasFellBehind()) { + StreamsOuterClass.ReadResp.FellBehind fellBehind = value.getFellBehind(); + Instant timestamp = Instant.ofEpochSecond(fellBehind.getTimestamp().getSeconds(), fellBehind.getTimestamp().getNanos()); + Long streamRevision = fellBehind.hasStreamRevision() ? fellBehind.getStreamRevision() : null; + Position position = fellBehind.hasPosition() ? new Position(fellBehind.getPosition().getCommitPosition(), fellBehind.getPosition().getPreparePosition()) : null; + + consumer.onFellBehind(timestamp, streamRevision, position); + } else { logger.warn("received unknown message variant"); } diff --git a/src/main/java/io/kurrent/dbclient/ReadStreamConsumer.java b/src/main/java/io/kurrent/dbclient/ReadStreamConsumer.java index 492f0088..a4c30aad 100644 --- a/src/main/java/io/kurrent/dbclient/ReadStreamConsumer.java +++ b/src/main/java/io/kurrent/dbclient/ReadStreamConsumer.java @@ -2,6 +2,8 @@ import org.reactivestreams.Subscriber; +import java.time.Instant; + class ReadStreamConsumer implements StreamConsumer { private final Subscriber subscriber; @@ -41,10 +43,10 @@ public void onLastAllStreamPosition(long commit, long prepare) { } @Override - public void onCaughtUp() {} + public void onCaughtUp(Instant timestamp, Long streamRevision, Position position) {} @Override - public void onFellBehind() {} + public void onFellBehind(Instant timestamp, Long streamRevision, Position position) {} @Override public void onCancelled(Throwable exception) { diff --git a/src/main/java/io/kurrent/dbclient/StreamConsumer.java b/src/main/java/io/kurrent/dbclient/StreamConsumer.java index 36613e22..8bd4c8e1 100644 --- a/src/main/java/io/kurrent/dbclient/StreamConsumer.java +++ b/src/main/java/io/kurrent/dbclient/StreamConsumer.java @@ -1,5 +1,7 @@ package io.kurrent.dbclient; +import java.time.Instant; + public interface StreamConsumer { default void onSubscribe(org.reactivestreams.Subscription subscription) {} void onEvent(ResolvedEvent event); @@ -9,8 +11,8 @@ default void onSubscribe(org.reactivestreams.Subscription subscription) {} void onFirstStreamPosition(long position); void onLastStreamPosition(long position); void onLastAllStreamPosition(long commit, long prepare); - void onCaughtUp(); - void onFellBehind(); + void onCaughtUp(Instant timestamp, Long streamRevision, Position position); + void onFellBehind(Instant timestamp, Long streamRevision, Position position); void onCancelled(Throwable exception); void onComplete(); } diff --git a/src/main/java/io/kurrent/dbclient/SubscriptionListener.java b/src/main/java/io/kurrent/dbclient/SubscriptionListener.java index afd62fa1..5fc43989 100644 --- a/src/main/java/io/kurrent/dbclient/SubscriptionListener.java +++ b/src/main/java/io/kurrent/dbclient/SubscriptionListener.java @@ -1,5 +1,7 @@ package io.kurrent.dbclient; +import java.time.Instant; + /** * Listener used to handle catch-up subscription notifications raised throughout its lifecycle. */ @@ -28,12 +30,12 @@ public void onConfirmation(Subscription subscription) {} * Called when the subscription has reached the head of the stream. * @param subscription handle to the subscription. */ - public void onCaughtUp(Subscription subscription) {} + public void onCaughtUp(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {} /** * Called when the subscription has fallen behind, meaning it's no longer keeping up with the * stream's pace. * @param subscription handle to the subscription. */ - public void onFellBehind(Subscription subscription) {} + public void onFellBehind(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {} } diff --git a/src/main/java/io/kurrent/dbclient/SubscriptionStreamConsumer.java b/src/main/java/io/kurrent/dbclient/SubscriptionStreamConsumer.java index 4770cfef..80f70a12 100644 --- a/src/main/java/io/kurrent/dbclient/SubscriptionStreamConsumer.java +++ b/src/main/java/io/kurrent/dbclient/SubscriptionStreamConsumer.java @@ -1,6 +1,7 @@ package io.kurrent.dbclient; +import java.time.Instant; import java.util.concurrent.CompletableFuture; class SubscriptionStreamConsumer implements StreamConsumer { @@ -56,13 +57,13 @@ public void onLastStreamPosition(long position) {} public void onLastAllStreamPosition(long commit, long prepare) {} @Override - public void onCaughtUp() { - this.listener.onCaughtUp(this.subscription); + public void onCaughtUp(Instant timestamp, Long streamRevision, Position position) { + this.listener.onCaughtUp(this.subscription, timestamp, streamRevision, position); } @Override - public void onFellBehind() { - this.listener.onFellBehind(this.subscription); + public void onFellBehind(Instant timestamp, Long streamRevision, Position position) { + this.listener.onFellBehind(this.subscription, timestamp, streamRevision, position); } @Override diff --git a/src/main/proto/streams.proto b/src/main/proto/streams.proto index 0eb05295..4ff613a7 100644 --- a/src/main/proto/streams.proto +++ b/src/main/proto/streams.proto @@ -103,9 +103,37 @@ message ReadResp { FellBehind fell_behind = 9; } - message CaughtUp {} + // The $all or stream subscription has caught up and become live. + message CaughtUp { + // Current time in the server when the subscription caught up + google.protobuf.Timestamp timestamp = 1; + + // Checkpoint for resuming a stream subscription. + // For stream subscriptions it is populated unless the stream is empty. + // For $all subscriptions it is not populated. + optional int64 stream_revision = 2; + + // Checkpoint for resuming a $all subscription. + // For stream subscriptions it is not populated. + // For $all subscriptions it is populated unless the database is empty. + optional Position position = 3; + } + + // The $all or stream subscription has fallen back into catchup mode and is no longer live. + message FellBehind { + // Current time in the server when the subscription fell behind + google.protobuf.Timestamp timestamp = 1; + + // Checkpoint for resuming a stream subscription. + // For stream subscriptions it is populated unless the stream is empty. + // For $all subscriptions it is not populated. + optional int64 stream_revision = 2; - message FellBehind {} + // Checkpoint for resuming a $all subscription. + // For stream subscriptions it is not populated. + // For $all subscriptions it is populated unless the database is empty. + optional Position position = 3; + } message ReadEvent { RecordedEvent event = 1; @@ -132,7 +160,16 @@ message ReadResp { message Checkpoint { uint64 commit_position = 1; uint64 prepare_position = 2; + + // Current time in the server when the checkpoint was reached + google.protobuf.Timestamp timestamp = 3; } + + message Position { + uint64 commit_position = 1; + uint64 prepare_position = 2; + } + message StreamNotFound { event_store.client.StreamIdentifier stream_identifier = 1; } diff --git a/src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java b/src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java index c05e6f68..9798ee42 100644 --- a/src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java +++ b/src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -176,7 +177,7 @@ default void testCaughtUpMessageIsReceived() throws Throwable { Subscription subscription = client.subscribeToStream(streamName, new SubscriptionListener() { @Override - public void onCaughtUp(Subscription subscription) { + public void onCaughtUp(Subscription subscription, Instant timestamp, Long streamRevision, Position position) { caughtUp.countDown(); } }, SubscribeToStreamOptions.get().fromStart()).get();