Skip to content

Commit d6486ab

Browse files
Implement binary format support for SQL clear cursor
- support binary_format parameter on _sql/close - make cursor close response consistent with the query protocol (ie. in ODBC/JDBC/CLI return CBOR response for cursor close - as for query)
1 parent 85359f3 commit d6486ab

File tree

12 files changed

+160
-20
lines changed

12 files changed

+160
-20
lines changed

docs/changelog/84230.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 84230
2+
summary: Implement binary format support for SQL clear cursor
3+
area: SQL
4+
type: bug
5+
issues:
6+
- 53359

x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,14 @@ private void assertBinaryRequest(boolean isBinary, XContentType xContentType) th
9595
logger.info("Ignored SQLException", e);
9696
}
9797
assertValues(isBinary, xContentType);
98+
99+
prepareMockResponse();
100+
try {
101+
httpClient.queryClose("");
102+
} catch (SQLException e) {
103+
logger.info("Ignored SQLException", e);
104+
}
105+
assertValues(isBinary, xContentType);
98106
}
99107

100108
private void assertValues(boolean isBinary, XContentType xContentType) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.sql.qa.jdbc.single_node;
8+
9+
import org.elasticsearch.xpack.sql.qa.jdbc.CloseCursorTestCase;
10+
11+
public class JdbcCloseCursorIT extends CloseCursorTestCase {}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.sql.qa.jdbc;
8+
9+
import org.elasticsearch.core.CheckedConsumer;
10+
import org.junit.Before;
11+
12+
import java.io.IOException;
13+
import java.sql.Connection;
14+
import java.sql.ResultSet;
15+
import java.sql.SQLException;
16+
import java.sql.Statement;
17+
18+
public abstract class CloseCursorTestCase extends JdbcIntegrationTestCase {
19+
20+
@Before
21+
public void initIndex() throws IOException {
22+
index("library", "1", builder -> { builder.field("name", "foo"); });
23+
index("library", "2", builder -> { builder.field("name", "bar"); });
24+
index("library", "3", builder -> { builder.field("name", "baz"); });
25+
}
26+
27+
public void testCloseCursor() throws SQLException {
28+
doWithQuery("SELECT name FROM library", results -> {
29+
assertTrue(results.next());
30+
results.close(); // force sending a cursor close since more pages are available
31+
assertTrue(results.isClosed());
32+
});
33+
}
34+
35+
public void testCloseConsumedCursor() throws SQLException {
36+
doWithQuery("SELECT name FROM library", results -> {
37+
for (int i = 0; i < 3; i++) {
38+
assertTrue(results.next());
39+
}
40+
assertFalse(results.next());
41+
results.close();
42+
assertTrue(results.isClosed());
43+
});
44+
}
45+
46+
public void testCloseNoCursor() throws SQLException {
47+
doWithQuery("SELECT name FROM library WHERE name = 'zzz'", results -> {
48+
results.close();
49+
assertTrue(results.isClosed());
50+
});
51+
}
52+
53+
private void doWithQuery(String query, CheckedConsumer<ResultSet, SQLException> consumer) throws SQLException {
54+
try (Connection connection = createConnection(connectionProperties()); Statement statement = connection.createStatement()) {
55+
statement.setFetchSize(1);
56+
ResultSet results = statement.executeQuery(query);
57+
consumer.accept(results);
58+
}
59+
}
60+
}

x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xcontent.ConstructingObjectParser;
13+
import org.elasticsearch.xcontent.ParseField;
1314
import org.elasticsearch.xcontent.XContentParser;
1415
import org.elasticsearch.xpack.sql.proto.Mode;
1516
import org.elasticsearch.xpack.sql.proto.RequestInfo;
@@ -24,12 +25,16 @@
2425
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.CURSOR;
2526
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.MODE;
2627
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.VERSION;
28+
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_COMMUNICATION;
29+
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_FORMAT_NAME;
2730

