From e940ce5deaabe74f8cf4971b52ee52fc5bf5d64f Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 22 Oct 2025 11:33:01 -0700 Subject: [PATCH] feat: use AbstractPipeline for Redis Cluster compatibility Port of Python RedisVL fix for issue #365. Ensures storage classes use AbstractPipeline instead of Pipeline to support both regular Pipeline and MultiClusterPipeline objects. Changes: - BaseStorage: Changed Pipeline to AbstractPipeline in write() and get() methods, removed unsafe casts to Pipeline - HashStorage: Updated set() and getResponse() signatures to accept AbstractPipeline, updated get() to use AbstractPipeline - JsonStorage: Updated set() and getResponse() signatures to accept AbstractPipeline - EmbeddingsCache: Updated mset(), mget(), and mexists() to use AbstractPipeline - Add AbstractPipelineIntegrationTest with 4 tests verifying compatibility This ensures compatibility with both standalone Redis and Redis Cluster deployments, matching Python's graceful handling of both Pipeline and ClusterPipeline types via isinstance() checks. In Java/Jedis: - Pipeline extends PipelineBase (deprecated) - MultiClusterPipeline extends PipelineBase - Both extend AbstractPipeline as common base - UnifiedJedis.pipelined() returns AbstractPipeline (may be Pipeline or MultiClusterPipeline depending on provider) Without this fix, code would fail with ClassCastException when using MultiClusterPipeline in cluster deployments. 
--- .../vl/extensions/cache/EmbeddingsCache.java | 8 +- .../com/redis/vl/storage/BaseStorage.java | 19 ++- .../com/redis/vl/storage/HashStorage.java | 8 +- .../com/redis/vl/storage/JsonStorage.java | 6 +- .../AbstractPipelineIntegrationTest.java | 126 ++++++++++++++++++ 5 files changed, 151 insertions(+), 16 deletions(-) create mode 100644 core/src/test/java/com/redis/vl/storage/AbstractPipelineIntegrationTest.java diff --git a/core/src/main/java/com/redis/vl/extensions/cache/EmbeddingsCache.java b/core/src/main/java/com/redis/vl/extensions/cache/EmbeddingsCache.java index 3b61586..29df1a6 100644 --- a/core/src/main/java/com/redis/vl/extensions/cache/EmbeddingsCache.java +++ b/core/src/main/java/com/redis/vl/extensions/cache/EmbeddingsCache.java @@ -8,7 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import redis.clients.jedis.Pipeline; +import redis.clients.jedis.AbstractPipeline; import redis.clients.jedis.Response; import redis.clients.jedis.UnifiedJedis; @@ -160,7 +160,7 @@ public void mset(Map embeddings, String modelName) { return; } - try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) { + try (AbstractPipeline pipeline = redisClient.pipelined()) { for (Map.Entry entry : embeddings.entrySet()) { String key = generateKey(entry.getKey(), modelName); byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); @@ -192,7 +192,7 @@ public Map mget(List texts, String modelName) { Map results = new HashMap<>(); Map> responses = new HashMap<>(); - try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) { + try (AbstractPipeline pipeline = redisClient.pipelined()) { for (String text : texts) { String key = generateKey(text, modelName); byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); @@ -227,7 +227,7 @@ public Map mexists(List texts, String modelName) { Map results = new HashMap<>(); Map> responses = new HashMap<>(); - try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) { + try (AbstractPipeline pipeline = 
redisClient.pipelined()) { for (String text : texts) { String key = generateKey(text, modelName); responses.put(text, pipeline.exists(key)); diff --git a/core/src/main/java/com/redis/vl/storage/BaseStorage.java b/core/src/main/java/com/redis/vl/storage/BaseStorage.java index 0bad388..7363299 100644 --- a/core/src/main/java/com/redis/vl/storage/BaseStorage.java +++ b/core/src/main/java/com/redis/vl/storage/BaseStorage.java @@ -7,7 +7,7 @@ import java.util.function.Function; import lombok.AccessLevel; import lombok.Getter; -import redis.clients.jedis.Pipeline; +import redis.clients.jedis.AbstractPipeline; import redis.clients.jedis.Response; import redis.clients.jedis.UnifiedJedis; @@ -325,7 +325,7 @@ public List write( // Pass 2: Write all valid objects in batches List addedKeys = new ArrayList<>(); - try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) { + try (AbstractPipeline pipeline = redisClient.pipelined()) { for (int i = 0; i < preparedObjects.size(); i++) { KeyValuePair kvp = preparedObjects.get(i); set(pipeline, kvp.key, kvp.value); @@ -465,7 +465,7 @@ public List> get( // Use a pipeline to batch the retrieval List>> responses = new ArrayList<>(); - try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) { + try (AbstractPipeline pipeline = redisClient.pipelined()) { for (String key : keys) { Response> response = getResponse(pipeline, key); responses.add(response); @@ -517,20 +517,29 @@ protected Map convertBytes(Map map) { /** * Set a key-value pair in Redis using a pipeline. * + *

Python port: Uses AbstractPipeline to support both regular Pipeline and + * MultiClusterPipeline, matching Python's handling of both Pipeline and ClusterPipeline types + * (issue #365). + * * @param pipeline The Redis pipeline to use * @param key The Redis key * @param obj The object to store */ - protected abstract void set(Pipeline pipeline, String key, Map obj); + protected abstract void set(AbstractPipeline pipeline, String key, Map obj); /** * Get a response for retrieving a value from Redis using a pipeline. * + *

Python port: Uses AbstractPipeline to support both regular Pipeline and + * MultiClusterPipeline, matching Python's handling of both Pipeline and ClusterPipeline types + * (issue #365). + * * @param pipeline The Redis pipeline to use * @param key The Redis key * @return Response containing the retrieved object */ - protected abstract Response> getResponse(Pipeline pipeline, String key); + protected abstract Response> getResponse( + AbstractPipeline pipeline, String key); /** Helper class for key-value pairs used during preprocessing and validation. */ protected static class KeyValuePair { diff --git a/core/src/main/java/com/redis/vl/storage/HashStorage.java b/core/src/main/java/com/redis/vl/storage/HashStorage.java index 1d33d09..6a4ac95 100644 --- a/core/src/main/java/com/redis/vl/storage/HashStorage.java +++ b/core/src/main/java/com/redis/vl/storage/HashStorage.java @@ -9,7 +9,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import redis.clients.jedis.Pipeline; +import redis.clients.jedis.AbstractPipeline; import redis.clients.jedis.Response; /** @@ -29,7 +29,7 @@ public HashStorage(IndexSchema indexSchema) { } @Override - protected void set(Pipeline pipeline, String key, Map obj) { + protected void set(AbstractPipeline pipeline, String key, Map obj) { Map binaryFields = new HashMap<>(); Map stringFields = new HashMap<>(); @@ -75,7 +75,7 @@ protected void set(Pipeline pipeline, String key, Map obj) { @Override @SuppressWarnings("unchecked") - protected Response> getResponse(Pipeline pipeline, String key) { + protected Response> getResponse(AbstractPipeline pipeline, String key) { // For hash, we use hgetAll to get all fields Response> response = pipeline.hgetAll(key); // We need to return Response> so cast it @@ -101,7 +101,7 @@ public List> get( List>> stringResponses = new ArrayList<>(); Map>> vectorResponses = new HashMap<>(); - try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) { + try (AbstractPipeline pipeline = 
redisClient.pipelined()) { // Get all string fields and identify vector fields for (String key : keys) { Response> response = pipeline.hgetAll(key); diff --git a/core/src/main/java/com/redis/vl/storage/JsonStorage.java b/core/src/main/java/com/redis/vl/storage/JsonStorage.java index dbd9294..5833997 100644 --- a/core/src/main/java/com/redis/vl/storage/JsonStorage.java +++ b/core/src/main/java/com/redis/vl/storage/JsonStorage.java @@ -7,7 +7,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import redis.clients.jedis.Pipeline; +import redis.clients.jedis.AbstractPipeline; import redis.clients.jedis.Response; import redis.clients.jedis.json.Path2; @@ -28,7 +28,7 @@ public JsonStorage(IndexSchema indexSchema) { } @Override - protected void set(Pipeline pipeline, String key, Map obj) { + protected void set(AbstractPipeline pipeline, String key, Map obj) { // For JSON storage, vectors are stored as JSON arrays Map jsonDocument = new HashMap<>(); @@ -83,7 +83,7 @@ protected void set(Pipeline pipeline, String key, Map obj) { @Override @SuppressWarnings("unchecked") - protected Response> getResponse(Pipeline pipeline, String key) { + protected Response> getResponse(AbstractPipeline pipeline, String key) { // For JSON, we get the entire document Response response = pipeline.jsonGet(key); // We need to return Response> so cast it diff --git a/core/src/test/java/com/redis/vl/storage/AbstractPipelineIntegrationTest.java b/core/src/test/java/com/redis/vl/storage/AbstractPipelineIntegrationTest.java new file mode 100644 index 0000000..29b6680 --- /dev/null +++ b/core/src/test/java/com/redis/vl/storage/AbstractPipelineIntegrationTest.java @@ -0,0 +1,126 @@ +package com.redis.vl.storage; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.redis.vl.BaseIntegrationTest; +import com.redis.vl.schema.IndexSchema; +import java.util.*; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import 
redis.clients.jedis.AbstractPipeline; + +/** + * Integration tests for AbstractPipeline compatibility in storage classes. + * + *

Tests the fix for issue #365: Ensures BaseStorage uses AbstractPipeline instead of Pipeline to + * support both regular Pipeline and MultiClusterPipeline objects, matching Python's graceful + * handling of both Pipeline and ClusterPipeline types. + * + *

Python port: Matches Python's isinstance() checks for (AsyncPipeline, AsyncClusterPipeline) by + * using the common base class AbstractPipeline in Java. + */ +@DisplayName("AbstractPipeline Compatibility Tests") +class AbstractPipelineIntegrationTest extends BaseIntegrationTest { + + @Test + @DisplayName("Should accept AbstractPipeline in HashStorage.set()") + void testHashStorageAcceptsAbstractPipeline() { + // Create a simple schema + Map schemaDict = + Map.of( + "index", + Map.of("name", "test-index", "prefix", "test", "storage_type", "hash"), + "fields", + List.of(Map.of("name", "field1", "type", "text"))); + + IndexSchema schema = IndexSchema.fromDict(schemaDict); + HashStorage storage = new HashStorage(schema); + + // Verify the set() method signature accepts AbstractPipeline + Map testData = Map.of("field1", "value1"); + + // Use the unifiedJedis from BaseIntegrationTest (Testcontainers) + try (AbstractPipeline pipeline = unifiedJedis.pipelined()) { + // This should compile without ClassCastException + storage.set(pipeline, "test:key", testData); + pipeline.sync(); + } + + assertThat(true).as("Method signature accepts AbstractPipeline").isTrue(); + } + + @Test + @DisplayName("Should accept AbstractPipeline in JsonStorage.set()") + void testJsonStorageAcceptsAbstractPipeline() { + // Create a simple schema + Map schemaDict = + Map.of( + "index", + Map.of("name", "test-index", "prefix", "test", "storage_type", "json"), + "fields", + List.of(Map.of("name", "field1", "type", "text", "path", "$.field1"))); + + IndexSchema schema = IndexSchema.fromDict(schemaDict); + JsonStorage storage = new JsonStorage(schema); + + // Verify the set() method signature accepts AbstractPipeline + Map testData = Map.of("field1", "value1"); + + // Use the unifiedJedis from BaseIntegrationTest (Testcontainers) + try (AbstractPipeline pipeline = unifiedJedis.pipelined()) { + // This should compile without ClassCastException + storage.set(pipeline, "test:key", testData); + 
pipeline.sync(); + } + + assertThat(true).as("Method signature accepts AbstractPipeline").isTrue(); + } + + @Test + @DisplayName("Should accept AbstractPipeline in HashStorage.getResponse()") + void testHashStorageGetResponseAcceptsAbstractPipeline() { + // Create a simple schema + Map schemaDict = + Map.of( + "index", + Map.of("name", "test-index", "prefix", "test", "storage_type", "hash"), + "fields", + List.of(Map.of("name", "field1", "type", "text"))); + + IndexSchema schema = IndexSchema.fromDict(schemaDict); + HashStorage storage = new HashStorage(schema); + + // Use the unifiedJedis from BaseIntegrationTest (Testcontainers) + try (AbstractPipeline pipeline = unifiedJedis.pipelined()) { + // This should compile without ClassCastException + storage.getResponse(pipeline, "test:key"); + pipeline.sync(); + } + + assertThat(true).as("Method signature accepts AbstractPipeline").isTrue(); + } + + @Test + @DisplayName("Should accept AbstractPipeline in JsonStorage.getResponse()") + void testJsonStorageGetResponseAcceptsAbstractPipeline() { + // Create a simple schema + Map schemaDict = + Map.of( + "index", + Map.of("name", "test-index", "prefix", "test", "storage_type", "json"), + "fields", + List.of(Map.of("name", "field1", "type", "text", "path", "$.field1"))); + + IndexSchema schema = IndexSchema.fromDict(schemaDict); + JsonStorage storage = new JsonStorage(schema); + + // Use the unifiedJedis from BaseIntegrationTest (Testcontainers) + try (AbstractPipeline pipeline = unifiedJedis.pipelined()) { + // This should compile without ClassCastException + storage.getResponse(pipeline, "test:key"); + pipeline.sync(); + } + + assertThat(true).as("Method signature accepts AbstractPipeline").isTrue(); + } +}