From 817c8a222e1216148716a4c1e78d48e9b56634cc Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Thu, 3 Jun 2021 18:30:41 +0200 Subject: [PATCH 1/4] Allow text formats to work in async mode This extends the async implementation to support working with the text formats (txt, csv, tsv). A test validating the administrator operation of a user with the "manage" permission has also been added. --- .../sql/qa/server/security/build.gradle | 1 + .../plugin/sql/qa/server/security/roles.yml | 4 + .../qa/security/RestSqlSecurityAsyncIT.java | 27 ++ .../xpack/sql/qa/single_node/RestSqlIT.java | 2 +- .../sql/qa/rest/BaseRestSqlTestCase.java | 20 +- .../xpack/sql/qa/rest/RestSqlTestCase.java | 269 ++++++++++++++++-- .../xpack/sql/action/SqlQueryResponse.java | 6 +- .../xpack/sql/proto/Protocol.java | 9 + .../plugin/RestSqlAsyncGetResultsAction.java | 12 +- .../xpack/sql/plugin/RestSqlQueryAction.java | 59 +--- .../xpack/sql/plugin/SqlMediaTypeParser.java | 36 ++- .../sql/plugin/SqlResponseFormatter.java | 95 +++++++ .../xpack/sql/plugin/TextFormat.java | 13 +- .../sql/plugin/SqlMediaTypeParserTests.java | 42 +-- 14 files changed, 479 insertions(+), 116 deletions(-) create mode 100644 x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseFormatter.java diff --git a/x-pack/plugin/sql/qa/server/security/build.gradle b/x-pack/plugin/sql/qa/server/security/build.gradle index 66a41794625f6..375c654645955 100644 --- a/x-pack/plugin/sql/qa/server/security/build.gradle +++ b/x-pack/plugin/sql/qa/server/security/build.gradle @@ -40,6 +40,7 @@ subprojects { user username: "test_admin", password: "x-pack-test-password" user username: "user1", password: 'x-pack-test-password', role: "user1" user username: "user2", password: 'x-pack-test-password', role: "user2" + user username: "manage_user", password: 'x-pack-test-password', role: "manage_user" } File testArtifactsDir = project.file("$buildDir/testArtifacts") diff --git a/x-pack/plugin/sql/qa/server/security/roles.yml b/x-pack/plugin/sql/qa/server/security/roles.yml index b31022c82a8a3..01c9698681968 100644 --- a/x-pack/plugin/sql/qa/server/security/roles.yml +++ b/x-pack/plugin/sql/qa/server/security/roles.yml @@ -111,3 +111,7 @@ user2: - write - create_index - indices:admin/refresh + +manage_user: + cluster: + - manage diff --git a/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java b/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java index c890c0b72afd6..69b22cb4f2f4d 100644 --- a/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java +++ b/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase; import org.junit.Before; import java.io.IOException; @@ -101,6 +102,24 @@ private void testCase(String user, String otherUser) throws Exception { assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(400)); } + // user with manage privilege can check status and delete + public void testWithManager() throws IOException { + Response submitResp = submitAsyncSqlSearch("SELECT event_type FROM \"index\" WHERE val=0", TimeValue.timeValueSeconds(10), "user1"); + assertOK(submitResp); + String id = extractResponseId(submitResp); + Response getResp = getAsyncSqlSearch(id, "user1"); + assertOK(getResp); + + Response getStatus = getAsyncSqlStatus(id, "manage_user"); + assertOK(getStatus); + Map status = BaseRestSqlTestCase.toMap(getStatus, null); + assertEquals(200, status.get("completion_status")); + + Response deleteResp = deleteAsyncSqlSearch(id, "manage_user"); + assertOK(deleteResp); + + } + static String extractResponseId(Response response) throws IOException { Map map = toMap(response); return (String) map.get("id"); @@ -148,6 +167,14 @@ static Response getAsyncSqlSearch(String id, String user) throws IOException { final Request request = new Request("GET", "/_sql/async/" + id); setRunAsHeader(request, user); request.addParameter("wait_for_completion_timeout", "0ms"); + request.addParameter("format", "json"); + return client().performRequest(request); + } + + static Response getAsyncSqlStatus(String id, String user) throws IOException { + final Request request = new Request("GET", "/_sql/async/status/" + id); + setRunAsHeader(request, user); + request.addParameter("format", "json"); return client().performRequest(request); } diff --git a/x-pack/plugin/sql/qa/server/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java b/x-pack/plugin/sql/qa/server/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java index aad2c7dffb0f2..c0a1a79e4c9a7 100644 --- a/x-pack/plugin/sql/qa/server/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java +++ b/x-pack/plugin/sql/qa/server/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java @@ -85,7 +85,7 @@ public void testIncorrectAcceptHeader() throws IOException { request.setEntity(stringEntity); expectBadRequest( () -> toMap(client().performRequest(request), "plain"), - containsString("Invalid response content type: Accept=[application/fff]") + containsString("Invalid request content type: Accept=[application/fff]") ); } } diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java index fdcb6f936b955..b40bfe2a2806f 100644 --- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java @@ -24,17 +24,20 @@ import static org.elasticsearch.xpack.sql.proto.Protocol.BINARY_FORMAT_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.CLIENT_ID_NAME; -import static org.elasticsearch.xpack.sql.proto.Protocol.VERSION_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.COLUMNAR_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.CURSOR_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.FETCH_SIZE_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.FIELD_MULTI_VALUE_LENIENCY_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.FILTER_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.KEEP_ALIVE_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.KEEP_ON_COMPLETION_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.MODE_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.PARAMS_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.QUERY_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.RUNTIME_MAPPINGS_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.TIME_ZONE_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.VERSION_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.WAIT_FOR_COMPLETION_TIMEOUT_NAME; public abstract class BaseRestSqlTestCase extends ESRestTestCase { @@ -107,6 +110,21 @@ public RequestObjectBuilder binaryFormat(Boolean binaryFormat) { return this; } + public RequestObjectBuilder waitForCompletionTimeout(String timeout) { + request.append(field(WAIT_FOR_COMPLETION_TIMEOUT_NAME, timeout)); + return this; + } + + public RequestObjectBuilder keepOnCompletion(Boolean keepOnCompletion) { + request.append(field(KEEP_ON_COMPLETION_NAME, keepOnCompletion)); + return this; + } + + public RequestObjectBuilder keepAlive(String keepAlive) { + request.append(field(KEEP_ALIVE_NAME, keepAlive)); + return this; + } + public RequestObjectBuilder fieldMultiValueLeniency(Boolean fieldMultiValueLeniency) { request.append(field(FIELD_MULTI_VALUE_LENIENCY_NAME, fieldMultiValueLeniency)); return this; diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index ab109d4ca6011..c7f7637d4038b 100644 --- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.core.Tuple; @@ -31,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.sql.JDBCType; import java.time.Instant; @@ -49,7 +51,23 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.common.Strings.hasText; import static org.elasticsearch.xpack.ql.TestUtils.getNumberOfSearchContexts; +import static org.elasticsearch.xpack.sql.proto.Protocol.COLUMNS_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_ASYNC_ID; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_ASYNC_PARTIAL; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_ASYNC_RUNNING; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_CURSOR; +import static org.elasticsearch.xpack.sql.proto.Protocol.ID_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.IS_PARTIAL_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.IS_RUNNING_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.ROWS_NAME; +import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_ASYNC_DELETE_REST_ENDPOINT; +import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_ASYNC_REST_ENDPOINT; +import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_ASYNC_STATUS_REST_ENDPOINT; +import static org.elasticsearch.xpack.sql.proto.Protocol.URL_PARAM_DELIMITER; +import static org.elasticsearch.xpack.sql.proto.Protocol.URL_PARAM_FORMAT; +import static org.elasticsearch.xpack.sql.proto.Protocol.WAIT_FOR_COMPLETION_TIMEOUT_NAME; import static org.hamcrest.Matchers.containsString; /** @@ -91,17 +109,10 @@ public void testBasicQuery() throws IOException { } public void testNextPage() throws IOException { - Request request = new Request("POST", "/test/_bulk"); - request.addParameter("refresh", "true"); - String mode = randomMode(); - StringBuilder bulk = new StringBuilder(); - for (int i = 0; i < 20; i++) { - bulk.append("{\"index\":{\"_id\":\"" + i + "\"}}\n"); - bulk.append("{\"text\":\"text" + i + "\", \"number\":" + i + "}\n"); - } - request.setJsonEntity(bulk.toString()); - client().performRequest(request); + final int count = 20; + bulkLoadTestData(count); + String mode = randomMode(); boolean columnar = randomBoolean(); String sqlRequest = query("SELECT text, number, SQRT(number) AS s, SCORE()" + " FROM test" + " ORDER BY number, SCORE()").mode( mode @@ -109,7 +120,7 @@ public void testNextPage() throws IOException { Number value = xContentDependentFloatingNumberValue(mode, 1f); String cursor = null; - for (int i = 0; i < 20; i += 2) { + for (int i = 0; i < count; i += 2) { Map response; if (i == 0) { response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode); @@ -1137,7 +1148,20 @@ private void executeQueryWithNextPage(String format, String expectedHeader, Stri assertEquals(0, getNumberOfSearchContexts(client(), "test")); } - private Tuple runSqlAsText(String sql, String accept) throws IOException { + private static void bulkLoadTestData(int count) throws IOException { + Request request = new Request("POST", "/test/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < count; i++) { + bulk.append("{\"index\":{\"_id\":\"" + i + "\"}}\n"); + bulk.append("{\"text\":\"text" + i + "\", \"number\":" + i + "}\n"); + } + request.setJsonEntity(bulk.toString()); + client().performRequest(request); + + } + + private static Tuple runSqlAsText(String sql, String accept) throws IOException { return runSqlAsText(StringUtils.EMPTY, new StringEntity(query(sql).toString(), ContentType.APPLICATION_JSON), accept); } @@ -1145,7 +1169,7 @@ private Tuple runSqlAsText(String sql, String accept) throws IOE * Run SQL as text using the {@code Accept} header to specify the format * rather than the {@code format} parameter. */ - private Tuple runSqlAsText(String suffix, HttpEntity entity, String accept) throws IOException { + private static Tuple runSqlAsText(String suffix, HttpEntity entity, String accept) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT + suffix); request.addParameter("error_trace", "true"); request.setEntity(entity); @@ -1153,27 +1177,25 @@ private Tuple runSqlAsText(String suffix, HttpEntity entity, Str options.addHeader("Accept", accept); request.setOptions(options); Response response = client().performRequest(request); - return new Tuple<>( - Streams.copyToString(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)), - response.getHeader("Cursor") - ); + return new Tuple<>(responseBody(response), response.getHeader("Cursor")); + } + + private static String responseBody(Response response) throws IOException { + return Streams.copyToString(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)); } /** * Run SQL as text using the {@code format} parameter to specify the format * rather than an {@code Accept} header. */ - private Tuple runSqlAsTextFormat(String sql, String format) throws IOException { + private static Tuple runSqlAsTextFormat(String sql, String format) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); request.addParameter("error_trace", "true"); request.addParameter("format", format); request.setJsonEntity(query(sql).toString()); Response response = client().performRequest(request); - return new Tuple<>( - Streams.copyToString(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)), - response.getHeader("Cursor") - ); + return new Tuple<>(responseBody(response), response.getHeader("Cursor")); } public static void assertResponse(Map expected, Map actual) { @@ -1183,4 +1205,207 @@ public static void assertResponse(Map expected, Map expected = new HashMap<>(); + expected.put(IS_PARTIAL_NAME, false); + expected.put(IS_RUNNING_NAME, false); + expected.put(COLUMNS_NAME, singletonList(columnInfo(mode, "1", "integer", JDBCType.INTEGER, 11))); + expected.put(ROWS_NAME, singletonList(singletonList(1))); + assertAsyncResponse(expected, runSql(builder, mode)); + } + + public void testAsyncTextWait() throws IOException { + RequestObjectBuilder builder = query("SELECT 1").waitForCompletionTimeout("1d").keepOnCompletion(false); + + Map contentMap = new HashMap<>() { + { + put("txt", " 1 \n---------------\n1 \n"); + put("csv", "1\r\n1\r\n"); + put("tsv", "1\n1\n"); + } + }; + + for (String format : contentMap.keySet()) { + Response response = runSqlAsTextFormat(builder, format); + + assertEquals(contentMap.get(format), responseBody(response)); + + assertTrue(hasText(response.getHeader(HEADER_NAME_ASYNC_ID))); + assertEquals("false", response.getHeader(HEADER_NAME_ASYNC_PARTIAL)); + assertEquals("false", response.getHeader(HEADER_NAME_ASYNC_RUNNING)); + } + } + + public void testAsyncTextPaginated() throws IOException, InterruptedException { + final Map acceptMap = new HashMap<>() { + { + put("txt", "text/plain"); + put("csv", "text/csv"); + put("tsv", "text/tab-separated-values"); + } + }; + final int fetchSize = randomIntBetween(1, 10); + final int fetchCount = randomIntBetween(1, 9); + bulkLoadTestData(fetchSize * fetchCount); // NB: product needs to stay below 100, for txt format tests + + String format = randomFrom(acceptMap.keySet()); + String mode = randomMode(); + String cursor = null; + for (int i = 0; i <= fetchCount; i++) { // the last iteration (the equality in `<=`) checks on no-cursor & no-results + // start the query + RequestObjectBuilder builder = (hasText(cursor) ? cursor(cursor) : query("SELECT text, number FROM test")).fetchSize(fetchSize) + .waitForCompletionTimeout("0d") // don't wait at all + .keepOnCompletion(true) + .keepAlive("1d") // keep "forever" + .mode(mode) + .binaryFormat(false); // prevent JDBC mode to (ignore `format` and) enforce CBOR + Response response = runSqlAsTextFormat(builder, format); + + Character csvDelimiter = ','; + + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(response.getHeader(HEADER_NAME_ASYNC_PARTIAL), response.getHeader(HEADER_NAME_ASYNC_RUNNING)); + String asyncId = response.getHeader(HEADER_NAME_ASYNC_ID); + assertTrue(hasText(asyncId)); + + // it happens though rarely that the whole response comes through despite the given 0-wait + if (response.getHeader(HEADER_NAME_ASYNC_PARTIAL).equals("true")) { + + // potentially wait for it to complete + boolean pollForCompletion = randomBoolean(); + if (pollForCompletion) { + Request request = new Request("GET", SQL_ASYNC_STATUS_REST_ENDPOINT + asyncId); + Map asyncStatus = null; + long millis = 1; + for (boolean isRunning = true; isRunning; Thread.sleep(millis *= 2)) { + asyncStatus = toMap(client().performRequest(request), null); + isRunning = (boolean) asyncStatus.get(IS_RUNNING_NAME); + } + assertEquals(200, (int) asyncStatus.get("completion_status")); + assertEquals(asyncStatus.get(IS_RUNNING_NAME), asyncStatus.get(IS_PARTIAL_NAME)); + assertEquals(asyncId, asyncStatus.get(ID_NAME)); + } + + // fetch the results (potentially waiting now to complete) + Request request = new Request("GET", SQL_ASYNC_REST_ENDPOINT + asyncId); + if (pollForCompletion == false) { + request.addParameter(WAIT_FOR_COMPLETION_TIMEOUT_NAME, "1d"); + } + if (randomBoolean()) { + request.addParameter(URL_PARAM_FORMAT, format); + if (format.equals("csv")) { + csvDelimiter = ';'; + request.addParameter(URL_PARAM_DELIMITER, URLEncoder.encode(String.valueOf(csvDelimiter), StandardCharsets.UTF_8)); + } + } else { + request.setOptions(request.getOptions().toBuilder().addHeader("Accept", acceptMap.get(format))); + } + response = client().performRequest(request); + + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(asyncId, response.getHeader(HEADER_NAME_ASYNC_ID)); + assertEquals("false", response.getHeader(HEADER_NAME_ASYNC_PARTIAL)); + assertEquals("false", response.getHeader(HEADER_NAME_ASYNC_RUNNING)); + } + + cursor = response.getHeader(HEADER_NAME_CURSOR); + String body = responseBody(response); + if (i == fetchCount) { + assertNull(cursor); + assertFalse(hasText(body)); + } else { + String expected = expectedTextBody(format, fetchSize, i, csvDelimiter); + assertEquals(expected, body); + + if (hasText(cursor) == false) { // depending on index and fetch size, the last page might or not have a cursor + assertEquals(i, fetchCount - 1); + i++; // end the loop after deleting the async resources + } + } + + // delete the query results + Request request = new Request("DELETE", SQL_ASYNC_DELETE_REST_ENDPOINT + asyncId); + Map deleteStatus = toMap(client().performRequest(request), null); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertTrue((boolean) deleteStatus.get("acknowledged")); + } + } + + static Map runSql(RequestObjectBuilder builder, String mode) throws IOException { + return toMap(runSql(builder.mode(mode)), mode); + } + + static Response runSql(RequestObjectBuilder builder) throws IOException { + return runSqlAsTextFormat(builder, null); + } + + static Response runSqlAsTextFormat(RequestObjectBuilder builder, @Nullable String format) throws IOException { + Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); + request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. + request.addParameter("pretty", "true"); // Improves error reporting readability + if (format != null) { + request.addParameter(URL_PARAM_FORMAT, format); // Improves error reporting readability + } + request.setEntity(new StringEntity(builder.toString(), ContentType.APPLICATION_JSON)); + return client().performRequest(request); + + } + + static void assertAsyncResponse(Map expected, Map actual) { + String actualId = (String) actual.get("id"); + assertTrue("async ID missing in response", hasText(actualId)); + expected.put("id", actualId); + assertResponse(expected, actual); + } + + private static String expectedTextBody(String format, int fetchSize, int count, Character csvDelimiter) { + StringBuilder sb = new StringBuilder(); + if (count == 0) { // add the header + switch (format) { + case "txt": + sb.append(" text | number \n"); + sb.append("---------------+---------------\n"); + break; + case "csv": + sb.append("text").append(csvDelimiter).append("number\r\n"); + break; + case "tsv": + sb.append("text\tnumber\n"); + break; + default: + assert false : "unexpected format type [" + format + "]"; + } + } + for (int i = 0; i < fetchSize; i++) { + int val = fetchSize * count + i; + sb.append("text").append(val); + switch (format) { + case "txt": + sb.append(val < 10 ? " " : StringUtils.EMPTY).append(" |"); + break; + case "csv": + sb.append(csvDelimiter); + break; + case "tsv": + sb.append('\t'); + break; + } + sb.append(val); + if (format.equals("txt")) { + sb.append(" ").append(val < 10 ? " " : StringUtils.EMPTY); + } + sb.append(format.equals("csv") ? "\r\n" : "\n"); + } + return sb.toString(); + } } diff --git a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryResponse.java b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryResponse.java index e7282c5aec6ba..bfbd110688eb9 100644 --- a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryResponse.java +++ b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryResponse.java @@ -194,7 +194,7 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); { - if (Strings.hasText(asyncExecutionId)) { + if (hasId()) { builder.field(Protocol.ID_NAME, asyncExecutionId); builder.field(Protocol.IS_PARTIAL_NAME, isPartial); builder.field(Protocol.IS_RUNNING_NAME, isRunning); @@ -285,6 +285,10 @@ public static void writeColumnInfo(StreamOutput out, ColumnInfo columnInfo) thro out.writeOptionalVInt(columnInfo.displaySize()); } + public boolean hasId() { + return Strings.hasText(asyncExecutionId); + } + @Override public String id() { return asyncExecutionId; diff --git a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java index 8550c3bb10683..b6ae31a9e5b2a 100644 --- a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java +++ b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java @@ -81,6 +81,15 @@ public final class Protocol { public static final String URL_PARAM_FORMAT = "format"; public static final String URL_PARAM_DELIMITER = "delimiter"; + /** + * HTTP header names + */ + public static final String HEADER_NAME_CURSOR = "Cursor"; + public static final String HEADER_NAME_TOOK_NANOS = "Took-nanos"; + public static final String HEADER_NAME_ASYNC_ID = "Async-ID"; + public static final String HEADER_NAME_ASYNC_PARTIAL = "Async-partial"; + public static final String HEADER_NAME_ASYNC_RUNNING = "Async-running"; + /** * SQL-related endpoints */ diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java index 3ef3021d16189..be2cda5863083 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java @@ -9,15 +9,17 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; +import java.util.Collections; import java.util.List; +import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.xpack.sql.proto.Protocol.ID_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.KEEP_ALIVE_NAME; import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_ASYNC_REST_ENDPOINT; +import static org.elasticsearch.xpack.sql.proto.Protocol.URL_PARAM_DELIMITER; import static org.elasticsearch.xpack.sql.proto.Protocol.WAIT_FOR_COMPLETION_TIMEOUT_NAME; public class RestSqlAsyncGetResultsAction extends BaseRestHandler { @@ -40,6 +42,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli if (request.hasParam(KEEP_ALIVE_NAME)) { get.setKeepAlive(request.paramAsTime(KEEP_ALIVE_NAME, get.getKeepAlive())); } - return channel -> client.execute(SqlAsyncGetResultsAction.INSTANCE, get, new RestToXContentListener<>(channel)); + return channel -> client.execute(SqlAsyncGetResultsAction.INSTANCE, get, new SqlResponseFormatter(channel, request)); } + + @Override + protected Set responseParams() { + return Collections.singleton(URL_PARAM_DELIMITER); + } + } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java index 75b5390d88e01..6040630787dfc 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java @@ -10,37 +10,25 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.MediaType; import org.elasticsearch.common.xcontent.MediaTypeRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestCancellableNodeClient; -import org.elasticsearch.rest.action.RestResponseListener; import org.elasticsearch.xpack.sql.action.SqlQueryAction; import org.elasticsearch.xpack.sql.action.SqlQueryRequest; -import org.elasticsearch.xpack.sql.action.SqlQueryResponse; import org.elasticsearch.xpack.sql.proto.Protocol; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Set; -import static java.util.Collections.emptySet; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.xpack.sql.proto.Protocol.URL_PARAM_DELIMITER; public class RestSqlQueryAction extends BaseRestHandler { - private final SqlMediaTypeParser sqlMediaTypeParser = new SqlMediaTypeParser(); - @Override public List routes() { return List.of( @@ -60,54 +48,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli sqlRequest = SqlQueryRequest.fromXContent(parser); } - MediaType responseMediaType = sqlMediaTypeParser.getResponseMediaType(request, sqlRequest); - if (responseMediaType == null) { - String msg = String.format(Locale.ROOT, "Invalid response content type: Accept=[%s], Content-Type=[%s], format=[%s]", - request.header("Accept"), request.header("Content-Type"), request.param("format")); - throw new IllegalArgumentException(msg); - } - - /* - * Special handling for the "delimiter" parameter which should only be - * checked for being present or not in the case of CSV format. We cannot - * override {@link BaseRestHandler#responseParams()} because this - * parameter should only be checked for CSV, not always. - */ - if ((responseMediaType instanceof XContentType || ((TextFormat) responseMediaType) != TextFormat.CSV) - && request.hasParam(URL_PARAM_DELIMITER)) { - throw new IllegalArgumentException(unrecognized(request, Collections.singleton(URL_PARAM_DELIMITER), emptySet(), "parameter")); - } - - long startNanos = System.nanoTime(); return channel -> { RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel()); - cancellableClient.execute(SqlQueryAction.INSTANCE, sqlRequest, new RestResponseListener<>(channel) { - @Override - public RestResponse buildResponse(SqlQueryResponse response) throws Exception { - RestResponse restResponse; - - // XContent branch - if (responseMediaType instanceof XContentType) { - XContentType type = (XContentType) responseMediaType; - XContentBuilder builder = channel.newBuilder(request.getXContentType(), type, true); - response.toXContent(builder, request); - restResponse = new BytesRestResponse(RestStatus.OK, builder); - } else { // TextFormat - TextFormat type = (TextFormat) responseMediaType; - final String data = type.format(request, response); - - restResponse = new BytesRestResponse(RestStatus.OK, type.contentType(request), - data.getBytes(StandardCharsets.UTF_8)); - - if (response.hasCursor()) { - restResponse.addHeader("Cursor", response.cursor()); - } - } - - restResponse.addHeader("Took-nanos", Long.toString(System.nanoTime() - startNanos)); - return restResponse; - } - }); + cancellableClient.execute(SqlQueryAction.INSTANCE, sqlRequest, new SqlResponseFormatter(channel, request, sqlRequest)); }; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java index 7b5eaea4b70ff..7a6b3d81cb3e2 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java @@ -9,11 +9,13 @@ import org.elasticsearch.common.xcontent.MediaType; import org.elasticsearch.common.xcontent.MediaTypeRegistry; +import org.elasticsearch.common.xcontent.ParsedMediaType; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.xpack.sql.action.SqlQueryRequest; import org.elasticsearch.xpack.sql.proto.Mode; +import java.util.Locale; import static org.elasticsearch.xpack.sql.proto.Protocol.URL_PARAM_FORMAT; @@ -34,27 +36,47 @@ public class SqlMediaTypeParser { * isn't but there is a {@code Accept} header then we use that. If there * isn't then we use the {@code Content-Type} header which is required. */ - public MediaType getResponseMediaType(RestRequest request, SqlQueryRequest sqlRequest) { + public static MediaType getResponseMediaType(RestRequest request, SqlQueryRequest sqlRequest) { if (Mode.isDedicatedClient(sqlRequest.requestInfo().mode()) && (sqlRequest.binaryCommunication() == null || sqlRequest.binaryCommunication())) { // enforce CBOR response for drivers and CLI (unless instructed differently through the config param) return XContentType.CBOR; } else if (request.hasParam(URL_PARAM_FORMAT)) { return validateColumnarRequest(sqlRequest.columnar(), - MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT))); + MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT)), request); } - if (request.getParsedAccept() != null) { - return request.getParsedAccept().toMediaType(MEDIA_TYPE_REGISTRY); + return getResponseMediaType(request); + } + + public static MediaType getResponseMediaType(RestRequest request) { + MediaType mediaType; + + if (request.hasParam(URL_PARAM_FORMAT)) { + mediaType = MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT)); + } else { + ParsedMediaType acceptType = request.getParsedAccept(); + mediaType = acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType(); } - return request.getXContentType(); + + return checkNonNullMediaType(mediaType, request); } - private static MediaType validateColumnarRequest(boolean requestIsColumnar, MediaType fromMediaType) { + private static MediaType validateColumnarRequest(boolean requestIsColumnar, MediaType fromMediaType, RestRequest request) { if (requestIsColumnar && fromMediaType instanceof TextFormat) { throw new IllegalArgumentException("Invalid use of [columnar] argument: cannot be used in combination with " + "txt, csv or tsv formats"); } - return fromMediaType; + return checkNonNullMediaType(fromMediaType, request); + } + + private static MediaType checkNonNullMediaType(MediaType mediaType, RestRequest request) { + if (mediaType == null) { + String msg = String.format(Locale.ROOT, "Invalid request content type: Accept=[%s], Content-Type=[%s], format=[%s]", + request.header("Accept"), request.header("Content-Type"), request.param("format")); + throw new IllegalArgumentException(msg); + } + + return mediaType; } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseFormatter.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseFormatter.java new file mode 100644 index 0000000000000..36be806efc73b --- /dev/null +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseFormatter.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.common.xcontent.MediaType; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestResponseListener; +import org.elasticsearch.xpack.sql.action.SqlQueryRequest; +import org.elasticsearch.xpack.sql.action.SqlQueryResponse; + +import java.nio.charset.StandardCharsets; +import java.util.Locale; + +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_ASYNC_ID; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_ASYNC_PARTIAL; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_ASYNC_RUNNING; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_CURSOR; +import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_TOOK_NANOS; +import static org.elasticsearch.xpack.sql.proto.Protocol.URL_PARAM_DELIMITER; + +class SqlResponseFormatter extends RestResponseListener { + + private final long startNanos = System.nanoTime(); + private final MediaType mediaType; + private final RestRequest request; + + + SqlResponseFormatter(RestChannel channel, RestRequest request, SqlQueryRequest sqlRequest) { + super(channel); + this.request = request; + + this.mediaType = SqlMediaTypeParser.getResponseMediaType(request, sqlRequest); + + /* + * Special handling for the "delimiter" parameter which should only be + * checked for being present or not in the case of CSV format. We cannot + * override {@link BaseRestHandler#responseParams()} because this + * parameter should only be checked for CSV, not always. + */ + if (mediaType != TextFormat.CSV && request.hasParam(URL_PARAM_DELIMITER)) { + String message = String.format(Locale.ROOT, "request [%s] contains unrecognized parameter: [" + URL_PARAM_DELIMITER + "]", + request.path()); + throw new IllegalArgumentException(message); + } + } + + SqlResponseFormatter(RestChannel channel, RestRequest request) { + super(channel); + this.request = request; + this.mediaType = SqlMediaTypeParser.getResponseMediaType(request); + } + + @Override + public RestResponse buildResponse(SqlQueryResponse response) throws Exception { + RestResponse restResponse; + + // XContent branch + if (mediaType instanceof XContentType) { + XContentType type = (XContentType) mediaType; + XContentBuilder builder = channel.newBuilder(request.getXContentType(), type, true); + response.toXContent(builder, request); + restResponse = new BytesRestResponse(RestStatus.OK, builder); + } else { // TextFormat + TextFormat type = (TextFormat) mediaType; + final String data = type.format(request, response); + + restResponse = new BytesRestResponse(RestStatus.OK, type.contentType(request), + data.getBytes(StandardCharsets.UTF_8)); + + if (response.hasCursor()) { + restResponse.addHeader(HEADER_NAME_CURSOR, response.cursor()); + } + + if (response.hasId()) { + restResponse.addHeader(HEADER_NAME_ASYNC_ID, response.id()); + restResponse.addHeader(HEADER_NAME_ASYNC_PARTIAL, String.valueOf(response.isPartial())); + restResponse.addHeader(HEADER_NAME_ASYNC_RUNNING, String.valueOf(response.isRunning())); + } + } + + restResponse.addHeader(HEADER_NAME_TOOK_NANOS, Long.toString(System.nanoTime() - startNanos)); + return restResponse; + } +} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java index c26f6f9d25391..44206ad9e950c 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java @@ -73,13 +73,12 @@ String format(RestRequest request, SqlQueryResponse response) { } // format with header return formatter.formatWithHeader(response.columns(), response.rows()); - } - else { - // should be initialized (wrapped by the cursor) - if (formatter != null) { - // format without header - return formatter.formatWithoutHeader(response.rows()); - } + } else if (formatter != null) { // should be initialized (wrapped by the cursor) + // format without header + return formatter.formatWithoutHeader(response.rows()); + } else if (response.hasId()) { + // an async request has no results yet + return StringUtils.EMPTY; } // if this code is reached, it means it's a next page without cursor wrapping throw new SqlIllegalArgumentException("Cannot find text formatter - this is likely a bug"); diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParserTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParserTests.java index c445da3d28bf6..3da565a0b7fec 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParserTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParserTests.java @@ -20,61 +20,69 @@ import java.util.Collections; import java.util.Map; +import static org.elasticsearch.xpack.sql.plugin.SqlMediaTypeParser.getResponseMediaType; import static org.elasticsearch.xpack.sql.plugin.TextFormat.CSV; import static org.elasticsearch.xpack.sql.plugin.TextFormat.PLAIN_TEXT; import static org.elasticsearch.xpack.sql.plugin.TextFormat.TSV; import static org.elasticsearch.xpack.sql.proto.RequestInfo.CLIENT_IDS; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; public class SqlMediaTypeParserTests extends ESTestCase { - SqlMediaTypeParser parser = new SqlMediaTypeParser(); public void testPlainTextDetection() { - MediaType text = parser.getResponseMediaType(reqWithAccept("text/plain"), createTestInstance(false, Mode.PLAIN, false)); + MediaType text = getResponseMediaType(reqWithAccept("text/plain"), createTestInstance(false, Mode.PLAIN, false)); assertThat(text, is(PLAIN_TEXT)); } public void testCsvDetection() { - MediaType text = parser.getResponseMediaType(reqWithAccept("text/csv"), createTestInstance(false, Mode.PLAIN, false)); + MediaType text = getResponseMediaType(reqWithAccept("text/csv"), createTestInstance(false, Mode.PLAIN, false)); assertThat(text, is(CSV)); - text = parser.getResponseMediaType(reqWithAccept("text/csv; delimiter=x"), createTestInstance(false, Mode.PLAIN, false)); + text = getResponseMediaType(reqWithAccept("text/csv; delimiter=x"), createTestInstance(false, Mode.PLAIN, false)); assertThat(text, is(CSV)); } public void testTsvDetection() { - MediaType text = parser.getResponseMediaType(reqWithAccept("text/tab-separated-values"), + MediaType text = getResponseMediaType(reqWithAccept("text/tab-separated-values"), createTestInstance(false, Mode.PLAIN, false)); assertThat(text, is(TSV)); } public void testMediaTypeDetectionWithParameters() { - assertThat(parser.getResponseMediaType(reqWithAccept("text/plain; charset=utf-8"), + assertThat(getResponseMediaType(reqWithAccept("text/plain; charset=utf-8"), createTestInstance(false, Mode.PLAIN, false)), is(PLAIN_TEXT)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/plain; header=present"), + assertThat(getResponseMediaType(reqWithAccept("text/plain; header=present"), createTestInstance(false, Mode.PLAIN, false)), is(PLAIN_TEXT)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/plain; charset=utf-8; header=present"), + assertThat(getResponseMediaType(reqWithAccept("text/plain; charset=utf-8; header=present"), createTestInstance(false, Mode.PLAIN, false)), is(PLAIN_TEXT)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/csv; charset=utf-8"), + assertThat(getResponseMediaType(reqWithAccept("text/csv; charset=utf-8"), createTestInstance(false, Mode.PLAIN, false)), is(CSV)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/csv; header=present"), + assertThat(getResponseMediaType(reqWithAccept("text/csv; header=present"), createTestInstance(false, Mode.PLAIN, false)), is(CSV)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/csv; charset=utf-8; header=present"), + assertThat(getResponseMediaType(reqWithAccept("text/csv; charset=utf-8; header=present"), createTestInstance(false, Mode.PLAIN, false)), is(CSV)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/tab-separated-values; charset=utf-8"), + assertThat(getResponseMediaType(reqWithAccept("text/tab-separated-values; charset=utf-8"), createTestInstance(false, Mode.PLAIN, false)), is(TSV)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/tab-separated-values; header=present"), + assertThat(getResponseMediaType(reqWithAccept("text/tab-separated-values; header=present"), createTestInstance(false, Mode.PLAIN, false)), is(TSV)); - assertThat(parser.getResponseMediaType(reqWithAccept("text/tab-separated-values; charset=utf-8; header=present"), + assertThat(getResponseMediaType(reqWithAccept("text/tab-separated-values; charset=utf-8; header=present"), createTestInstance(false, Mode.PLAIN, false)), is(TSV)); } public void testInvalidFormat() { - MediaType mediaType = parser.getResponseMediaType(reqWithAccept("text/garbage"), createTestInstance(false, Mode.PLAIN, false)); - assertThat(mediaType, is(nullValue())); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> getResponseMediaType(reqWithAccept("text/garbage"), createTestInstance(false, Mode.PLAIN, false))); + assertEquals(e.getMessage(), + "Invalid request content type: Accept=[text/garbage], Content-Type=[application/json], format=[null]"); + } + + public void testNoFormat() { + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> getResponseMediaType(new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).build(), + createTestInstance(false, Mode.PLAIN, false))); + assertEquals(e.getMessage(), "Invalid request content type: Accept=[null], Content-Type=[null], format=[null]"); } private static RestRequest reqWithAccept(String acceptHeader) { From 67c8e03253605189139a354f38a978d06250c64b Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Tue, 8 Jun 2021 20:39:25 +0200 Subject: [PATCH 2/4] Address review comments - refactor media type calculation to avoid redundant check on URL param. - rename methods in tests. - remove empty lines. --- .../qa/security/RestSqlSecurityAsyncIT.java | 1 - .../xpack/sql/qa/rest/RestSqlTestCase.java | 15 ++++++------ .../xpack/sql/plugin/SqlMediaTypeParser.java | 24 ++++++++++--------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java b/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java index 69b22cb4f2f4d..6f341db624641 100644 --- a/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java +++ b/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java @@ -117,7 +117,6 @@ public void testWithManager() throws IOException { Response deleteResp = deleteAsyncSqlSearch(id, "manage_user"); assertOK(deleteResp); - } static String extractResponseId(Response response) throws IOException { diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index c7f7637d4038b..86f00ea65b2e0 100644 --- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -975,7 +975,7 @@ public void testDefaultQueryInCSV() throws IOException { Tuple response = runSqlAsText(query, "text/csv"); assertEquals(expected, response.v1()); - response = runSqlAsTextFormat(query, "csv"); + response = runSqlAsTextWithFormat(query, "csv"); assertEquals(expected, response.v1()); } @@ -1038,7 +1038,7 @@ public void testQueryInTSV() throws IOException { String query = "SELECT * FROM test ORDER BY number"; Tuple response = runSqlAsText(query, "text/tab-separated-values"); assertEquals(expected, response.v1()); - response = runSqlAsTextFormat(query, "tsv"); + response = runSqlAsTextWithFormat(query, "tsv"); assertEquals(expected, response.v1()); } @@ -1158,7 +1158,6 @@ private static void bulkLoadTestData(int count) throws IOException { } request.setJsonEntity(bulk.toString()); client().performRequest(request); - } private static Tuple runSqlAsText(String sql, String accept) throws IOException { @@ -1188,7 +1187,7 @@ private static String responseBody(Response response) throws IOException { * Run SQL as text using the {@code format} parameter to specify the format * rather than an {@code Accept} header. */ - private static Tuple runSqlAsTextFormat(String sql, String format) throws IOException { + private static Tuple runSqlAsTextWithFormat(String sql, String format) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); request.addParameter("error_trace", "true"); request.addParameter("format", format); @@ -1236,7 +1235,7 @@ public void testAsyncTextWait() throws IOException { }; for (String format : contentMap.keySet()) { - Response response = runSqlAsTextFormat(builder, format); + Response response = runSqlAsTextWithFormat(builder, format); assertEquals(contentMap.get(format), responseBody(response)); @@ -1269,7 +1268,7 @@ public void testAsyncTextPaginated() throws IOException, InterruptedException { .keepAlive("1d") // keep "forever" .mode(mode) .binaryFormat(false); // prevent JDBC mode to (ignore `format` and) enforce CBOR - Response response = runSqlAsTextFormat(builder, format); + Response response = runSqlAsTextWithFormat(builder, format); Character csvDelimiter = ','; @@ -1346,10 +1345,10 @@ static Map runSql(RequestObjectBuilder builder, String mode) thr } static Response runSql(RequestObjectBuilder builder) throws IOException { - return runSqlAsTextFormat(builder, null); + return runSqlAsTextWithFormat(builder, null); } - static Response runSqlAsTextFormat(RequestObjectBuilder builder, @Nullable String format) throws IOException { + static Response runSqlAsTextWithFormat(RequestObjectBuilder builder, @Nullable String format) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. request.addParameter("pretty", "true"); // Improves error reporting readability diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java index 7a6b3d81cb3e2..dc764553928fb 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlMediaTypeParser.java @@ -42,26 +42,28 @@ public static MediaType getResponseMediaType(RestRequest request, SqlQueryReques // enforce CBOR response for drivers and CLI (unless instructed differently through the config param) return XContentType.CBOR; } else if (request.hasParam(URL_PARAM_FORMAT)) { - return validateColumnarRequest(sqlRequest.columnar(), - MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT)), request); + return validateColumnarRequest(sqlRequest.columnar(), mediaTypeFromParams(request), request); } - return getResponseMediaType(request); + return mediaTypeFromHeaders(request); } public static MediaType getResponseMediaType(RestRequest request) { - MediaType mediaType; - - if (request.hasParam(URL_PARAM_FORMAT)) { - mediaType = MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT)); - } else { - ParsedMediaType acceptType = request.getParsedAccept(); - mediaType = acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType(); - } + return request.hasParam(URL_PARAM_FORMAT) + ? checkNonNullMediaType(mediaTypeFromParams(request), request) + : mediaTypeFromHeaders(request); + } + private static MediaType mediaTypeFromHeaders(RestRequest request) { + ParsedMediaType acceptType = request.getParsedAccept(); + MediaType mediaType = acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType(); return checkNonNullMediaType(mediaType, request); } + private static MediaType mediaTypeFromParams(RestRequest request) { + return MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT)); + } + private static MediaType validateColumnarRequest(boolean requestIsColumnar, MediaType fromMediaType, RestRequest request) { if (requestIsColumnar && fromMediaType instanceof TextFormat) { throw new IllegalArgumentException("Invalid use of [columnar] argument: cannot be used in combination with " From ea85d2b97992a75048f788c9f50a6d23718635ac Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Wed, 9 Jun 2021 16:56:59 +0200 Subject: [PATCH 3/4] Address review comments - renamed listener; - minor test refactorings. --- .../elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java | 2 +- .../xpack/sql/plugin/RestSqlAsyncGetResultsAction.java | 2 +- .../elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java | 2 +- .../{SqlResponseFormatter.java => SqlResponseListener.java} | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) rename x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/{SqlResponseFormatter.java => SqlResponseListener.java} (94%) diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index 86f00ea65b2e0..bef4b405c88b2 100644 --- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -1273,7 +1273,7 @@ public void testAsyncTextPaginated() throws IOException, InterruptedException { Character csvDelimiter = ','; assertEquals(200, response.getStatusLine().getStatusCode()); - assertEquals(response.getHeader(HEADER_NAME_ASYNC_PARTIAL), response.getHeader(HEADER_NAME_ASYNC_RUNNING)); + assertTrue(response.getHeader(HEADER_NAME_ASYNC_PARTIAL).equals(response.getHeader(HEADER_NAME_ASYNC_RUNNING))); String asyncId = response.getHeader(HEADER_NAME_ASYNC_ID); assertTrue(hasText(asyncId)); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java index be2cda5863083..3762f88d245d0 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlAsyncGetResultsAction.java @@ -42,7 +42,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli if (request.hasParam(KEEP_ALIVE_NAME)) { get.setKeepAlive(request.paramAsTime(KEEP_ALIVE_NAME, get.getKeepAlive())); } - return channel -> client.execute(SqlAsyncGetResultsAction.INSTANCE, get, new SqlResponseFormatter(channel, request)); + return channel -> client.execute(SqlAsyncGetResultsAction.INSTANCE, get, new SqlResponseListener(channel, request)); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java index 6040630787dfc..53b0b84c014a0 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java @@ -50,7 +50,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return channel -> { RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel()); - cancellableClient.execute(SqlQueryAction.INSTANCE, sqlRequest, new SqlResponseFormatter(channel, request, sqlRequest)); + cancellableClient.execute(SqlQueryAction.INSTANCE, sqlRequest, new SqlResponseListener(channel, request, sqlRequest)); }; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseFormatter.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseListener.java similarity index 94% rename from x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseFormatter.java rename to x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseListener.java index 36be806efc73b..200af40066d08 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseFormatter.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlResponseListener.java @@ -29,14 +29,14 @@ import static org.elasticsearch.xpack.sql.proto.Protocol.HEADER_NAME_TOOK_NANOS; import static org.elasticsearch.xpack.sql.proto.Protocol.URL_PARAM_DELIMITER; -class SqlResponseFormatter extends RestResponseListener { +class SqlResponseListener extends RestResponseListener { private final long startNanos = System.nanoTime(); private final MediaType mediaType; private final RestRequest request; - SqlResponseFormatter(RestChannel channel, RestRequest request, SqlQueryRequest sqlRequest) { + SqlResponseListener(RestChannel channel, RestRequest request, SqlQueryRequest sqlRequest) { super(channel); this.request = request; @@ -55,7 +55,7 @@ class SqlResponseFormatter extends RestResponseListener { } } - SqlResponseFormatter(RestChannel channel, RestRequest request) { + SqlResponseListener(RestChannel channel, RestRequest request) { super(channel); this.request = request; this.mediaType = SqlMediaTypeParser.getResponseMediaType(request); From 6455ff3c73f38cccffcf4f82e06973cd356b0f1c Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Wed, 9 Jun 2021 18:24:55 +0200 Subject: [PATCH 4/4] Fix imports past master merge - Fix Nullable and TimeValue imports after classes relocation. --- .../xpack/sql/qa/security/RestSqlSecurityAsyncIT.java | 2 +- .../org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java | 2 +- .../java/org/elasticsearch/xpack/sql/action/SqlQueryTask.java | 2 +- .../elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java b/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java index 6f341db624641..deeac8212161d 100644 --- a/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java +++ b/x-pack/plugin/sql/qa/server/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityAsyncIT.java @@ -14,10 +14,10 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index bef4b405c88b2..13906046e7af4 100644 --- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -16,9 +16,9 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.CheckedSupplier; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.xcontent.XContentBuilder; diff --git a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryTask.java b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryTask.java index d07a3d68de365..710f5309a49aa 100644 --- a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryTask.java +++ b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryTask.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.sql.action; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.StoredAsyncTask; diff --git a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java index f4f820617b1eb..7b4c851d1e628 100644 --- a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java +++ b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java @@ -19,8 +19,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptPlugin;