FieldCapabilitiesResponse.java
@@ -10,20 +10,23 @@
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -32,7 +35,7 @@
 /**
  * Response for {@link FieldCapabilitiesRequest} requests.
  */
-public class FieldCapabilitiesResponse extends ActionResponse implements ToXContentObject {
+public class FieldCapabilitiesResponse extends ActionResponse implements ToXContentObject, ChunkedToXContent {
     private static final ParseField INDICES_FIELD = new ParseField("indices");
     private static final ParseField FIELDS_FIELD = new ParseField("fields");
     private static final ParseField FAILED_INDICES_FIELD = new ParseField("failed_indices");
@@ -150,23 +153,29 @@ private static void writeField(StreamOutput out, Map<String, FieldCapabilities>
     }
 
     @Override
-    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+    public Iterator<? extends ToXContent> toXContentChunked() {
         if (indexResponses.size() > 0) {
             throw new IllegalStateException("cannot serialize non-merged response");
         }
-        builder.startObject();
-        builder.array(INDICES_FIELD.getPreferredName(), indices);
-        builder.startObject(FIELDS_FIELD.getPreferredName());
-        for (var r : responseMap.entrySet()) {
-            builder.xContentValuesMap(r.getKey(), r.getValue());
-        }
-        builder.endObject();
-        if (this.failures.size() > 0) {
-            builder.field(FAILED_INDICES_FIELD.getPreferredName(), getFailedIndices().length);
-            builder.xContentList(FAILURES_FIELD.getPreferredName(), failures);
-        }
-        builder.endObject();
-        return builder;
+
+        return Iterators.concat(
+            Iterators.single(
+                (b, p) -> b.startObject().array(INDICES_FIELD.getPreferredName(), indices).startObject(FIELDS_FIELD.getPreferredName())
+            ),
+            responseMap.entrySet().stream().map(r -> (ToXContent) (b, p) -> b.xContentValuesMap(r.getKey(), r.getValue())).iterator(),
+            this.failures.size() > 0
+                ? Iterators.concat(
+                    Iterators.single(
+                        (ToXContent) (b, p) -> b.endObject()
+                            .field(FAILED_INDICES_FIELD.getPreferredName(), getFailedIndices().length)
+                            .field(FAILURES_FIELD.getPreferredName())
+                            .startArray()
+                    ),
+                    failures.iterator(),
+                    Iterators.single((b, p) -> b.endArray().endObject())
+                )
+                : Iterators.single((b, p) -> b.endObject().endObject())
+        );
     }
 
     public static FieldCapabilitiesResponse fromXContent(XContentParser parser) throws IOException {
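Note: the rewrite above replaces one monolithic toXContent pass with a lazy concatenation of small serializers, so the REST layer can emit the (potentially huge) per-field map one entry at a time. The same pattern, reduced to a dependency-free sketch — the Chunk type and all names here are illustrative stand-ins for ToXContent, not Elasticsearch API:

import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class ChunkedSketch {
    // Stand-in for ToXContent: each chunk writes one fragment of the body.
    interface Chunk extends Consumer<StringBuilder> {}

    // Header chunk, one chunk per map entry, footer chunk -- all created lazily.
    static Iterator<Chunk> toChunks(Map<String, String> fields) {
        Stream<Chunk> header = Stream.of(sb -> sb.append("{\"fields\":{"));
        Stream<Chunk> entries = fields.entrySet().stream()
            .map(e -> (Chunk) (sb -> sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"')));
        Stream<Chunk> footer = Stream.of(sb -> sb.append("}}"));
        return Stream.of(header, entries, footer).flatMap(s -> s).iterator();
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        Iterator<Chunk> it = toChunks(Map.of("title", "text")); // single entry: comma handling omitted
        while (it.hasNext()) {
            it.next().accept(out); // a real channel would flush between chunks
        }
        System.out.println(out); // {"fields":{"title":"text"}}
    }
}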
ChunkedToXContent.java
@@ -26,11 +26,11 @@ public interface ChunkedToXContent extends ToXContent {
      * {@link ToXContent.Params} for each call until it is fully drained.
      * @return iterator over chunks of {@link ToXContent}
      */
-    Iterator<ToXContent> toXContentChunked();
+    Iterator<? extends ToXContent> toXContentChunked();
 
     @Override
     default XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        Iterator<ToXContent> serialization = toXContentChunked();
+        Iterator<? extends ToXContent> serialization = toXContentChunked();
         while (serialization.hasNext()) {
             serialization.next().toXContent(builder, params);
         }
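The signature change is purely about generic variance: Java generics are invariant, so an Iterator<MyChunk> is not an Iterator<ToXContent> even when MyChunk implements ToXContent, and without the wildcard implementations would have to cast every element. A small self-contained illustration of the rule, with hypothetical stand-in types:

import java.util.Iterator;
import java.util.List;

public class VarianceSketch {
    interface Chunk {}                                  // stand-in for ToXContent
    record NamedChunk(String name) implements Chunk {}  // stand-in for a concrete implementation

    // The wildcard accepts an iterator over Chunk or any of its subtypes.
    static int drain(Iterator<? extends Chunk> it) {
        int n = 0;
        while (it.hasNext()) {
            it.next();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        Iterator<NamedChunk> concrete = List.of(new NamedChunk("a"), new NamedChunk("b")).iterator();
        // Without "? extends", this call would not compile:
        // Iterator<NamedChunk> is not a subtype of Iterator<Chunk>.
        System.out.println(drain(concrete)); // prints 2
    }
}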
ChunkedRestResponseBody.java
@@ -81,7 +81,7 @@ public void write(byte[] b, int off, int len) throws IOException {
                 Streams.noCloseStream(out)
             );
 
-            private final Iterator<ToXContent> serialization = chunkedToXContent.toXContentChunked();
+            private final Iterator<? extends ToXContent> serialization = chunkedToXContent.toXContentChunked();
 
             private BytesStream target;
 
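For context, the field above is held as state by the response body so the iterator can be drained incrementally, roughly one page of output per transport write. A rough, dependency-free sketch of that pull model — all names hypothetical; the real class recycles pooled pages and serializes through an XContentBuilder:

import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PullBodySketch {
    interface Chunk extends Consumer<StringBuilder> {} // stand-in for ToXContent

    static final class PullBody {
        private final Iterator<? extends Chunk> serialization;

        PullBody(Iterator<? extends Chunk> serialization) {
            this.serialization = serialization;
        }

        boolean isDone() {
            return serialization.hasNext() == false;
        }

        // Encode at most `budget` characters per call, resuming where we stopped.
        String encodeChunk(int budget) {
            StringBuilder page = new StringBuilder();
            while (serialization.hasNext() && page.length() < budget) {
                serialization.next().accept(page);
            }
            return page.toString();
        }
    }

    public static void main(String[] args) {
        Iterator<Chunk> chunks = List.<Chunk>of(
            sb -> sb.append("{"),
            sb -> sb.append("\"a\":1"),
            sb -> sb.append("}")
        ).iterator();
        PullBody body = new PullBody(chunks);
        while (body.isDone() == false) {
            System.out.println("write: " + body.encodeChunk(4)); // tiny budget forces several writes
        }
    }
}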
RestFieldCapabilitiesAction.java
@@ -9,11 +9,15 @@
 package org.elasticsearch.rest.action;
 
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 
Expand Down Expand Up @@ -67,12 +71,18 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}
fieldRequest.fields(Strings.splitStringByCommaToArray(request.param("fields")));
}
return channel -> client.fieldCaps(fieldRequest, new RestToXContentListener<>(channel));
return channel -> client.fieldCaps(fieldRequest, new RestActionListener<>(channel) {
@Override
protected void processResponse(FieldCapabilitiesResponse response) throws IOException {
ensureOpen();
channel.sendResponse(new RestResponse(RestStatus.OK, ChunkedRestResponseBody.fromXContent(response, request, channel)));
}
});
}

private static ParseField INDEX_FILTER_FIELD = new ParseField("index_filter");
private static ParseField RUNTIME_MAPPINGS_FIELD = new ParseField("runtime_mappings");
private static ParseField FIELDS_FIELD = new ParseField("fields");
private static final ParseField INDEX_FILTER_FIELD = new ParseField("index_filter");
private static final ParseField RUNTIME_MAPPINGS_FIELD = new ParseField("runtime_mappings");
private static final ParseField FIELDS_FIELD = new ParseField("fields");

private static final ObjectParser<FieldCapabilitiesRequest, Void> PARSER = new ObjectParser<>("field_caps_request");

Expand Down
FieldCapabilitiesResponseTests.java
@@ -129,7 +129,7 @@ public void testFailureParsing() throws IOException {
         }
     }
 
-    private FieldCapabilitiesResponse createResponseWithFailures() {
+    public static FieldCapabilitiesResponse createResponseWithFailures() {
         String[] indices = randomArray(randomIntBetween(1, 5), String[]::new, () -> randomAlphaOfLength(5));
         List<FieldCapabilitiesFailure> failures = new ArrayList<>();
         for (String index : indices) {
MergedFieldCapabilitiesResponseTests.java
@@ -212,4 +212,31 @@ private static FieldCapabilitiesResponse createSimpleResponse() {
         );
         return new FieldCapabilitiesResponse(new String[] { "index1", "index2", "index3", "index4" }, responses, failureMap);
     }
+
+    public void testExpectedChunkSizes() {
+        {
+            final FieldCapabilitiesResponse instance = FieldCapabilitiesResponseTests.createResponseWithFailures();
+            final var iterator = instance.toXContentChunked();
+            int chunks = 0;
+            while (iterator.hasNext()) {
+                iterator.next();
+                chunks++;
+            }
+            if (instance.getFailures().isEmpty()) {
+                assertEquals(2, chunks);
+            } else {
+                assertEquals(3 + instance.get().size() + instance.getFailures().size(), chunks);
+            }
+        }
+        {
+            final FieldCapabilitiesResponse instance = createTestInstance();
+            final var iterator = instance.toXContentChunked();
+            int chunks = 0;
+            while (iterator.hasNext()) {
+                iterator.next();
+                chunks++;
+            }
+            assertEquals(2 + instance.get().size(), chunks);
+        }
+    }
 }
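The asserted counts follow directly from the iterator layout in FieldCapabilitiesResponse.toXContentChunked: one leading chunk, one chunk per field, then either a single closing chunk, or a failures header, one chunk per failure, and a closing chunk. As plain arithmetic — a hypothetical helper, not part of the PR:

// Leading chunk + one per field + closing chunk = 2 + fields when there are no
// failures; with failures, the failures header plus the extra closing chunk make
// it 3 + fields + failures.
static int expectedChunks(int fields, int failures) {
    return failures == 0 ? 2 + fields : 3 + fields + failures;
}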