2831
/**
2932
* Request to clean all SQL resources associated with the cursor
3033
*/
3134
public class SqlClearCursorRequest extends AbstractSqlRequest {
3235

36+
static final ParseField BINARY_FORMAT = new ParseField(BINARY_FORMAT_NAME);
37+
3338
private static final ConstructingObjectParser<SqlClearCursorRequest, Void> PARSER =
3439
// here the position in "objects" is the same as the fields parser declarations below
3540
new ConstructingObjectParser<>(SqlClearCursorAction.NAME, objects -> {
@@ -43,9 +48,11 @@ public class SqlClearCursorRequest extends AbstractSqlRequest {
4348
PARSER.declareString(optionalConstructorArg(), MODE);
4449
PARSER.declareString(optionalConstructorArg(), CLIENT_ID);
4550
PARSER.declareString(optionalConstructorArg(), VERSION);
51+
PARSER.declareBoolean(SqlClearCursorRequest::binaryCommunication, BINARY_FORMAT);
4652
}
4753

4854
private String cursor;
55+
private Boolean binaryCommunication = BINARY_COMMUNICATION;
4956

5057
public SqlClearCursorRequest() {}
5158

@@ -80,12 +87,23 @@ public String getDescription() {
8087
public SqlClearCursorRequest(StreamInput in) throws IOException {
8188
super(in);
8289
cursor = in.readString();
90+
binaryCommunication = in.readOptionalBoolean();
91+
}
92+
93+
public SqlClearCursorRequest binaryCommunication(Boolean binaryCommunication) {
94+
this.binaryCommunication = binaryCommunication;
95+
return this;
96+
}
97+
98+
public Boolean binaryCommunication() {
99+
return this.binaryCommunication;
83100
}
84101

85102
@Override
86103
public void writeTo(StreamOutput out) throws IOException {
87104
super.writeTo(out);
88105
out.writeString(cursor);
106+
out.writeOptionalBoolean(binaryCommunication);
89107
}
90108

91109
@Override
@@ -94,12 +112,12 @@ public boolean equals(Object o) {
94112
if (o == null || getClass() != o.getClass()) return false;
95113
if (super.equals(o) == false) return false;
96114
SqlClearCursorRequest that = (SqlClearCursorRequest) o;
97-
return Objects.equals(cursor, that.cursor);
115+
return Objects.equals(cursor, that.cursor) && Objects.equals(binaryCommunication, that.binaryCommunication);
98116
}
99117

100118
@Override
101119
public int hashCode() {
102-
return Objects.hash(super.hashCode(), cursor);
120+
return Objects.hash(super.hashCode(), cursor, binaryCommunication);
103121
}
104122

105123
public static SqlClearCursorRequest fromXContent(XContentParser parser) {

x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlClearCursorRequestTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ protected TestSqlClearCursorRequest createXContextTestInstance(XContentType xCon
3636

3737
@Override
3838
protected TestSqlClearCursorRequest createTestInstance() {
39-
return new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100));
39+
TestSqlClearCursorRequest result = new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100));
40+
result.binaryCommunication(randomBoolean());
41+
return result;
4042
}
4143

4244
@Override
@@ -58,9 +60,11 @@ protected TestSqlClearCursorRequest mutateInstance(TestSqlClearCursorRequest ins
5860
@SuppressWarnings("unchecked")
5961
Consumer<TestSqlClearCursorRequest> mutator = randomFrom(
6062
request -> request.requestInfo(randomValueOtherThan(request.requestInfo(), this::randomRequestInfo)),
61-
request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor))
63+
request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor)),
64+
request -> request.binaryCommunication(randomValueOtherThan(request.binaryCommunication(), () -> randomBoolean()))
6265
);
6366
TestSqlClearCursorRequest newRequest = new TestSqlClearCursorRequest(instance.requestInfo(), instance.getCursor());
67+
newRequest.binaryCommunication(instance.binaryCommunication());
6468
mutator.accept(newRequest);
6569
return newRequest;
6670
}

x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/SqlRequestParsersTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xcontent.XContentParser;
1515
import org.elasticsearch.xcontent.XContentParserConfiguration;
1616
import org.elasticsearch.xcontent.XContentType;
17+
import org.elasticsearch.xpack.sql.proto.CoreProtocol;
1718
import org.elasticsearch.xpack.sql.proto.Mode;
1819
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
1920

