Merged
17 commits
273a352  Watcher: Use Bulkprocessor in HistoryStore/TriggeredWatchStore (spinscale, Jul 31, 2018)
36e3f7e  fix timeout (spinscale, Jul 31, 2018)
d700558  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Jul 31, 2018)
660b43a  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Aug 6, 2018)
c6ff80f  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Aug 16, 2018)
1c2ab0f  to discuss: only use a single bulk processor (spinscale, Aug 16, 2018)
99b2b1a  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Aug 21, 2018)
de78ceb  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Aug 29, 2018)
6bfbeeb  remove TODO, minor refactoring, added more tests when deleting or add… (spinscale, Aug 29, 2018)
3ba6138  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Aug 30, 2018)
365fefa  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Aug 31, 2018)
11ba093  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Aug 31, 2018)
eee7a52  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Sep 5, 2018)
5541e2d  make store final again (spinscale, Sep 5, 2018)
1ed000f  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Sep 6, 2018)
56a9728  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Sep 6, 2018)
1359c8c  Merge branch 'master' into 1807-watcher-allow-bulk-requests-for-histo… (spinscale, Sep 17, 2018)
Watcher.java
@@ -5,9 +5,14 @@
*/
package org.elasticsearch.xpack.watcher;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -20,13 +25,14 @@
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -51,6 +57,7 @@
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ssl.SSLService;
@@ -184,12 +191,16 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;

public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin {

@@ -201,6 +212,16 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope);
public static final Setting<TimeValue> MAX_STOP_TIMEOUT_SETTING =
Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
private static final Setting<Integer> SETTING_BULK_ACTIONS =
Setting.intSetting("xpack.watcher.bulk.actions", 1, 1, 10000, NodeScope);
private static final Setting<Integer> SETTING_BULK_CONCURRENT_REQUESTS =
Setting.intSetting("xpack.watcher.bulk.concurrent_requests", 0, 0, 20, NodeScope);
private static final Setting<TimeValue> SETTING_BULK_FLUSH_INTERVAL =
Setting.timeSetting("xpack.watcher.bulk.flush_interval", TimeValue.timeValueSeconds(1), NodeScope);
private static final Setting<ByteSizeValue> SETTING_BULK_SIZE =
Setting.byteSizeSetting("xpack.watcher.bulk.size", new ByteSizeValue(1, ByteSizeUnit.MB),
new ByteSizeValue(1, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB), NodeScope);


public static final ScriptContext<SearchScript.Factory> SCRIPT_SEARCH_CONTEXT =
new ScriptContext<>("xpack", SearchScript.Factory.class);
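The four new xpack.watcher.bulk.* settings in the hunk above configure the shared BulkProcessor that batches watch history and triggered-watch writes. A minimal sketch of node settings built with them, staying inside the declared bounds (actions 1-10000, concurrent_requests 0-20, size 1mb-10mb); the concrete numbers are only illustrative:

import org.elasticsearch.common.settings.Settings;

class WatcherBulkSettingsExample {
    // Illustrative values only; they must respect the bounds declared by the settings above.
    static Settings watcherBulkSettings() {
        return Settings.builder()
            .put("xpack.watcher.bulk.actions", 100)            // flush after 100 queued actions (default 1)
            .put("xpack.watcher.bulk.concurrent_requests", 1)  // allow one bulk request in flight (default 0)
            .put("xpack.watcher.bulk.flush_interval", "1s")    // time-based flush (default 1s)
            .put("xpack.watcher.bulk.size", "5mb")             // flush once 5mb of requests are queued (default 1mb)
            .build();
    }
}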
@@ -210,9 +231,10 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
public static final ScriptContext<TemplateScript.Factory> SCRIPT_TEMPLATE_CONTEXT
= new ScriptContext<>("xpack_template", TemplateScript.Factory.class);

private static final Logger logger = Loggers.getLogger(Watcher.class);
private static final Logger logger = LogManager.getLogger(Watcher.class);
private WatcherIndexingListener listener;
private HttpClient httpClient;
private BulkProcessor bulkProcessor;

protected final Settings settings;
protected final boolean transportClient;
@@ -318,7 +340,49 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories);
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry));

final HistoryStore historyStore = new HistoryStore(settings, client);
bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
Map<String, String> triggeredWatches = Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed)
.filter(r -> r.getIndex().startsWith(TriggeredWatchStoreField.INDEX_NAME))
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
if (triggeredWatches.isEmpty() == false) {
String failure = triggeredWatches.values().stream().collect(Collectors.joining(", "));
logger.error("triggered watches could not be deleted {}, failure [{}]",
triggeredWatches.keySet(), Strings.substring(failure, 0, 2000));
}

Map<String, String> overwrittenIds = Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed)
.filter(r -> r.getIndex().startsWith(HistoryStoreField.INDEX_PREFIX))
.filter(r -> r.getVersion() > 1)
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
if (overwrittenIds.isEmpty() == false) {
String failure = overwrittenIds.values().stream().collect(Collectors.joining(", "));
logger.info("overwrote watch history entries {}, possible second execution of a triggered watch, failure [{}]",
overwrittenIds.keySet(), Strings.substring(failure, 0, 2000));
}
}
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("error executing bulk", failure);
}
})
.setFlushInterval(SETTING_BULK_FLUSH_INTERVAL.get(settings))
.setBulkActions(SETTING_BULK_ACTIONS.get(settings))
.setBulkSize(SETTING_BULK_SIZE.get(settings))
.setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings))
.build();

HistoryStore historyStore = new HistoryStore(settings, bulkProcessor);

// schedulers
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
@@ -340,7 +404,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
final TriggerService triggerService = new TriggerService(settings, triggerEngines);

final TriggeredWatch.Parser triggeredWatchParser = new TriggeredWatch.Parser(settings, triggerService);
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser);
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser, bulkProcessor);

final WatcherSearchTemplateService watcherSearchTemplateService =
new WatcherSearchTemplateService(settings, scriptService, xContentRegistry);
@@ -416,6 +480,12 @@ public List<Setting<?>> getSettings() {
settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope));
settings.add(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START);

// bulk processor configuration
settings.add(SETTING_BULK_ACTIONS);
settings.add(SETTING_BULK_CONCURRENT_REQUESTS);
settings.add(SETTING_BULK_FLUSH_INTERVAL);
settings.add(SETTING_BULK_SIZE);

// notification services
settings.addAll(SlackService.getSettings());
settings.addAll(EmailService.getSettings());
@@ -608,7 +678,15 @@ public List<ScriptContext<?>> getContexts() {

@Override
public void close() throws IOException {
bulkProcessor.flush();
IOUtils.closeWhileHandlingException(httpClient);
try {
if (bulkProcessor.awaitClose(10, TimeUnit.SECONDS) == false) {
logger.warn("failed to properly close watcher bulk processor");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
ExecutionService.java
@@ -320,11 +320,8 @@ record = createWatchRecord(record, ctx, e);
// TODO log watch record in logger, when saving in history store failed, otherwise the info is gone!
}
}
try {
triggeredWatchStore.delete(ctx.id());
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e);
}

triggeredWatchStore.delete(ctx.id());
}
currentExecutions.get().remove(watchId);
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
@@ -412,14 +409,8 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
triggeredWatch.id()), exc);
}

try {
triggeredWatchStore.delete(triggeredWatch.id());
} catch (Exception exc) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " +
"rejection", triggeredWatch.id()), exc);
}
};
triggeredWatchStore.delete(triggeredWatch.id());
}
}

WatchRecord executeInner(WatchExecutionContext ctx) {
TriggeredWatchStore.java
@@ -8,6 +8,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@@ -24,14 +25,14 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
import org.elasticsearch.xpack.core.watcher.execution.Wid;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
@@ -46,8 +47,6 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;

public class TriggeredWatchStore extends AbstractComponent {

@@ -58,21 +57,17 @@ public class TriggeredWatchStore extends AbstractComponent {

private final TimeValue defaultBulkTimeout;
private final TimeValue defaultSearchTimeout;
private final BulkProcessor bulkProcessor;

public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser) {
public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser, BulkProcessor bulkProcessor) {
super(settings);
this.scrollSize = settings.getAsInt("xpack.watcher.execution.scroll.size", 1000);
this.client = client;
this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN);
this.scrollTimeout = settings.getAsTime("xpack.watcher.execution.scroll.timeout", TimeValue.timeValueMinutes(5));
this.defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120));
this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
this.triggeredWatchParser = triggeredWatchParser;
}

public static boolean validate(ClusterState state) {
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData());
return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN &&
state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive());
this.bulkProcessor = bulkProcessor;
}

public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<BulkResponse> listener) throws IOException {
@@ -81,8 +76,7 @@ public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionList
return;
}

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, createBulkRequest(triggeredWatches,
TriggeredWatchStoreField.DOC_TYPE), listener, client::bulk);
client.bulk(createBulkRequest(triggeredWatches), listener);
}

public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws IOException {
@@ -94,14 +88,14 @@ public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws I
/**
* Create a bulk request from the triggered watches with a specified document type
* @param triggeredWatches The list of triggered watches
* @param docType The document type to use, either the current one or legacy
* @return The bulk request for the triggered watches
* @throws IOException If a triggered watch could not be parsed to JSON, this exception is thrown
*/
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches, String docType) throws IOException {
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches) throws IOException {
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, docType, triggeredWatch.id().value());
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE,
triggeredWatch.id().value());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
@@ -112,12 +106,15 @@ private BulkRequest createBulkRequest(final List<TriggeredWatche
return request;
}

/**
* Delete a triggered watch entry.
* Note that this happens asynchronously, as these kinds of requests are batched together to reduce the number of concurrent requests.
*
* @param wid The ID of the triggered watch
*/
public void delete(Wid wid) {
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value());
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
client.delete(request); // FIXME shouldn't we wait before saying the delete was successful
}
logger.trace("successfully deleted triggered watch with id [{}]", wid);
bulkProcessor.add(request);
Review comment (Contributor):

Does executing watches take into consideration any deleted requests here? What happens if I delete a watch and then wait to submit the bulk? Might that watch trigger again? Should we make deletes the only synchronous operation? I think that's a fair tradeoff, given my very limited knowledge here.

Author reply:

Yes, it does. If the watch is deleted and a triggered watch for it is then picked up, there will be a watch history entry telling you that the referenced watch could not be found.

}
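If deletes ever needed to be confirmed synchronously, as the review comment above suggests, one option (not what this change does) would be to flush the shared processor right after queueing the request. A hypothetical sketch; deleteSync does not exist in this PR:

// Hypothetical alternative, not part of this change: queue the delete and push it out immediately.
public void deleteSync(Wid wid) {
    DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value());
    bulkProcessor.add(request);
    // flush() sends the queued requests right away; item failures still surface via the afterBulk listener,
    // and the call is fully synchronous only when xpack.watcher.bulk.concurrent_requests is 0.
    bulkProcessor.flush();
}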

/**
@@ -140,9 +137,9 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
return Collections.emptyList();
}

try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
client.admin().indices().refresh(new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME))
.actionGet(TimeValue.timeValueSeconds(5));
try {
RefreshRequest request = new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME);
client.admin().indices().refresh(request).actionGet(TimeValue.timeValueSeconds(5));
} catch (IndexNotFoundException e) {
return Collections.emptyList();
}
@@ -159,7 +156,7 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
.version(true));

SearchResponse response = null;
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
try {
response = client.search(searchRequest).actionGet(defaultSearchTimeout);
logger.debug("trying to find triggered watches for ids {}: found [{}] docs", ids, response.getHits().getTotalHits());
while (response.getHits().getHits().length != 0) {
@@ -176,14 +173,18 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
}
} finally {
if (response != null) {
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
}
}

return triggeredWatches;
}

public static boolean validate(ClusterState state) {
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData());
return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN &&
state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive());
}
}
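For completeness, a hedged sketch of how a caller might use the listener-based putAll shown above; only the putAll signature comes from this file, while the surrounding method, logger, and callback bodies are illustrative:

// Illustrative caller: index the triggered watches and react to the bulk response asynchronously.
void storeTriggeredWatches(TriggeredWatchStore store, List<TriggeredWatch> triggeredWatches) throws IOException {
    store.putAll(triggeredWatches, ActionListener.wrap(
        bulkResponse -> {
            if (bulkResponse.hasFailures()) {
                logger.warn("could not store all triggered watches: {}", bulkResponse.buildFailureMessage());
            }
        },
        e -> logger.error("failed to store triggered watches", e)));
}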