Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,34 @@

import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -39,16 +57,28 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;

public class FieldCapabilitiesIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
final Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.getMockPlugins());
plugins.add(MockTransportService.TestPlugin.class);
return plugins;
}

@Override
@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -313,6 +343,163 @@ public void testFailures() throws InterruptedException {
assertEquals("I throw because I choose to.", ex.getMessage());
}

private void populateTimeRangeIndices() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("log-index-1")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword"));
assertAcked(prepareCreate("log-index-2")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.addMapping("_doc", "timestamp", "type=date", "field1", "type=long"));
List<IndexRequestBuilder> reqs = new ArrayList<>();
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2015-07-08"));
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2018-07-08"));
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-03-03"));
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-09-09"));
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2019-10-12"));
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-02-02"));
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-10-10"));
indexRandom(true, reqs);
ensureGreen("log-index-1", "log-index-2");
client().admin().indices().prepareRefresh("log-index-1", "log-index-2").get();
}

public void testTargetNodeFails() throws Exception {
populateTimeRangeIndices();
try {
final AtomicBoolean failedRequest = new AtomicBoolean();
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
(handler, request, channel, task) -> {
if (failedRequest.compareAndSet(false, true)) {
channel.sendResponse(new CircuitBreakingException("Simulated", CircuitBreaker.Durability.TRANSIENT));
} else {
handler.messageReceived(request, channel, task);
}
});
}
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("log-index-*");
request.fields("*");
if (randomBoolean()) {
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
}
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
assertTrue(failedRequest.get());
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
assertThat(response.getField("field1"), aMapWithSize(2));
assertThat(response.getField("field1"), hasKey("long"));
assertThat(response.getField("field1"), hasKey("keyword"));
} finally {
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.clearAllRules();
}
}
}

public void testNoActiveCopy() throws Exception {
assertAcked(prepareCreate("log-index-inactive")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.require._id", "unknown"))
.setWaitForActiveShards(ActiveShardCount.NONE)
.addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword"));
{
final ElasticsearchException ex =
expectThrows(ElasticsearchException.class, () -> client().prepareFieldCaps("log-index-*").setFields("*").get());
assertThat(ex.getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
}
{
populateTimeRangeIndices();
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("log-index-*");
request.fields("*");
if (randomBoolean()) {
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
}
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
assertThat(response.getField("field1"), aMapWithSize(2));
assertThat(response.getField("field1"), hasKey("long"));
assertThat(response.getField("field1"), hasKey("long"));

assertThat(response.getFailures(), hasSize(1));
final FieldCapabilitiesFailure failure = response.getFailures().get(0);
assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-inactive"));
assertThat(failure.getException().getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
}
}

private void moveOrCloseShardsOnNodes(String nodeName) throws Exception {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (randomBoolean()) {
indexShard.close("test", randomBoolean());
} else if (randomBoolean()) {
final ShardId shardId = indexShard.shardId();
final String[] nodeNames = internalCluster().getNodeNames();
final String newNodeName = randomValueOtherThanMany(n -> nodeName.equals(n) == false, () -> randomFrom(nodeNames));
DiscoveryNode fromNode = null;
DiscoveryNode toNode = null;
for (DiscoveryNode node : clusterService().state().nodes()) {
if (node.getName().equals(nodeName)) {
fromNode = node;
}
if (node.getName().equals(newNodeName)) {
toNode = node;
}
}
assertNotNull(fromNode);
assertNotNull(toNode);
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(shardId.getIndexName(), shardId.id(), fromNode.getId(), toNode.getId()))
.execute().actionGet();
}
}
}
}

public void testRelocation() throws Exception {
populateTimeRangeIndices();
try {
final AtomicBoolean relocated = new AtomicBoolean();
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
(handler, request, channel, task) -> {
if (relocated.compareAndSet(false, true)) {
moveOrCloseShardsOnNodes(node);
}
handler.messageReceived(request, channel, task);
});
}
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("log-index-*");
request.fields("*");
if (randomBoolean()) {
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
}
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
assertThat(response.getField("field1"), aMapWithSize(2));
assertThat(response.getField("field1"), hasKey("long"));
assertThat(response.getField("field1"), hasKey("long"));
} finally {
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.clearAllRules();
}
}
}

private void assertIndices(FieldCapabilitiesResponse response, String... indices) {
assertNotNull(response.getIndices());
Arrays.sort(indices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@
import org.elasticsearch.action.explain.TransportExplainAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesIndexAction;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.MultiGetAction;
import org.elasticsearch.action.get.TransportGetAction;
Expand Down Expand Up @@ -635,8 +634,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(GetScriptContextAction.INSTANCE, TransportGetScriptContextAction.class);
actions.register(GetScriptLanguageAction.INSTANCE, TransportGetScriptLanguageAction.class);

actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class,
TransportFieldCapabilitiesIndexAction.class);
actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class);

actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class FieldCapabilitiesIndexRequest extends ActionRequest implements IndicesRequest {

public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed();

private final String index;
private final String[] fields;
private final OriginalIndices originalIndices;
private final QueryBuilder indexFilter;
private final long nowInMillis;
private final Map<String, Object> runtimeFields;

private ShardId shardId;
private final ShardId shardId;

// For serialization
FieldCapabilitiesIndexRequest(StreamInput in) throws IOException {
super(in);
shardId = in.readOptionalWriteable(ShardId::new);
index = in.readOptionalString();
if (in.getVersion().before(Version.V_7_16_0)) {
in.readOptionalString(); // index
}
fields = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
originalIndices = OriginalIndices.readOriginalIndices(in);
Expand All @@ -54,15 +53,15 @@ public class FieldCapabilitiesIndexRequest extends ActionRequest implements Indi
}

FieldCapabilitiesIndexRequest(String[] fields,
String index,
ShardId shardId,
OriginalIndices originalIndices,
QueryBuilder indexFilter,
long nowInMillis,
Map<String, Object> runtimeFields) {
if (fields == null || fields.length == 0) {
throw new IllegalArgumentException("specified fields can't be null or empty");
}
this.index = Objects.requireNonNull(index);
this.shardId = shardId;
this.fields = fields;
this.originalIndices = originalIndices;
this.indexFilter = indexFilter;
Expand All @@ -85,7 +84,7 @@ public IndicesOptions indicesOptions() {
}

public String index() {
return index;
return shardId.getIndexName();
}

public QueryBuilder indexFilter() {
Expand All @@ -104,16 +103,14 @@ public long nowInMillis() {
return nowInMillis;
}

FieldCapabilitiesIndexRequest shardId(ShardId shardId) {
this.shardId = shardId;
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(shardId);
out.writeOptionalString(index);
if (out.getVersion().before(Version.V_7_16_0)) {
out.writeOptionalString(shardId.getIndexName());
}
out.writeStringArray(fields);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
OriginalIndices.writeOriginalIndices(originalIndices, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
import java.util.Map;
import java.util.Objects;

/**
* Response for {@link TransportFieldCapabilitiesIndexAction}.
*/
public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private final String indexName;
private final Map<String, IndexFieldCapabilities> responseMap;
Expand Down
Loading