@@ -61,23 +62,27 @@ public void testClearCursorRequestParser() throws IOException {
6162
"cursor": "whatever",
6263
"mode": "%s",
6364
"client_id": "bla",
64-
"version": "1.2.3"
65+
"version": "1.2.3",
66+
"binary_format": true
6567
}""".formatted(randomMode), SqlClearCursorRequest::fromXContent);
6668
assertNull(request.clientId());
6769
assertNull(request.version());
6870
assertEquals(randomMode, request.mode());
6971
assertEquals("whatever", request.getCursor());
72+
assertTrue(request.binaryCommunication());
7073

7174
randomMode = randomFrom(Mode.values());
7275
request = generateRequest("""
7376
{
7477
"cursor": "whatever",
7578
"mode": "%s",
76-
"client_id": "bla"
79+
"client_id": "bla",
80+
"binary_format": false
7781
}""".formatted(randomMode.toString()), SqlClearCursorRequest::fromXContent);
7882
assertNull(request.clientId());
7983
assertEquals(randomMode, request.mode());
8084
assertEquals("whatever", request.getCursor());
85+
assertFalse(request.binaryCommunication());
8186

8287
request = generateRequest("{\"cursor\" : \"whatever\"}", SqlClearCursorRequest::fromXContent);
8388
assertNull(request.clientId());
@@ -94,6 +99,7 @@ public void testClearCursorRequestParser() throws IOException {
9499
assertNull(request.version());
95100
assertEquals(Mode.PLAIN, request.mode());
96101
assertEquals("whatever", request.getCursor());
102+
assertEquals(CoreProtocol.BINARY_COMMUNICATION, request.binaryCommunication());
97103

98104
request = generateRequest("""
99105
{"cursor" : "whatever", "client_id" : "cANVAs"}""", SqlClearCursorRequest::fromXContent);

x-pack/plugin/sql/sql-action/src/test/java/org/elasticsearch/xpack/sql/action/TestSqlClearCursorRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public TestSqlClearCursorRequest(RequestInfo requestInfo, String cursor) {
3333
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
3434
org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest protoInstance = new org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest(
3535
this.getCursor(),
36-
this.requestInfo()
36+
this.requestInfo(),
37+
this.binaryCommunication()
3738
);
3839
return SqlTestUtils.toXContentBuilder(builder, g -> Payloads.generate(g, protoInstance));
3940
}

x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public SqlQueryResponse nextPage(String cursor) throws SQLException {
124124
public boolean queryClose(String cursor, Mode mode) throws SQLException {
125125
ResponseWithWarnings<SqlClearCursorResponse> response = post(
126126
CoreProtocol.CLEAR_CURSOR_REST_ENDPOINT,
127-
new SqlClearCursorRequest(cursor, new RequestInfo(mode)),
127+
new SqlClearCursorRequest(cursor, new RequestInfo(mode), cfg.binaryCommunication()),
128128
Payloads::parseClearCursorResponse
129129
);
130130
return response.response().isSucceeded();

x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Payloads.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public static void generate(JsonGenerator generator, SqlClearCursorRequest reque
170170
generator.writeStringField("mode", request.mode().toString());
171171
writeIfValid(generator, "client_id", request.clientId());
172172
writeIfValidAsString(generator, "version", request.version());
173+
writeIfValid(generator, "binary_format", request.binaryCommunication());
173174

174175
generator.writeEndObject();
175176
}
@@ -222,20 +223,15 @@ public static void generate(
222223
if (request.pageTimeout() != CoreProtocol.PAGE_TIMEOUT) {
223224
generator.writeStringField(PAGE_TIMEOUT_NAME, request.pageTimeout().getStringRep());
224225
}
225-
if (request.columnar() != null) {
226-
generator.writeBooleanField(COLUMNAR_NAME, request.columnar());
227-
}
226+
writeIfValid(generator, COLUMNAR_NAME, request.columnar());
228227
if (request.fieldMultiValueLeniency()) {
229228
generator.writeBooleanField(FIELD_MULTI_VALUE_LENIENCY_NAME, request.fieldMultiValueLeniency());
230229
}
231230
if (request.indexIncludeFrozen()) {
232231
generator.writeBooleanField(INDEX_INCLUDE_FROZEN_NAME, request.indexIncludeFrozen());
233232
}
234-
if (request.binaryCommunication() != null) {
235-
generator.writeBooleanField(BINARY_FORMAT_NAME, request.binaryCommunication());
236-
}
233+
writeIfValid(generator, BINARY_FORMAT_NAME, request.binaryCommunication());
237234
writeIfValid(generator, CURSOR_NAME, request.cursor());
238-
239235
writeIfValidAsString(generator, WAIT_FOR_COMPLETION_TIMEOUT_NAME, request.waitForCompletionTimeout(), TimeValue::getStringRep);
240236

241237
if (request.keepOnCompletion()) {
@@ -255,6 +251,12 @@ private static void writeIfValid(JsonGenerator generator, String name, String va
255251
}
256252
}
257253

254+
private static void writeIfValid(JsonGenerator generator, String name, Boolean value) throws IOException {
255+
if (value != null) {
256+
generator.writeBooleanField(name, value);
257+
}
258+
}
259+
258260
private static void writeIfValidAsString(JsonGenerator generator, String name, Object value) throws IOException {
259261
writeIfValidAsString(generator, name, value, Object::toString);
260262
}

0 commit comments

Comments
 (0)