Skip to content

Commit 22993f2

Browse files
committed
docs: Multi stream append (#340)
1 parent ff089fc commit 22993f2

File tree

1 file changed

+145
-1
lines changed

1 file changed

+145
-1
lines changed

docs/api/appending-events.md

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,4 +245,148 @@ AppendToStreamOptions options = AppendToStreamOptions.get()
245245

246246
client.appendToStream("some-stream", options, eventData)
247247
.get();
248-
```
248+
```
249+
250+
## Append to multiple streams
251+
252+
::: note
253+
This feature is only available in KurrentDB 25.1 and later.
254+
:::
255+
256+
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.
257+
258+
The `multiStreamAppend` method accepts a collection of `AppendStreamRequest` objects and returns a `MultiAppendWriteResult`. Each `AppendStreamRequest` contains:
259+
260+
- **streamName** - The name of the stream
261+
- **expectedState** - The expected state of the stream for optimistic concurrency control
262+
- **events** - A collection of `EventData` objects to append
263+
264+
The operation returns a `MultiAppendWriteResult` that contains either:
265+
- A list of `AppendStreamSuccess` objects if all streams were successfully updated
266+
- A list of `AppendStreamFailure` objects if any streams failed to update
267+
268+
::: warning
269+
Event metadata in `EventData` must be valid JSON objects. This requirement will
270+
be removed in a future major release.
271+
:::
272+
273+
Here's a basic example of appending events to multiple streams:
274+
275+
```java
276+
JsonMapper mapper = new JsonMapper();
277+
278+
Map<String, Object> metadata = new HashMap<>();
279+
metadata.put("timestamp", Instant.now().toString());
280+
metadata.put("source", "OrderProcessingSystem");
281+
metadata.put("version", 1.0);
282+
283+
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
284+
285+
EventData orderEvent = EventData
286+
.builderAsJson("OrderCreated", mapper.writeValueAsBytes(new OrderCreated("12345", 99.99)))
287+
.metadataAsBytes(metadataBytes)
288+
.build();
289+
290+
EventData inventoryEvent = EventData
291+
.builderAsJson("ProductPurchased", mapper.writeValueAsBytes(new ProductPurchased("ABC123", 2, 19.99)))
292+
.metadataAsBytes(metadataBytes)
293+
.build();
294+
295+
List<AppendStreamRequest> requests = Arrays.asList(
296+
new AppendStreamRequest(
297+
"order-stream-1",
298+
Collections.singletonList(orderEvent).iterator(),
299+
StreamState.any()
300+
),
301+
new AppendStreamRequest(
302+
"product-stream-1",
303+
Collections.singletonList(inventoryEvent).iterator(),
304+
StreamState.any()
305+
)
306+
);
307+
308+
MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get();
309+
310+
if (result.getSuccesses().isPresent())
311+
result.getSuccesses().get().forEach(success -> {
312+
System.out.println(success.getStreamName() + " updated at " + success.getPosition());
313+
});
314+
```
315+
316+
If the operation doesn't succeed, you can handle the failures as follows:
317+
318+
```java
319+
if (result.getFailures().isPresent()) {
320+
MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor();
321+
result.getFailures().get().forEach(failure -> {
322+
failure.visit(visitor);
323+
324+
if (visitor.wasWrongExpectedRevisionVisited()) {
325+
System.out.println("Wrong revision for stream: " + failure.getStreamName());
326+
} else if (visitor.wasStreamDeletedVisited()) {
327+
System.out.println("Stream deleted: " + failure.getStreamName());
328+
} else if (visitor.wasAccessDenied()) {
329+
System.out.println("Access denied: " + failure.getStreamName());
330+
} else if (visitor.wasTransactionMaxSizeExceeded()) {
331+
System.out.println("Transaction too large: " + failure.getStreamName());
332+
} else {
333+
System.out.println("Unknown error: " + failure.getStreamName());
334+
}
335+
});
336+
}
337+
```
338+
339+
::: details Click here to see the implementaton of `MultiAppendErrorVisitor`
340+
341+
```java
342+
class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor {
343+
private boolean wrongExpectedRevisionVisited = false;
344+
private boolean streamDeletedVisited = false;
345+
private boolean transactionMaxSizeExceeded = false;
346+
private boolean accessDenied = false;
347+
private long actualRevision = -1;
348+
349+
@Override
350+
public void onAccessDenied(ErrorDetails.AccessDenied detail) {
351+
this.accessDenied = true;
352+
}
353+
354+
@Override
355+
public void onWrongExpectedRevision(long streamRevision) {
356+
this.wrongExpectedRevisionVisited = true;
357+
this.actualRevision = streamRevision;
358+
}
359+
360+
@Override
361+
public void onStreamDeleted() {
362+
this.streamDeletedVisited = true;
363+
}
364+
365+
@Override
366+
public void onTransactionMaxSizeExceeded(int maxSize) {
367+
this.transactionMaxSizeExceeded = true;
368+
}
369+
370+
public boolean wasWrongExpectedRevisionVisited() {
371+
return wrongExpectedRevisionVisited;
372+
}
373+
374+
public boolean wasStreamDeletedVisited() {
375+
return streamDeletedVisited;
376+
}
377+
378+
public boolean wasAccessDenied() {
379+
return accessDenied;
380+
}
381+
382+
public boolean wasTransactionMaxSizeExceeded() {
383+
return transactionMaxSizeExceeded;
384+
}
385+
386+
public long getActualRevision() {
387+
return actualRevision;
388+
}
389+
}
390+
```
391+
392+
:::

0 commit comments

Comments
 (0)