Skip to content

Commit b3b2c7e

Browse files
committed
box: introduce box_insert_arrow()
The new method inserts data provided in Arrow columnar format [1] into a given space. At the moment it is not supported by memtx and vinyl spaces. Also the IPROTO_INSERT_ARROW request is introduced. It inserts the data that is serialized into Arrow IPC format [2]. 1. https://arrow.apache.org/docs/format/Columnar.html 2. https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc Closes #10508 Needed for tarantool/tarantool-ee#820 NO_CHANGELOG=No sense in mentioning in CE release notes @TarantoolBot document Title: Document iproto request IPROTO_INSERT_ARROW Product: Tarantool Since: 3.3 Root documents: https://www.tarantool.io/en/doc/latest/reference/internals/iproto/requests/ https://www.tarantool.io/en/doc/latest/reference/internals/iproto/keys/#internals-iproto-keys-features Available IPROTO_FEATURES are the following: * IPROTO_FEATURE_INSERT_ARROW = 12 - Support of data insertion in Arrow format. IPROTO version supporting this feature is 10 or newer. Client-server requests and responses: * IPROTO_INSERT_ARROW = 0x11 - Insert Arrow data request. The structure of the `IPROTO_INSERT_ARROW` request is similar to `IPROTO_INSERT`; the only difference is `IPROTO_ARROW : MP_ARROW` instead of `IPROTO_TUPLE : MP_ARRAY`. The response to `IPROTO_INSERT_ARROW` contains an empty body.
1 parent 3ae8776 commit b3b2c7e

34 files changed

+593
-41
lines changed

extra/exports

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ box_index_tuple_position
7373
box_info_lsn
7474
box_init_latest_dd_version_id
7575
box_insert
76+
box_insert_arrow
7677
box_iproto_override
7778
box_iproto_send
7879
box_is_ro

