
Commit d4439be

lantiga authored and filipecosta90 committed
Enable chunking for large models in AOF, RDB, replication and MODELGET (#387)
* Enable chunking for large models in AOF, RDB, replication and MODELGET
* Remove spurious printfs
* Make chunk size configurable via MODEL_CHUNK_SIZE. Return blob directly if n_chunks==1
* Make chunk size configurable, add tests
* Update docs
* Address review comments
1 parent b360284 commit d4439be

9 files changed: +195 −39 lines changed

docs/commands.md

Lines changed: 9 additions & 1 deletion
@@ -221,7 +221,7 @@ An array of alternating key-value pairs as follows:
 1. **MINBATCHSIZE**: The minimum size of any batch of incoming requests.
 1. **INPUTS**: array reply with one or more names of the model's input nodes (applicable only for TensorFlow models)
 1. **OUTPUTS**: array reply with one or more names of the model's output nodes (applicable only for TensorFlow models)
-1. **BLOB**: a blob containing the serialized model (when called with the `BLOB` argument) as a String
+1. **BLOB**: a blob containing the serialized model (when called with the `BLOB` argument) as a String. If the size of the serialized model exceeds `MODEL_CHUNK_SIZE` (see `AI.CONFIG` command), then an array of chunks is returned. The full serialized model can be obtained by concatenating the chunks.
 
 **Examples**

@@ -721,6 +721,7 @@ _Arguments_
 * **TFLITE**: The TensorFlow Lite backend
 * **TORCH**: The PyTorch backend
 * **ONNX**: ONNXRuntime backend
+* **MODEL_CHUNK_SIZE**: Sets the size of chunks (in bytes) in which model payloads are split for serialization, replication and `MODELGET`. Default is `511 * 1024 * 1024`.
 
 _Return_

@@ -748,3 +749,10 @@ This loads the PyTorch backend with a full path:
 redis> AI.CONFIG LOADBACKEND TORCH /usr/lib/redis/modules/redisai/backends/redisai_torch/redisai_torch.so
 OK
 ```
+
+This sets model chunk size to one megabyte (not recommended):
+
+```
+redis> AI.CONFIG MODEL_CHUNK_SIZE 1048576
+OK
+```
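With this change, a client calling `AI.MODELGET ... BLOB` must handle two reply shapes: a single string, or an array of chunks to be concatenated in order. Below is a minimal client-side sketch (not part of this commit), assuming hiredis, a server on 127.0.0.1:6379, and a placeholder model key `mymodel`:

```
/* Hypothetical reassembly of a (possibly chunked) MODELGET blob reply. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 6379);
    if (c == NULL || c->err) return 1;

    redisReply *reply = redisCommand(c, "AI.MODELGET mymodel BLOB");
    if (reply == NULL) return 1;

    char *blob = NULL;
    size_t blob_len = 0;
    if (reply->type == REDIS_REPLY_ARRAY) {
        /* Chunked reply: sum the chunk lengths, then copy chunks back to back. */
        for (size_t i = 0; i < reply->elements; i++)
            blob_len += reply->element[i]->len;
        blob = malloc(blob_len);
        size_t offset = 0;
        for (size_t i = 0; i < reply->elements; i++) {
            memcpy(blob + offset, reply->element[i]->str, reply->element[i]->len);
            offset += reply->element[i]->len;
        }
    } else if (reply->type == REDIS_REPLY_STRING) {
        /* Single-chunk reply: the blob is returned directly. */
        blob_len = reply->len;
        blob = malloc(blob_len);
        memcpy(blob, reply->str, blob_len);
    }
    printf("model blob: %zu bytes\n", blob_len);

    free(blob);
    freeReplyObject(reply);
    redisFree(c);
    return 0;
}
```

Because the chunks arrive in order, plain concatenation reconstructs the original serialized model.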

src/config.c

Lines changed: 61 additions & 20 deletions
@@ -20,6 +20,7 @@ long long backends_intra_op_parallelism; // number of threads used within an
 long long
     backends_inter_op_parallelism; // number of threads used for parallelism
                                    // between independent operations.
+long long model_chunk_size; // size of chunks used to break up model payloads.
 
 /**
  *
@@ -69,6 +70,30 @@ int setBackendsIntraOpParallelism(long long num_threads) {
   return result;
 }
 
+/**
+ * @return size of chunks (in bytes) in which models are split for
+ * set, get, serialization and replication.
+ */
+long long getModelChunkSize() {
+  return model_chunk_size;
+}
+
+/**
+ * Set size of chunks (in bytes) in which models are split for set,
+ * get, serialization and replication.
+ *
+ * @param size
+ * @return 0 on success, or 1 if failed
+ */
+int setModelChunkSize(long long size) {
+  int result = 1;
+  if (size > 0) {
+    model_chunk_size = size;
+    result = 0;
+  }
+  return result;
+}
+
 /**
  * Helper method for AI.CONFIG LOADBACKEND <backend_identifier>
  * <location_of_backend_library>
@@ -175,6 +200,26 @@ int RedisAI_Config_IntraOperationParallelism(
   return result;
 }
 
+/**
+ * Set size of chunks in which model payloads are split for set,
+ * get, serialization and replication.
+ *
+ * @param chunk_size_string string containing chunk size (in bytes)
+ * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
+ */
+int RedisAI_Config_ModelChunkSize(RedisModuleString *chunk_size_string) {
+  long long temp;
+  int result = RedisModule_StringToLongLong(chunk_size_string, &temp);
+  // make sure chunk size is a positive integer
+  // if not set the value to the default
+  if (result == REDISMODULE_OK && temp < 1) {
+    temp = REDISAI_DEFAULT_MODEL_CHUNK_SIZE;
+    result = REDISMODULE_ERR;
+  }
+  result = setModelChunkSize(temp);
+  return result;
+}
+
 /**
  *
  * @param ctx Context in which Redis modules operate
@@ -199,34 +244,30 @@ int RAI_configParamParse(RedisModuleCtx *ctx, const char *key,
   else if (strcasecmp((key), "THREADS_PER_QUEUE") == 0) {
     ret = RedisAI_Config_QueueThreads(rsval);
     if (ret == REDISMODULE_OK) {
-      char *buffer = RedisModule_Alloc(
-          (3 + strlen(REDISAI_INFOMSG_THREADS_PER_QUEUE) + strlen((val))) *
-          sizeof(*buffer));
-      sprintf(buffer, "%s: %s", REDISAI_INFOMSG_THREADS_PER_QUEUE, (val));
-      RedisModule_Log(ctx, "notice", buffer);
-      RedisModule_Free(buffer);
+      RedisModule_Log(ctx, "notice", "%s: %s",
+                      REDISAI_INFOMSG_THREADS_PER_QUEUE,
+                      (val));
     }
   } else if (strcasecmp((key), "INTRA_OP_PARALLELISM") == 0) {
     ret = RedisAI_Config_IntraOperationParallelism(rsval);
     if (ret == REDISMODULE_OK) {
-      char *buffer = RedisModule_Alloc(
-          (3 + strlen(REDISAI_INFOMSG_INTRA_OP_PARALLELISM) + strlen((val))) *
-          sizeof(*buffer));
-      sprintf(buffer, "%s: %lld", REDISAI_INFOMSG_INTRA_OP_PARALLELISM,
-              getBackendsIntraOpParallelism());
-      RedisModule_Log(ctx, "notice", buffer);
-      RedisModule_Free(buffer);
+      RedisModule_Log(ctx, "notice", "%s: %lld",
+                      REDISAI_INFOMSG_INTRA_OP_PARALLELISM,
+                      getBackendsIntraOpParallelism());
     }
   } else if (strcasecmp((key), "INTER_OP_PARALLELISM") == 0) {
     ret = RedisAI_Config_InterOperationParallelism(rsval);
     if (ret == REDISMODULE_OK) {
-      char *buffer = RedisModule_Alloc(
-          (3 + strlen(REDISAI_INFOMSG_INTER_OP_PARALLELISM) + strlen((val))) *
-          sizeof(*buffer));
-      sprintf(buffer, "%s: %lld", REDISAI_INFOMSG_INTER_OP_PARALLELISM,
-              getBackendsInterOpParallelism());
-      RedisModule_Log(ctx, "notice", buffer);
-      RedisModule_Free(buffer);
+      RedisModule_Log(ctx, "notice", "%s: %lld",
+                      REDISAI_INFOMSG_INTER_OP_PARALLELISM,
+                      getBackendsInterOpParallelism());
+    }
+  } else if (strcasecmp((key), "MODEL_CHUNK_SIZE") == 0) {
+    ret = RedisAI_Config_ModelChunkSize(rsval);
+    if (ret == REDISMODULE_OK) {
+      RedisModule_Log(ctx, "notice", "%s: %lld",
+                      REDISAI_INFOMSG_MODEL_CHUNK_SIZE,
+                      getModelChunkSize());
     }
   } else if (strcasecmp((key), "BACKENDSPATH") == 0) {
     // already taken care of
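Since `RAI_configParamParse` also processes the module's load-time arguments (it is invoked from `RAI_loadTimeConfig` during `RedisModule_OnLoad`, as seen in src/redisai.c below), the new key should be settable at startup as well. A hypothetical invocation, with the module path as a placeholder:

```
redis-server --loadmodule /usr/lib/redis/modules/redisai.so MODEL_CHUNK_SIZE 1048576
```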

src/config.h

Lines changed: 28 additions & 0 deletions
@@ -23,6 +23,7 @@ typedef enum { RAI_DEVICE_CPU = 0, RAI_DEVICE_GPU = 1 } RAI_Device;
 #define REDISAI_DEFAULT_THREADS_PER_QUEUE 1
 #define REDISAI_DEFAULT_INTRA_OP_PARALLELISM 0
 #define REDISAI_DEFAULT_INTER_OP_PARALLELISM 0
+#define REDISAI_DEFAULT_MODEL_CHUNK_SIZE 535822336 // (511 * 1024 * 1024)
 #define REDISAI_ERRORMSG_PROCESSING_ARG "ERR error processing argument"
 #define REDISAI_ERRORMSG_THREADS_PER_QUEUE \
   "ERR error setting THREADS_PER_QUEUE to"
@@ -37,6 +38,8 @@ typedef enum { RAI_DEVICE_CPU = 0, RAI_DEVICE_GPU = 1 } RAI_Device;
   "Setting INTRA_OP_PARALLELISM parameter to"
 #define REDISAI_INFOMSG_INTER_OP_PARALLELISM \
   "Setting INTER_OP_PARALLELISM parameter to"
+#define REDISAI_INFOMSG_MODEL_CHUNK_SIZE \
+  "Setting MODEL_CHUNK_SIZE parameter to"
 
 /**
  * Get number of threads used for parallelism between independent operations, by
@@ -72,6 +75,21 @@ long long getBackendsIntraOpParallelism();
  */
 int setBackendsIntraOpParallelism(long long num_threads);
 
+/**
+ * @return size of chunks (in bytes) in which models are split for
+ * set, get, serialization and replication.
+ */
+long long getModelChunkSize();
+
+/**
+ * Set size of chunks (in bytes) in which models are split for set,
+ * get, serialization and replication.
+ *
+ * @param size
+ * @return 0 on success, or 1 if failed
+ */
+int setModelChunkSize(long long size);
+
 /**
  * Helper method for AI.CONFIG LOADBACKEND <backend_identifier>
  * <location_of_backend_library>
@@ -123,6 +141,16 @@ int RedisAI_Config_InterOperationParallelism(
 int RedisAI_Config_IntraOperationParallelism(
     RedisModuleString *num_threads_string);
 
+/**
+ * Set size of chunks in which model payloads are split for set,
+ * get, serialization and replication.
+ *
+ * @param chunk_size_string string containing chunk size (in bytes)
+ * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
+ */
+int RedisAI_Config_ModelChunkSize(
+    RedisModuleString *chunk_size_string);
+
 /**
  *
  * @param ctx Context in which Redis modules operate

src/model.c

Lines changed: 46 additions & 11 deletions
@@ -58,8 +58,24 @@ static void* RAI_Model_RdbLoad(struct RedisModuleIO *io, int encver) {
   };
 
   size_t len;
+  char *buffer = NULL;
 
-  char *buffer = RedisModule_LoadStringBuffer(io, &len);
+  if (encver <= 100) {
+    buffer = RedisModule_LoadStringBuffer(io, &len);
+  }
+  else {
+    len = RedisModule_LoadUnsigned(io);
+    buffer = RedisModule_Alloc(len);
+    const size_t n_chunks = RedisModule_LoadUnsigned(io);
+    long long chunk_offset = 0;
+    for (size_t i=0; i<n_chunks; i++) {
+      size_t chunk_len;
+      char *chunk_buffer = RedisModule_LoadStringBuffer(io, &chunk_len);
+      memcpy(buffer + chunk_offset, chunk_buffer, chunk_len);
+      chunk_offset += chunk_len;
+      RedisModule_Free(chunk_buffer);
+    }
+  }
 
   RAI_Error err = {0};
@@ -136,7 +152,14 @@ static void RAI_Model_RdbSave(RedisModuleIO *io, void *value) {
   for (size_t i=0; i<model->noutputs; i++) {
     RedisModule_SaveStringBuffer(io, model->outputs[i], strlen(model->outputs[i]) + 1);
   }
-  RedisModule_SaveStringBuffer(io, buffer, len);
+  long long chunk_size = getModelChunkSize();
+  const size_t n_chunks = len / chunk_size + 1;
+  RedisModule_SaveUnsigned(io, len);
+  RedisModule_SaveUnsigned(io, n_chunks);
+  for (size_t i=0; i<n_chunks; i++) {
+    size_t chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
+    RedisModule_SaveStringBuffer(io, buffer + i * chunk_size, chunk_len);
+  }
 
   if (buffer) {
     RedisModule_Free(buffer);
@@ -177,32 +200,44 @@ static void RAI_Model_AofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) {
     array_append(outputs_, RedisModule_CreateString(ctx, model->outputs[i], strlen(model->outputs[i])));
   }
 
+  long long chunk_size = getModelChunkSize();
+  const size_t n_chunks = len / chunk_size + 1;
+  RedisModuleString **buffers_ = array_new(RedisModuleString*, n_chunks);
+
+  for (size_t i=0; i<n_chunks; i++) {
+    size_t chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
+    array_append(buffers_, RedisModule_CreateString(ctx, buffer + i * chunk_size, chunk_len));
+  }
+
+  if (buffer) {
+    RedisModule_Free(buffer);
+  }
+
   const char* backendstr = RAI_BackendName(model->backend);
 
-  RedisModule_EmitAOF(aof, "AI.MODELSET", "slccclclcvcvb",
+  RedisModule_EmitAOF(aof, "AI.MODELSET", "slccclclcvcvcv",
                       key,
                       backendstr, model->devicestr, model->tag,
                       "BATCHSIZE", model->opts.batchsize,
                       "MINBATCHSIZE", model->opts.minbatchsize,
                       "INPUTS", inputs_, model->ninputs,
                       "OUTPUTS", outputs_, model->noutputs,
-                      buffer, len);
-
-  if (buffer) {
-    RedisModule_Free(buffer);
-  }
+                      "BLOB", buffers_, n_chunks);
 
   for (size_t i=0; i<model->ninputs; i++) {
     RedisModule_FreeString(ctx, inputs_[i]);
   }
-
   array_free(inputs_);
 
   for (size_t i=0; i<model->noutputs; i++) {
     RedisModule_FreeString(ctx, outputs_[i]);
   }
-
   array_free(outputs_);
+
+  for (size_t i=0; i<n_chunks; i++) {
+    RedisModule_FreeString(ctx, buffers_[i]);
+  }
+  array_free(buffers_);
 }
@@ -248,7 +283,7 @@ int RAI_ModelInit(RedisModuleCtx* ctx) {
     .digest = NULL
   };
 
-  RedisAI_ModelType = RedisModule_CreateDataType(ctx, "AI__MODEL", 0, &tmModel);
+  RedisAI_ModelType = RedisModule_CreateDataType(ctx, "AI__MODEL", RAI_ENC_VER_MM, &tmModel);
   return RedisAI_ModelType != NULL;
 }
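The chunk arithmetic shared by `RAI_Model_RdbSave`, `RAI_Model_AofRewrite` and (in src/redisai.c below) `RAI_ReplyWithChunks` is `n_chunks = len / chunk_size + 1`, with every chunk full-sized except the last, which carries `len % chunk_size` bytes. A standalone illustration (not part of the commit) of the layout this produces for a hypothetical 1 GiB payload at the default chunk size; note that when `len` is an exact multiple of `chunk_size`, the scheme emits a trailing zero-length chunk:

```
#include <stdio.h>

int main(void) {
    const long long chunk_size = 535822336; /* REDISAI_DEFAULT_MODEL_CHUNK_SIZE */
    const long long len = 1073741824;       /* hypothetical 1 GiB model blob */
    const long long n_chunks = len / chunk_size + 1;
    for (long long i = 0; i < n_chunks; i++) {
        long long chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
        printf("chunk %lld: %lld bytes\n", i, chunk_len);
    }
    return 0;
}
```

For this input the layout is two full 535822336-byte chunks followed by a 2097152-byte remainder, which sum back to 1073741824 bytes.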

src/redisai.c

Lines changed: 31 additions & 3 deletions
@@ -360,6 +360,21 @@ int RedisAI_ModelSet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
   return REDISMODULE_OK;
 }
 
+void RAI_ReplyWithChunks(RedisModuleCtx *ctx, const char* buffer, long long len) {
+  long long chunk_size = getModelChunkSize();
+  const size_t n_chunks = len / chunk_size + 1;
+  if (n_chunks > 1) {
+    RedisModule_ReplyWithArray(ctx, (long)n_chunks);
+    for (size_t i=0; i<n_chunks; i++) {
+      size_t chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
+      RedisModule_ReplyWithStringBuffer(ctx, buffer + i * chunk_size, chunk_len);
+    }
+  }
+  else {
+    RedisModule_ReplyWithStringBuffer(ctx, buffer, len);
+  }
+}
+
 /**
  * AI.MODELGET model_key [META] [BLOB]
  */
@@ -412,7 +427,7 @@ int RedisAI_ModelGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
   }
 
   if (!meta && blob) {
-    RedisModule_ReplyWithStringBuffer(ctx, buffer, len);
+    RAI_ReplyWithChunks(ctx, buffer, len);
     RedisModule_Free(buffer);
     RedisModule_CloseKey(key);
     return REDISMODULE_OK;
@@ -456,7 +471,7 @@ int RedisAI_ModelGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
 
   if (meta && blob) {
     RedisModule_ReplyWithCString(ctx, "blob");
-    RedisModule_ReplyWithStringBuffer(ctx, buffer, len);
+    RAI_ReplyWithChunks(ctx, buffer, len);
     RedisModule_Free(buffer);
   }
@@ -874,7 +889,9 @@ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
 }
 
 /**
- * AI.CONFIG [BACKENDSPATH <default_location_of_backend_libraries> | LOADBACKEND <backend_identifier> <location_of_backend_library>]
+ * AI.CONFIG [BACKENDSPATH <default_location_of_backend_libraries> |
+             LOADBACKEND <backend_identifier> <location_of_backend_library> |
+             CHUNKLEN <len>]
  */
 int RedisAI_Config_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
   if (argc < 2) return RedisModule_WrongArity(ctx);
@@ -894,6 +911,16 @@ int RedisAI_Config_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
     }
   }
 
+  if (!strcasecmp(subcommand, "MODEL_CHUNK_SIZE")) {
+    if (argc > 2) {
+      RedisAI_Config_ModelChunkSize(argv[2]);
+      return RedisModule_ReplyWithSimpleString(ctx, "OK");
+    } else {
+      return RedisModule_ReplyWithError(
+          ctx, "ERR MODEL_CHUNK_SIZE: missing chunk size");
+    }
+  }
+
   return RedisModule_ReplyWithError(ctx, "ERR unsupported subcommand");
 }
@@ -1111,6 +1138,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
   perqueueThreadPoolSize = REDISAI_DEFAULT_THREADS_PER_QUEUE;
   setBackendsInterOpParallelism(REDISAI_DEFAULT_INTER_OP_PARALLELISM);
   setBackendsIntraOpParallelism(REDISAI_DEFAULT_INTRA_OP_PARALLELISM);
+  setModelChunkSize(REDISAI_DEFAULT_MODEL_CHUNK_SIZE);
 
   RAI_loadTimeConfig(ctx,argv,argc);
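Taken together with the `RAI_ReplyWithChunks` helper above, `AI.MODELGET ... BLOB` on a model larger than the configured chunk size now yields an array reply, while smaller models keep the old single-string reply ("Return blob directly if n_chunks==1", per the commit message). A hypothetical session, with the key and chunk contents as placeholders:

```
redis> AI.CONFIG MODEL_CHUNK_SIZE 1048576
OK
redis> AI.MODELGET mymodel BLOB
1) "<first 1048576 bytes of the serialized model>"
2) "<next 1048576 bytes>"
3) "<remaining bytes>"
```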
