Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@
*/
package org.elasticsearch.xpack.watcher.notification;

import org.elasticsearch.common.collect.Tuple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;

import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

/**
Expand All @@ -23,25 +31,70 @@
public abstract class NotificationService<Account> {

private final String type;
// both are guarded by this
private Map<String, Account> accounts;
private Account defaultAccount;

public NotificationService(String type,
ClusterSettings clusterSettings, List<Setting<?>> pluginSettings) {
this(type);
clusterSettings.addSettingsUpdateConsumer(this::reload, pluginSettings);
private final Logger logger;
private final Settings bootSettings;
private final List<Setting<?>> pluginSecureSettings;
// all are guarded by this
private volatile Map<String, Account> accounts;
private volatile Account defaultAccount;
// cached cluster setting, required when recreating the notification clients
// using the new "reloaded" secure settings
private volatile Settings cachedClusterSettings;
// cached secure settings, required when recreating the notification clients
// using the new updated cluster settings
private volatile SecureSettings cachedSecureSettings;

public NotificationService(String type, Settings settings, ClusterSettings clusterSettings, List<Setting<?>> pluginDynamicSettings,
List<Setting<?>> pluginSecureSettings) {
this(type, settings, pluginSecureSettings);
// register a grand updater for the whole group, as settings are usable together
clusterSettings.addSettingsUpdateConsumer(this::clusterSettingsConsumer, pluginDynamicSettings);
}

// Used for testing only
NotificationService(String type) {
NotificationService(String type, Settings settings, List<Setting<?>> pluginSecureSettings) {
this.type = type;
this.logger = LogManager.getLogger();
this.bootSettings = settings;
this.pluginSecureSettings = pluginSecureSettings;
}

private synchronized void clusterSettingsConsumer(Settings settings) {
// update cached cluster settings
this.cachedClusterSettings = settings;
// use these new dynamic cluster settings together with the previously cached
// secure settings
buildAccounts();
}

public synchronized void reload(Settings settings) {
Tuple<Map<String, Account>, Account> accounts = buildAccounts(settings, this::createAccount);
this.accounts = Collections.unmodifiableMap(accounts.v1());
this.defaultAccount = accounts.v2();
// `SecureSettings` are available here! cache them as they will be needed
// whenever dynamic cluster settings change and we have to rebuild the accounts
try {
this.cachedSecureSettings = extractSecureSettings(settings, pluginSecureSettings);
} catch (GeneralSecurityException e) {
logger.error("Keystore exception while reloading watcher notification service", e);
return;
}
// use these new secure settings together with the previously cached dynamic
// cluster settings
buildAccounts();
}

private void buildAccounts() {
// build complete settings combining cluster and secure settings
final Settings.Builder completeSettingsBuilder = Settings.builder().put(bootSettings, false);
if (this.cachedClusterSettings != null) {
completeSettingsBuilder.put(this.cachedClusterSettings, false);
}
if (this.cachedSecureSettings != null) {
completeSettingsBuilder.setSecureSettings(this.cachedSecureSettings);
}
final Settings completeSettings = completeSettingsBuilder.build();
// obtain account names and create accounts
final Set<String> accountNames = getAccountNames(completeSettings);
this.accounts = createAccounts(completeSettings, accountNames, this::createAccount);
this.defaultAccount = findDefaultAccountOrNull(completeSettings, this.accounts);
}

protected abstract Account createAccount(String name, Settings accountSettings);
Expand All @@ -66,31 +119,100 @@ public Account getAccount(String name) {
return theAccount;
}

private <A> Tuple<Map<String, A>, A> buildAccounts(Settings settings, BiFunction<String, Settings, A> accountFactory) {
Settings accountsSettings = settings.getByPrefix("xpack.notification." + type + ".").getAsSettings("account");
Map<String, A> accounts = new HashMap<>();
for (String name : accountsSettings.names()) {
Settings accountSettings = accountsSettings.getAsSettings(name);
A account = accountFactory.apply(name, accountSettings);
accounts.put(name, account);
private String getNotificationsAccountPrefix() {
return "xpack.notification." + type + ".account.";
}

private Set<String> getAccountNames(Settings settings) {
// secure settings are not responsible for the client names
final Settings noSecureSettings = Settings.builder().put(settings, false).build();
return noSecureSettings.getByPrefix(getNotificationsAccountPrefix()).names();
}

private @Nullable String getDefaultAccountName(Settings settings) {
return settings.get("xpack.notification." + type + ".default_account");
}

private Map<String, Account> createAccounts(Settings settings, Set<String> accountNames,
BiFunction<String, Settings, Account> accountFactory) {
final Map<String, Account> accounts = new HashMap<>();
for (final String accountName : accountNames) {
final Settings accountSettings = settings.getAsSettings(getNotificationsAccountPrefix() + accountName);
final Account account = accountFactory.apply(accountName, accountSettings);
accounts.put(accountName, account);
}
return Collections.unmodifiableMap(accounts);
}

final String defaultAccountName = settings.get("xpack.notification." + type + ".default_account");
A defaultAccount;
private @Nullable Account findDefaultAccountOrNull(Settings settings, Map<String, Account> accounts) {
final String defaultAccountName = getDefaultAccountName(settings);
if (defaultAccountName == null) {
if (accounts.isEmpty()) {
defaultAccount = null;
return null;
} else {
A account = accounts.values().iterator().next();
defaultAccount = account;

return accounts.values().iterator().next();
}
} else {
defaultAccount = accounts.get(defaultAccountName);
if (defaultAccount == null) {
final Account account = accounts.get(defaultAccountName);
if (account == null) {
throw new SettingsException("could not find default account [" + defaultAccountName + "]");
}
return account;
}
}

/**
* Extracts the {@link SecureSettings}` out of the passed in {@link Settings} object. The {@code Setting} argument has to have the
* {@code SecureSettings} open/available. Normally {@code SecureSettings} are available only under specific callstacks (eg. during node
* initialization or during a `reload` call). The returned copy can be reused freely as it will never be closed (this is a bit of
* cheating, but it is necessary in this specific circumstance). Only works for secure settings of type string (not file).
*
* @param source
* A {@code Settings} object with its {@code SecureSettings} open/available.
* @param securePluginSettings
* The list of settings to copy.
* @return A copy of the {@code SecureSettings} of the passed in {@code Settings} argument.
*/
private static SecureSettings extractSecureSettings(Settings source, List<Setting<?>> securePluginSettings)
throws GeneralSecurityException {
// get the secure settings out
final SecureSettings sourceSecureSettings = Settings.builder().put(source, true).getSecureSettings();
// filter and cache them...
final Map<String, SecureString> cache = new HashMap<>();
if (sourceSecureSettings != null && securePluginSettings != null) {
for (final String settingKey : sourceSecureSettings.getSettingNames()) {
for (final Setting<?> secureSetting : securePluginSettings) {
if (secureSetting.match(settingKey)) {
cache.put(settingKey, sourceSecureSettings.getString(settingKey));
}
}
}
}
return new Tuple<>(accounts, defaultAccount);
return new SecureSettings() {

@Override
public boolean isLoaded() {
return true;
}

@Override
public SecureString getString(String setting) throws GeneralSecurityException {
return cache.get(setting);
}

@Override
public Set<String> getSettingNames() {
return cache.keySet();
}

@Override
public InputStream getFile(String setting) throws GeneralSecurityException {
throw new IllegalStateException("A NotificationService setting cannot be File.");
}

@Override
public void close() throws IOException {
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.elasticsearch.xpack.watcher.notification.NotificationService;

import javax.mail.MessagingException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -108,7 +110,7 @@ public class EmailService extends NotificationService<Account> {
private final CryptoService cryptoService;

public EmailService(Settings settings, @Nullable CryptoService cryptoService, ClusterSettings clusterSettings) {
super("email", clusterSettings, EmailService.getSettings());
super("email", settings, clusterSettings, EmailService.getDynamicSettings(), EmailService.getSecureSettings());
this.cryptoService = cryptoService;
// ensure logging of setting changes
clusterSettings.addSettingsUpdateConsumer(SETTING_DEFAULT_ACCOUNT, (s) -> {});
Expand All @@ -121,7 +123,6 @@ public EmailService(Settings settings, @Nullable CryptoService cryptoService, Cl
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_PORT, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_USER, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_PASSWORD, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_PASSWORD, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_TIMEOUT, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_CONNECTION_TIMEOUT, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SMTP_WRITE_TIMEOUT, (s, o) -> {}, (s, o) -> {});
Expand Down Expand Up @@ -179,12 +180,21 @@ public Email email() {
}
}

public static List<Setting<?>> getSettings() {
private static List<Setting<?>> getDynamicSettings() {
return Arrays.asList(SETTING_DEFAULT_ACCOUNT, SETTING_PROFILE, SETTING_EMAIL_DEFAULTS, SETTING_SMTP_AUTH, SETTING_SMTP_HOST,
SETTING_SMTP_PASSWORD, SETTING_SMTP_PORT, SETTING_SMTP_STARTTLS_ENABLE, SETTING_SMTP_USER, SETTING_SMTP_STARTTLS_REQUIRED,
SETTING_SMTP_TIMEOUT, SETTING_SMTP_CONNECTION_TIMEOUT, SETTING_SMTP_WRITE_TIMEOUT, SETTING_SMTP_LOCAL_ADDRESS,
SETTING_SMTP_LOCAL_PORT, SETTING_SMTP_SEND_PARTIAL, SETTING_SMTP_WAIT_ON_QUIT, SETTING_SMTP_SSL_TRUST_ADDRESS,
SETTING_SECURE_PASSWORD);
SETTING_SMTP_LOCAL_PORT, SETTING_SMTP_SEND_PARTIAL, SETTING_SMTP_WAIT_ON_QUIT, SETTING_SMTP_SSL_TRUST_ADDRESS);
}

private static List<Setting<?>> getSecureSettings() {
return Arrays.asList(SETTING_SECURE_PASSWORD);
}

public static List<Setting<?>> getSettings() {
List<Setting<?>> allSettings = new ArrayList<Setting<?>>(EmailService.getDynamicSettings());
allSettings.addAll(EmailService.getSecureSettings());
return allSettings;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.notification.NotificationService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -68,20 +69,19 @@ public class HipChatService extends NotificationService<HipChatAccount> {
private HipChatServer defaultServer;

public HipChatService(Settings settings, HttpClient httpClient, ClusterSettings clusterSettings) {
super("hipchat", clusterSettings, HipChatService.getSettings());
super("hipchat", settings, clusterSettings, HipChatService.getDynamicSettings(), HipChatService.getSecureSettings());
this.httpClient = httpClient;
// ensure logging of setting changes
clusterSettings.addSettingsUpdateConsumer(SETTING_DEFAULT_ACCOUNT, (s) -> {});
clusterSettings.addSettingsUpdateConsumer(SETTING_DEFAULT_HOST, (s) -> {});
clusterSettings.addSettingsUpdateConsumer(SETTING_DEFAULT_PORT, (s) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_AUTH_TOKEN, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_AUTH_TOKEN_SECURE, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_PROFILE, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_ROOM, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_HOST, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_PORT, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_MESSAGE_DEFAULTS, (s, o) -> {}, (s, o) -> {});

// do an initial load
reload(settings);
}

Expand All @@ -100,8 +100,18 @@ protected HipChatAccount createAccount(String name, Settings accountSettings) {
return profile.createAccount(name, accountSettings, defaultServer, httpClient, logger);
}

private static List<Setting<?>> getDynamicSettings() {
return Arrays.asList(SETTING_DEFAULT_ACCOUNT, SETTING_AUTH_TOKEN, SETTING_PROFILE, SETTING_ROOM, SETTING_MESSAGE_DEFAULTS,
SETTING_DEFAULT_HOST, SETTING_DEFAULT_PORT, SETTING_HOST, SETTING_PORT);
}

private static List<Setting<?>> getSecureSettings() {
return Arrays.asList(SETTING_AUTH_TOKEN_SECURE);
}

public static List<Setting<?>> getSettings() {
return Arrays.asList(SETTING_DEFAULT_ACCOUNT, SETTING_AUTH_TOKEN, SETTING_AUTH_TOKEN_SECURE, SETTING_PROFILE, SETTING_ROOM,
SETTING_MESSAGE_DEFAULTS, SETTING_DEFAULT_HOST, SETTING_DEFAULT_PORT, SETTING_HOST, SETTING_PORT);
List<Setting<?>> allSettings = new ArrayList<Setting<?>>(getDynamicSettings());
allSettings.addAll(getSecureSettings());
return allSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.notification.NotificationService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -62,17 +63,14 @@ public class JiraService extends NotificationService<JiraAccount> {
private final HttpClient httpClient;

public JiraService(Settings settings, HttpClient httpClient, ClusterSettings clusterSettings) {
super("jira", clusterSettings, JiraService.getSettings());
super("jira", settings, clusterSettings, JiraService.getDynamicSettings(), JiraService.getSecureSettings());
this.httpClient = httpClient;
// ensure logging of setting changes
clusterSettings.addSettingsUpdateConsumer(SETTING_DEFAULT_ACCOUNT, (s) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_ALLOW_HTTP, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_URL, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_USER, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_PASSWORD, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_USER, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_URL, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_SECURE_PASSWORD, (s, o) -> {}, (s, o) -> {});
clusterSettings.addAffixUpdateConsumer(SETTING_DEFAULTS, (s, o) -> {}, (s, o) -> {});
// do an initial load
reload(settings);
Expand All @@ -83,8 +81,17 @@ protected JiraAccount createAccount(String name, Settings settings) {
return new JiraAccount(name, settings, httpClient);
}

private static List<Setting<?>> getDynamicSettings() {
return Arrays.asList(SETTING_DEFAULT_ACCOUNT, SETTING_ALLOW_HTTP, SETTING_URL, SETTING_USER, SETTING_PASSWORD, SETTING_DEFAULTS);
}

private static List<Setting<?>> getSecureSettings() {
return Arrays.asList(SETTING_SECURE_USER, SETTING_SECURE_PASSWORD, SETTING_SECURE_URL);
}

public static List<Setting<?>> getSettings() {
return Arrays.asList(SETTING_ALLOW_HTTP, SETTING_URL, SETTING_USER, SETTING_PASSWORD, SETTING_SECURE_USER,
SETTING_SECURE_PASSWORD, SETTING_SECURE_URL, SETTING_DEFAULTS, SETTING_DEFAULT_ACCOUNT);
List<Setting<?>> allSettings = new ArrayList<Setting<?>>(getDynamicSettings());
allSettings.addAll(getSecureSettings());
return allSettings;
}
}
Loading