src/box/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ set(box_sources
282282
decimal.c
283283
read_view.c
284284
mp_box_ctx.c
285+
arrow_ipc.c
285286
${sql_sources}
286287
${lua_sources}
287288
lua/init.c

src/box/arrow_ipc.c

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* SPDX-License-Identifier: BSD-2-Clause
3+
*
4+
* Copyright 2010-2024, Tarantool AUTHORS, please see AUTHORS file.
5+
*/
6+
#include "arrow_ipc.h"
7+
8+
#include "diag.h"
9+
#include "error.h"
10+
#include "small/region.h"
11+
#include "nanoarrow/nanoarrow_ipc.h"
12+
13+
int
14+
arrow_ipc_encode(struct ArrowArray *array, struct ArrowSchema *schema,
15+
struct region *region, const char **ret_data,
16+
const char **ret_data_end)
17+
{
18+
ArrowErrorCode rc;
19+
struct ArrowError error;
20+
struct ArrowBuffer buffer;
21+
ArrowBufferInit(&buffer);
22+
23+
struct ArrowArrayView array_view;
24+
rc = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
25+
if (rc != NANOARROW_OK) {
26+
diag_set(ClientError, ER_ARROW_IPC_ENCODE,
27+
"ArrowArrayViewInitFromSchema", error.message);
28+
return -1;
29+
}
30+
31+
/* Set buffer sizes and data pointers from an array. */
32+
rc = ArrowArrayViewSetArray(&array_view, array, &error);
33+
if (rc != NANOARROW_OK) {
34+
diag_set(ClientError, ER_ARROW_IPC_ENCODE,
35+
"ArrowArrayViewSetArray", error.message);
36+
goto error1;
37+
}
38+
39+
/* All bytes written to the stream will be appended to the buffer. */
40+
struct ArrowIpcOutputStream stream;
41+
rc = ArrowIpcOutputStreamInitBuffer(&stream, &buffer);
42+
if (rc != NANOARROW_OK) {
43+
diag_set(ClientError, ER_ARROW_IPC_ENCODE,
44+
"ArrowIpcOutputStreamInitBuffer", NULL);
45+
goto error1;
46+
}
47+
48+
/*
49+
* A stream writer which encodes schema and array into an IPC byte
50+
* stream. The writer takes ownership of the output byte stream.
51+
*/
52+
struct ArrowIpcWriter writer;
53+
rc = ArrowIpcWriterInit(&writer, &stream);
54+
if (rc != NANOARROW_OK) {
55+
diag_set(ClientError, ER_ARROW_IPC_ENCODE, "ArrowIpcWriterInit",
56+
NULL);
57+
stream.release(&stream);
58+
goto error1;
59+
}
60+
61+
rc = ArrowIpcWriterWriteSchema(&writer, schema, &error);
62+
if (rc != NANOARROW_OK) {
63+
diag_set(ClientError, ER_ARROW_IPC_ENCODE,
64+
"ArrowIpcWriterWriteSchema", error.message);
65+
goto error2;
66+
}
67+
68+
rc = ArrowIpcWriterWriteArrayView(&writer, &array_view, &error);
69+
if (rc != NANOARROW_OK) {
70+
diag_set(ClientError, ER_ARROW_IPC_ENCODE,
71+
"ArrowIpcWriterWriteArrayView", error.message);
72+
goto error2;
73+
}
74+
75+
/*
76+
* TODO: It is possible to avoid extra `memcpy()' by switching
77+
* `ArrowBuffer' to `region_realloc()'.
78+
*/
79+
char *data = xregion_alloc(region, buffer.size_bytes);
80+
memcpy(data, buffer.data, buffer.size_bytes);
81+
*ret_data = data;
82+
*ret_data_end = data + buffer.size_bytes;
83+
84+
ArrowIpcWriterReset(&writer);
85+
ArrowArrayViewReset(&array_view);
86+
ArrowBufferReset(&buffer);
87+
return 0;
88+
error2:
89+
ArrowIpcWriterReset(&writer);
90+
error1:
91+
ArrowArrayViewReset(&array_view);
92+
ArrowBufferReset(&buffer);
93+
return -1;
94+
}
95+
96+
int
97+
arrow_ipc_decode(struct ArrowArray *array, struct ArrowSchema *schema,
98+
const char *data, const char *data_end)
99+
{
100+
ssize_t size = data_end - data;
101+
if (size <= 0) {
102+
diag_set(ClientError, ER_ARROW_IPC_DECODE, NULL,
103+
"Unexpected data size");
104+
return -1;
105+
}
106+
107+
ArrowErrorCode rc;
108+
struct ArrowError error;
109+
struct ArrowBuffer buffer;
110+
ArrowBufferInit(&buffer);
111+
112+
rc = ArrowBufferAppend(&buffer, data, size);
113+
if (rc != NANOARROW_OK) {
114+
diag_set(ClientError, ER_ARROW_IPC_DECODE, "ArrowBufferAppend",
115+
NULL);
116+
ArrowBufferReset(&buffer);
117+
return -1;
118+
}
119+
120+
/*
121+
* Create an input stream from a buffer.
122+
* The stream takes ownership of the buffer and reads bytes from it.
123+
*/
124+
struct ArrowIpcInputStream input_stream;
125+
rc = ArrowIpcInputStreamInitBuffer(&input_stream, &buffer);
126+
if (rc != NANOARROW_OK) {
127+
diag_set(ClientError, ER_ARROW_IPC_DECODE,
128+
"ArrowIpcInputStreamInitBuffer", NULL);
129+
ArrowBufferReset(&buffer);
130+
return -1;
131+
}
132+
133+
/*
134+
* Initialize an array stream from an input stream of bytes.
135+
* The array_stream takes ownership of input_stream.
136+
*/
137+
struct ArrowArrayStream array_stream;
138+
rc = ArrowIpcArrayStreamReaderInit(&array_stream, &input_stream, NULL);
139+
if (rc != NANOARROW_OK) {
140+
diag_set(ClientError, ER_ARROW_IPC_DECODE,
141+
"ArrowIpcArrayStreamReaderInit", NULL);
142+
input_stream.release(&input_stream);
143+
return -1;
144+
}
145+
146+
rc = ArrowArrayStreamGetSchema(&array_stream, schema, &error);
147+
if (rc != NANOARROW_OK) {
148+
diag_set(ClientError, ER_ARROW_IPC_DECODE,
149+
"ArrowArrayStreamGetSchema", error.message);
150+
goto error;
151+
}
152+
153+
rc = ArrowArrayStreamGetNext(&array_stream, array, &error);
154+
if (rc != NANOARROW_OK) {
155+
diag_set(ClientError, ER_ARROW_IPC_DECODE,
156+
"ArrowArrayStreamGetNext", error.message);
157+
schema->release(schema);
158+
goto error;
159+
}
160+
161+
ArrowArrayStreamRelease(&array_stream);
162+
return 0;
163+
error:
164+
ArrowArrayStreamRelease(&array_stream);
165+
return -1;
166+
}

src/box/arrow_ipc.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: BSD-2-Clause
3+
*
4+
* Copyright 2010-2024, Tarantool AUTHORS, please see AUTHORS file.
5+
*/
6+
#pragma once
7+
8+
#include "arrow/abi.h"
9+
10+
#if defined(__cplusplus)
11+
extern "C" {
12+
#endif /* defined(__cplusplus) */
13+
14+
struct region;
15+
16+
/**
17+
* Encodes `array' and `schema' into Arrow IPC format. The memory is allocated
18+
* on `region', and the address is returned via `ret_data' and `ret_data_end'.
19+
* Returns 0 on success, -1 on failure (diag is set).
20+
*/
21+
int
22+
arrow_ipc_encode(struct ArrowArray *array, struct ArrowSchema *schema,
23+
struct region *region, const char **ret_data,
24+
const char **ret_data_end);
25+
26+
/**
27+
* Decodes `array' and `schema' from the `data' in Arrow IPC format.
28+
* Returns 0 on success, -1 on failure (diag is set).
29+
*/
30+
int
31+
arrow_ipc_decode(struct ArrowArray *array, struct ArrowSchema *schema,
32+
const char *data, const char *data_end);
33+
34+
#if defined(__cplusplus)
35+
} /* extern "C" */
36+
#endif /* defined(__cplusplus) */

src/box/blackhole.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ static const struct space_vtab blackhole_space_vtab = {
113113
/* .execute_delete = */ blackhole_space_execute_delete,
114114
/* .execute_update = */ blackhole_space_execute_update,
115115
/* .execute_upsert = */ blackhole_space_execute_upsert,
116+
/* .execute_insert_arrow = */ generic_space_execute_insert_arrow,
116117
/* .ephemeral_replace = */ generic_space_ephemeral_replace,
117118
/* .ephemeral_delete = */ generic_space_ephemeral_delete,
118119
/* .ephemeral_rowid_next = */ generic_space_ephemeral_rowid_next,

src/box/box.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4011,6 +4011,19 @@ box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
40114011
return box_process1(&request, result);
40124012
}
40134013

4014+
/**
 * Build an IPROTO_INSERT_ARROW request for `space_id' that carries the given
 * Arrow `array' and `schema', and pass it to box_process1().
 * Returns whatever box_process1() returns.
 */
API_EXPORT int
box_insert_arrow(uint32_t space_id, struct ArrowArray *array,
		 struct ArrowSchema *schema)
{
	struct request req;
	/* Start from an all-zero request so that unused fields are unset. */
	memset(&req, 0, sizeof(req));
	req.space_id = space_id;
	req.type = IPROTO_INSERT_ARROW;
	req.arrow_schema = schema;
	req.arrow_array = array;
	/* The result pointer is NULL: no result tuple is requested. */
	return box_process1(&req, NULL);
}
4026+
40144027
/**
40154028
* Trigger space truncation by bumping a counter
40164029
* in _truncate space.

src/box/box.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,31 @@ box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
666666
const char *tuple_end, const char *ops, const char *ops_end,
667667
int index_base, box_tuple_t **result);
668668

669+
struct ArrowArray;
670+
struct ArrowSchema;
671+
672+
/**
673+
* Executes a batch insert request.
674+
*
675+
* A record batch from the Arrow `array` is inserted into the space columns,
676+
* whose names are provided by the Arrow `schema`. Column types in the schema
677+
* must match the types of the corresponding fields in the space format.
678+
*
679+
* If a column is nullable in space format, it can be omitted. All non-nullable
680+
* columns (including primary key parts) must be present in the batch.
681+
*
682+
* This function releases neither `array` nor `schema`.
683+
*
684+
* \param space_id space identifier
685+
* \param array input data in ArrowArray format
686+
* \param schema definition of the input data in ArrowSchema format
687+
* \retval 0 on success
688+
* \retval -1 on error (check box_error_last())
689+
*/
690+
API_EXPORT int
691+
box_insert_arrow(uint32_t space_id, struct ArrowArray *array,
692+
struct ArrowSchema *schema);
693+
669694
/**
670695
* Truncate space.
671696
*

src/box/errcode.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,8 @@ struct errcode_record {
433433
_(ER_READ_VIEW_CLOSED, 286, "The read view is closed") \
434434
_(ER_WAL_QUEUE_FULL, 287, "The WAL queue is full") \
435435
_(ER_INVALID_VCLOCK, 288, "Invalid vclock", "value", STRING) \
436+
_(ER_ARROW_IPC_ENCODE, 289, "Failed to encode Arrow IPC data", "method", STRING, "details", STRING) \
437+
_(ER_ARROW_IPC_DECODE, 290, "Failed to decode Arrow IPC data", "method", STRING, "details", STRING) \
436438
TEST_ERROR_CODES(_) /** This one should be last. */
437439

438440
/*

src/box/iproto.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1866,6 +1866,7 @@ iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
18661866
case IPROTO_UPDATE:
18671867
case IPROTO_DELETE:
18681868
case IPROTO_UPSERT:
1869+
case IPROTO_INSERT_ARROW:
18691870
assert(type < sizeof(iproto_thread->dml_route) /
18701871
sizeof(*iproto_thread->dml_route));
18711872
*route = iproto_thread->dml_route[type];
@@ -3655,6 +3656,7 @@ iproto_thread_init_routes(struct iproto_thread *iproto_thread)
36553656
assert(dml_route[IPROTO_BEGIN] == NULL);
36563657
assert(dml_route[IPROTO_COMMIT] == NULL);
36573658
assert(dml_route[IPROTO_ROLLBACK] == NULL);
3659+
dml_route[IPROTO_INSERT_ARROW] = iproto_thread->process1_route;
36583660

36593661
iproto_thread->connect_route[0] =
36603662
{ tx_process_connect, &iproto_thread->net_pipe };

src/box/iproto_constants.c

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,24 @@
3232

3333
#define bit(c) (1ULL<<IPROTO_##c)
3434
const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
35-
0, /* unused */
36-
bit(SPACE_ID) | bit(LIMIT) | bit(KEY), /* SELECT */
37-
bit(SPACE_ID) | bit(TUPLE), /* INSERT */
38-
bit(SPACE_ID) | bit(TUPLE), /* REPLACE */
39-
bit(SPACE_ID) | bit(KEY) | bit(TUPLE), /* UPDATE */
40-
bit(SPACE_ID) | bit(KEY), /* DELETE */
41-
0, /* CALL_16 */
42-
0, /* AUTH */
43-
0, /* EVAL */
44-
bit(SPACE_ID) | bit(OPS) | bit(TUPLE), /* UPSERT */
45-
0, /* CALL */
46-
0, /* EXECUTE */
47-
0, /* NOP */
48-
0, /* PREPARE */
49-
0, /* BEGIN */
50-
0, /* COMMIT */
51-
0, /* ROLLBACK */
35+
0, /* unused */
36+
bit(SPACE_ID) | bit(LIMIT) | bit(KEY), /* SELECT */
37+
bit(SPACE_ID) | bit(TUPLE), /* INSERT */
38+
bit(SPACE_ID) | bit(TUPLE), /* REPLACE */
39+
bit(SPACE_ID) | bit(KEY) | bit(TUPLE), /* UPDATE */
40+
bit(SPACE_ID) | bit(KEY), /* DELETE */
41+
0, /* CALL_16 */
42+
0, /* AUTH */
43+
0, /* EVAL */
44+
bit(SPACE_ID) | bit(OPS) | bit(TUPLE), /* UPSERT */
45+
0, /* CALL */
46+
0, /* EXECUTE */
47+
0, /* NOP */
48+
0, /* PREPARE */
49+
0, /* BEGIN */
50+
0, /* COMMIT */
51+
0, /* ROLLBACK */
52+
bit(SPACE_ID) | bit(ARROW), /* INSERT_ARROW */
5253
};
5354
#undef bit
5455

0 commit comments

Comments
 (0)