Skip to content

Commit de70e48

Browse files
dodu232mp911de
authored andcommitted
Add stream-key auto-conversion for ByteRecord.
Add automatic type conversion logic in StreamRecords.ofBytes() method to handle different stream key types safely. Signed-off-by: Seo Bo Gyeong <[email protected]> Closes #3204
1 parent f047b4d commit de70e48

File tree

2 files changed

+88
-2
lines changed

2 files changed

+88
-2
lines changed

src/main/java/org/springframework/data/redis/connection/stream/StreamRecords.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* {@link StreamRecords} provides utilities to create specific {@link Record} instances.
3131
*
3232
* @author Christoph Strobl
33+
* @author Seo Bo Gyeong
3334
* @since 2.2
3435
*/
3536
public class StreamRecords {
@@ -197,8 +198,24 @@ public <V> ObjectRecord<S, V> ofObject(V value) {
197198
*/
198199
public ByteRecord ofBytes(Map<byte[], byte[]> value) {
199200

200-
// todo auto conversion of known values
201-
return new ByteMapBackedRecord((byte[]) stream, id, value);
201+
byte[] streamKey = convertStreamToByteArray(stream);
202+
return new ByteMapBackedRecord(streamKey, id, value);
203+
}
204+
205+
private byte[] convertStreamToByteArray(@Nullable Object stream) {
206+
if (stream instanceof byte[]) {
207+
return (byte[]) stream;
208+
} else if (stream instanceof String) {
209+
return ((String) stream).getBytes();
210+
} else if (stream instanceof ByteBuffer buffer) {
211+
byte[] result = new byte[buffer.remaining()];
212+
buffer.get(result);
213+
return result;
214+
} else if (stream == null) {
215+
return null;
216+
} else {
217+
throw new IllegalArgumentException("Stream key %s cannot be converted to byte array".formatted(stream));
218+
}
202219
}
203220

204221
/**

src/test/java/org/springframework/data/redis/connection/StreamRecordsUnitTests.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.assertj.core.api.Assertions.*;
1919

2020
import java.io.Serializable;
21+
import java.nio.ByteBuffer;
2122
import java.util.Collections;
2223
import java.util.Map;
2324

@@ -164,4 +165,72 @@ static HashMapper<Object, String, String> simpleString(String value) {
164165
}
165166
}
166167

168+
@Test // Stream key auto conversion for ofBytes method
169+
void ofBytesWithStringStreamKey() {
170+
171+
ByteRecord record = StreamRecords.newRecord()
172+
.in(STRING_STREAM_KEY)
173+
.withId(RECORD_ID)
174+
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
175+
176+
assertThat(record.getId()).isEqualTo(RECORD_ID);
177+
assertThat(record.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
178+
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
179+
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
180+
}
181+
182+
@Test // Stream key auto conversion for ofBytes method with byte array
183+
void ofBytesWithByteArrayStreamKey() {
184+
185+
ByteRecord record = StreamRecords.newRecord()
186+
.in(SERIALIZED_STRING_STREAM_KEY)
187+
.withId(RECORD_ID)
188+
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
189+
190+
assertThat(record.getId()).isEqualTo(RECORD_ID);
191+
assertThat(record.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
192+
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
193+
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
194+
}
195+
196+
@Test // Stream key auto conversion for ofBytes method with null stream key
197+
void ofBytesWithNullStreamKey() {
198+
199+
ByteRecord record = StreamRecords.newRecord()
200+
.withId(RECORD_ID)
201+
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
202+
203+
assertThat(record.getId()).isEqualTo(RECORD_ID);
204+
assertThat(record.getStream()).isNull();
205+
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
206+
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
207+
}
208+
209+
@Test // Stream key auto conversion for ofBytes method with ByteBuffer stream key
210+
void ofBytesWithByteBufferStreamKey() {
211+
212+
ByteBuffer streamKeyBuffer = ByteBuffer.wrap(SERIALIZED_STRING_STREAM_KEY);
213+
214+
ByteRecord record = StreamRecords.newRecord()
215+
.in(streamKeyBuffer)
216+
.withId(RECORD_ID)
217+
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL));
218+
219+
assertThat(record.getId()).isEqualTo(RECORD_ID);
220+
assertThat(record.getStream()).isEqualTo(SERIALIZED_STRING_STREAM_KEY);
221+
assertThat(record.getValue().keySet().iterator().next()).isEqualTo(SERIALIZED_STRING_MAP_KEY);
222+
assertThat(record.getValue().values().iterator().next()).isEqualTo(SERIALIZED_STRING_VAL);
223+
}
224+
225+
@Test // Stream key auto conversion for ofBytes method with unsupported type
226+
void ofBytesWithUnsupportedStreamKeyType() {
227+
228+
assertThatThrownBy(() -> StreamRecords.newRecord()
229+
.in(123L) // Unsupported type
230+
.withId(RECORD_ID)
231+
.ofBytes(Collections.singletonMap(SERIALIZED_STRING_MAP_KEY, SERIALIZED_STRING_VAL)))
232+
.isInstanceOf(IllegalArgumentException.class)
233+
.hasMessageContaining("Stream key 123 cannot be converted to byte array");
234+
}
235+
167236
}

0 commit comments

Comments
 (0)