Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -118,11 +119,21 @@ else if (value.hasLastStreamPosition())
else if (value.hasLastAllStreamPosition()) {
Shared.AllStreamPosition position = value.getLastAllStreamPosition();
consumer.onLastAllStreamPosition(position.getCommitPosition(), position.getPreparePosition());
} else if (value.hasCaughtUp())
consumer.onCaughtUp();
else if (value.hasFellBehind())
consumer.onFellBehind();
else {
} else if (value.hasCaughtUp()) {
StreamsOuterClass.ReadResp.CaughtUp caughtUp = value.getCaughtUp();
Instant timestamp = Instant.ofEpochSecond(caughtUp.getTimestamp().getSeconds(), caughtUp.getTimestamp().getNanos());
Long streamRevision = caughtUp.hasStreamRevision() ? caughtUp.getStreamRevision() : null;
Position position = caughtUp.hasPosition() ? new Position(caughtUp.getPosition().getCommitPosition(), caughtUp.getPosition().getPreparePosition()) : null;

consumer.onCaughtUp(timestamp, streamRevision, position);
} else if (value.hasFellBehind()) {
StreamsOuterClass.ReadResp.FellBehind fellBehind = value.getFellBehind();
Instant timestamp = Instant.ofEpochSecond(fellBehind.getTimestamp().getSeconds(), fellBehind.getTimestamp().getNanos());
Long streamRevision = fellBehind.hasStreamRevision() ? fellBehind.getStreamRevision() : null;
Position position = fellBehind.hasPosition() ? new Position(fellBehind.getPosition().getCommitPosition(), fellBehind.getPosition().getPreparePosition()) : null;

consumer.onFellBehind(timestamp, streamRevision, position);
} else {
logger.warn("received unknown message variant");
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/kurrent/dbclient/ReadStreamConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.reactivestreams.Subscriber;

import java.time.Instant;

class ReadStreamConsumer implements StreamConsumer {
private final Subscriber<? super ReadMessage> subscriber;

Expand Down Expand Up @@ -41,10 +43,10 @@ public void onLastAllStreamPosition(long commit, long prepare) {
}

@Override
public void onCaughtUp() {}
public void onCaughtUp(Instant timestamp, Long streamRevision, Position position) {}

@Override
public void onFellBehind() {}
public void onFellBehind(Instant timestamp, Long streamRevision, Position position) {}

@Override
public void onCancelled(Throwable exception) {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/kurrent/dbclient/StreamConsumer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kurrent.dbclient;

import java.time.Instant;

public interface StreamConsumer {
default void onSubscribe(org.reactivestreams.Subscription subscription) {}
void onEvent(ResolvedEvent event);
Expand All @@ -9,8 +11,8 @@ default void onSubscribe(org.reactivestreams.Subscription subscription) {}
void onFirstStreamPosition(long position);
void onLastStreamPosition(long position);
void onLastAllStreamPosition(long commit, long prepare);
void onCaughtUp();
void onFellBehind();
void onCaughtUp(Instant timestamp, Long streamRevision, Position position);
void onFellBehind(Instant timestamp, Long streamRevision, Position position);
void onCancelled(Throwable exception);
void onComplete();
}
6 changes: 4 additions & 2 deletions src/main/java/io/kurrent/dbclient/SubscriptionListener.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kurrent.dbclient;

import java.time.Instant;

/**
* Listener used to handle catch-up subscription notifications raised throughout its lifecycle.
*/
Expand Down Expand Up @@ -28,12 +30,12 @@ public void onConfirmation(Subscription subscription) {}
* Called when the subscription has reached the head of the stream.
* @param subscription handle to the subscription.
*/
public void onCaughtUp(Subscription subscription) {}
public void onCaughtUp(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {}

/**
* Called when the subscription has fallen behind, meaning it's no longer keeping up with the
* stream's pace.
* @param subscription handle to the subscription.
*/
public void onFellBehind(Subscription subscription) {}
public void onFellBehind(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kurrent.dbclient;


import java.time.Instant;
import java.util.concurrent.CompletableFuture;

class SubscriptionStreamConsumer implements StreamConsumer {
Expand Down Expand Up @@ -56,13 +57,13 @@ public void onLastStreamPosition(long position) {}
public void onLastAllStreamPosition(long commit, long prepare) {}

@Override
public void onCaughtUp() {
this.listener.onCaughtUp(this.subscription);
public void onCaughtUp(Instant timestamp, Long streamRevision, Position position) {
this.listener.onCaughtUp(this.subscription, timestamp, streamRevision, position);
}

@Override
public void onFellBehind() {
this.listener.onFellBehind(this.subscription);
public void onFellBehind(Instant timestamp, Long streamRevision, Position position) {
this.listener.onFellBehind(this.subscription, timestamp, streamRevision, position);
}

@Override
Expand Down
41 changes: 39 additions & 2 deletions src/main/proto/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,37 @@ message ReadResp {
FellBehind fell_behind = 9;
}

message CaughtUp {}
// The $all or stream subscription has caught up and become live.
message CaughtUp {
// Current time in the server when the subscription caught up
google.protobuf.Timestamp timestamp = 1;

// Checkpoint for resuming a stream subscription.
// For stream subscriptions it is populated unless the stream is empty.
// For $all subscriptions it is not populated.
optional int64 stream_revision = 2;

// Checkpoint for resuming a $all subscription.
// For stream subscriptions it is not populated.
// For $all subscriptions it is populated unless the database is empty.
optional Position position = 3;
}

// The $all or stream subscription has fallen back into catchup mode and is no longer live.
message FellBehind {
// Current time in the server when the subscription fell behind
google.protobuf.Timestamp timestamp = 1;

// Checkpoint for resuming a stream subscription.
// For stream subscriptions it is populated unless the stream is empty.
// For $all subscriptions it is not populated.
optional int64 stream_revision = 2;

message FellBehind {}
// Checkpoint for resuming a $all subscription.
// For stream subscriptions it is not populated.
// For $all subscriptions it is populated unless the database is empty.
optional Position position = 3;
}

message ReadEvent {
RecordedEvent event = 1;
Expand All @@ -132,7 +160,16 @@ message ReadResp {
message Checkpoint {
uint64 commit_position = 1;
uint64 prepare_position = 2;

// Current time in the server when the checkpoint was reached
google.protobuf.Timestamp timestamp = 3;
}

message Position {
uint64 commit_position = 1;
uint64 prepare_position = 2;
}

message StreamNotFound {
event_store.client.StreamIdentifier stream_identifier = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -176,7 +177,7 @@ default void testCaughtUpMessageIsReceived() throws Throwable {

Subscription subscription = client.subscribeToStream(streamName, new SubscriptionListener() {
@Override
public void onCaughtUp(Subscription subscription) {
public void onCaughtUp(Subscription subscription, Instant timestamp, Long streamRevision, Position position) {
caughtUp.countDown();
}
}, SubscribeToStreamOptions.get().fromStart()).get();
Expand Down
Loading