Skip to content

Commit b6c0645

Browse files
committed
Add Scripted Upsert functionality
Add Kafka Payload -> Injection as Parameters functionality Signed-off-by: Nicholas Cole <[email protected]>
1 parent f0c057f commit b6c0645

File tree

6 files changed

+340
-21
lines changed

6 files changed

+340
-21
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import com.fasterxml.jackson.databind.node.ObjectNode;
2222
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
23+
import io.confluent.connect.elasticsearch.util.ScriptParser;
2324
import org.apache.kafka.connect.data.ConnectSchema;
2425
import org.apache.kafka.connect.data.Date;
2526
import org.apache.kafka.connect.data.Decimal;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.action.index.IndexRequest;
4445
import org.elasticsearch.action.update.UpdateRequest;
4546
import org.elasticsearch.index.VersionType;
47+
import org.elasticsearch.script.Script;
4648
import org.elasticsearch.xcontent.XContentType;
4749
import org.slf4j.Logger;
4850
import org.slf4j.LoggerFactory;
@@ -185,11 +187,44 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
185187
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
186188
record
187189
);
190+
case SCRIPTED_UPSERT:
191+
try {
192+
193+
if (config.getIsPayloadAsParams()) {
194+
return buildUpdateRequestWithParams(index, payload, id);
195+
}
196+
197+
Script script = ScriptParser.parseScript(config.getScript());
198+
199+
return new UpdateRequest(index, id)
200+
.doc(payload, XContentType.JSON)
201+
.upsert(payload, XContentType.JSON)
202+
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
203+
.script(script)
204+
.scriptedUpsert(true);
205+
206+
} catch (JsonProcessingException jsonProcessingException) {
207+
throw new RuntimeException(jsonProcessingException);
208+
}
188209
default:
189210
return null; // shouldn't happen
190211
}
191212
}
192213

214+
private UpdateRequest buildUpdateRequestWithParams(String index, String payload, String id)
215+
throws JsonProcessingException {
216+
217+
Script script = ScriptParser.parseScriptWithParams(config.getScript(), payload);
218+
219+
UpdateRequest updateRequest =
220+
new UpdateRequest(index, id)
221+
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
222+
.script(script)
223+
.scriptedUpsert(true);
224+
225+
return updateRequest;
226+
}
227+
193228
private String getPayload(SinkRecord record) {
194229
if (record.value() == null) {
195230
return null;

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Set;
2525
import java.util.stream.Collectors;
2626
import java.util.concurrent.TimeUnit;
27+
28+
import io.confluent.connect.elasticsearch.validator.ScriptValidator;
2729
import org.apache.kafka.common.config.AbstractConfig;
2830
import org.apache.kafka.common.config.ConfigDef;
2931
import org.apache.kafka.common.config.ConfigException;
@@ -277,6 +279,24 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
277279
);
278280
private static final String WRITE_METHOD_DISPLAY = "Write Method";
279281
private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();
282+
283+
public static final String UPSERT_SCRIPT_CONFIG = "upsert.script";
284+
285+
private static final String UPSERT_SCRIPT_DOC = "Script used for"
286+
+ " upserting data to Elasticsearch. This script allows for"
287+
+ " customizable behavior upon upserting a document. Please refer to"
288+
+ " Elasticsearch scripted upsert documentation";
289+
290+
private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script";
291+
292+
public static final String PAYLOAD_AS_PARAMS_CONFIG = "payload.as.params";
293+
294+
private static final String PAYLOAD_AS_PARAMS_DOC = "Defines Kafka payload will be injected"
295+
+ " into upsert.script script component as params object";
296+
297+
private static final String PAYLOAD_AS_PARAMS_DISPLAY = "Payload as Params";
298+
299+
280300
public static final String LOG_SENSITIVE_DATA_CONFIG = "log.sensitive.data";
281301
private static final String LOG_SENSITIVE_DATA_DISPLAY = "Log Sensitive data";
282302
private static final String LOG_SENSITIVE_DATA_DOC = "If true, logs sensitive data "
@@ -408,7 +428,8 @@ public enum SecurityProtocol {
408428

409429
public enum WriteMethod {
410430
INSERT,
411-
UPSERT
431+
UPSERT,
432+
SCRIPTED_UPSERT
412433
}
413434

414435
protected static ConfigDef baseConfigDef() {
@@ -622,8 +643,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
622643
DATA_CONVERSION_GROUP,
623644
++order,
624645
Width.SHORT,
625-
IGNORE_KEY_DISPLAY
626-
).define(
646+
IGNORE_KEY_DISPLAY)
647+
.define(
627648
IGNORE_SCHEMA_CONFIG,
628649
Type.BOOLEAN,
629650
IGNORE_SCHEMA_DEFAULT,
@@ -632,8 +653,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
632653
DATA_CONVERSION_GROUP,
633654
++order,
634655
Width.SHORT,
635-
IGNORE_SCHEMA_DISPLAY
636-
).define(
656+
IGNORE_SCHEMA_DISPLAY)
657+
.define(
637658
COMPACT_MAP_ENTRIES_CONFIG,
638659
Type.BOOLEAN,
639660
COMPACT_MAP_ENTRIES_DEFAULT,
@@ -642,8 +663,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
642663
DATA_CONVERSION_GROUP,
643664
++order,
644665
Width.SHORT,
645-
COMPACT_MAP_ENTRIES_DISPLAY
646-
).define(
666+
COMPACT_MAP_ENTRIES_DISPLAY)
667+
.define(
647668
IGNORE_KEY_TOPICS_CONFIG,
648669
Type.LIST,
649670
IGNORE_KEY_TOPICS_DEFAULT,
@@ -652,8 +673,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
652673
DATA_CONVERSION_GROUP,
653674
++order,
654675
Width.LONG,
655-
IGNORE_KEY_TOPICS_DISPLAY
656-
).define(
676+
IGNORE_KEY_TOPICS_DISPLAY)
677+
.define(
657678
IGNORE_SCHEMA_TOPICS_CONFIG,
658679
Type.LIST,
659680
IGNORE_SCHEMA_TOPICS_DEFAULT,
@@ -662,8 +683,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
662683
DATA_CONVERSION_GROUP,
663684
++order,
664685
Width.LONG,
665-
IGNORE_SCHEMA_TOPICS_DISPLAY
666-
).define(
686+
IGNORE_SCHEMA_TOPICS_DISPLAY)
687+
.define(
667688
DROP_INVALID_MESSAGE_CONFIG,
668689
Type.BOOLEAN,
669690
DROP_INVALID_MESSAGE_DEFAULT,
@@ -672,8 +693,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
672693
DATA_CONVERSION_GROUP,
673694
++order,
674695
Width.LONG,
675-
DROP_INVALID_MESSAGE_DISPLAY
676-
).define(
696+
DROP_INVALID_MESSAGE_DISPLAY)
697+
.define(
677698
BEHAVIOR_ON_NULL_VALUES_CONFIG,
678699
Type.STRING,
679700
BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(),
@@ -684,8 +705,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
684705
++order,
685706
Width.SHORT,
686707
BEHAVIOR_ON_NULL_VALUES_DISPLAY,
687-
new EnumRecommender<>(BehaviorOnNullValues.class)
688-
).define(
708+
new EnumRecommender<>(BehaviorOnNullValues.class))
709+
.define(
689710
BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
690711
Type.STRING,
691712
BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(),
@@ -696,8 +717,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
696717
++order,
697718
Width.SHORT,
698719
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
699-
new EnumRecommender<>(BehaviorOnMalformedDoc.class)
700-
).define(
720+
new EnumRecommender<>(BehaviorOnMalformedDoc.class))
721+
.define(
701722
EXTERNAL_VERSION_HEADER_CONFIG,
702723
Type.STRING,
703724
EXTERNAL_VERSION_HEADER_DEFAULT,
@@ -706,8 +727,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
706727
DATA_CONVERSION_GROUP,
707728
++order,
708729
Width.SHORT,
709-
EXTERNAL_VERSION_HEADER_DISPLAY
710-
).define(
730+
EXTERNAL_VERSION_HEADER_DISPLAY)
731+
.define(
711732
WRITE_METHOD_CONFIG,
712733
Type.STRING,
713734
WRITE_METHOD_DEFAULT,
@@ -718,8 +739,30 @@ private static void addConversionConfigs(ConfigDef configDef) {
718739
++order,
719740
Width.SHORT,
720741
WRITE_METHOD_DISPLAY,
721-
new EnumRecommender<>(WriteMethod.class)
722-
);
742+
new EnumRecommender<>(WriteMethod.class))
743+
.define(
744+
UPSERT_SCRIPT_CONFIG,
745+
Type.STRING,
746+
null,
747+
new ScriptValidator(),
748+
Importance.LOW,
749+
UPSERT_SCRIPT_DOC,
750+
DATA_CONVERSION_GROUP,
751+
++order,
752+
Width.SHORT,
753+
UPSERT_SCRIPT_DISPLAY,
754+
new ScriptValidator())
755+
.define(
756+
PAYLOAD_AS_PARAMS_CONFIG,
757+
Type.BOOLEAN,
758+
false,
759+
Importance.LOW,
760+
PAYLOAD_AS_PARAMS_DOC,
761+
DATA_CONVERSION_GROUP,
762+
++order,
763+
Width.SHORT,
764+
PAYLOAD_AS_PARAMS_DISPLAY);
765+
;
723766
}
724767

725768
private static void addProxyConfigs(ConfigDef configDef) {
@@ -1078,6 +1121,14 @@ public WriteMethod writeMethod() {
10781121
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
10791122
}
10801123

1124+
public String getScript() {
1125+
return getString(UPSERT_SCRIPT_CONFIG);
1126+
}
1127+
1128+
public Boolean getIsPayloadAsParams() {
1129+
return getBoolean(PAYLOAD_AS_PARAMS_CONFIG);
1130+
}
1131+
10811132
private static class DataStreamDatasetValidator implements Validator {
10821133

10831134
@Override
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2018 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.connect.elasticsearch.util;
17+
18+
import com.fasterxml.jackson.core.JsonProcessingException;
19+
import com.fasterxml.jackson.core.type.TypeReference;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import org.elasticsearch.script.Script;
22+
23+
import java.util.Map;
24+
25+
public class ScriptParser {
26+
27+
private static final ObjectMapper objectMapper = new ObjectMapper();
28+
29+
public static Script parseScript(String scriptJson) throws JsonProcessingException {
30+
31+
Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);
32+
33+
return Script.parse(map);
34+
}
35+
36+
private static Map<String, Object> parseSchemaStringAsJson(String scriptJson)
37+
throws JsonProcessingException {
38+
39+
ObjectMapper objectMapper = new ObjectMapper();
40+
41+
Map<String, Object> scriptConverted;
42+
43+
scriptConverted =
44+
objectMapper.readValue(scriptJson, new TypeReference<Map<String, Object>>() {});
45+
46+
return scriptConverted;
47+
}
48+
49+
public static Script parseScriptWithParams(String scriptJson, String jsonPayload)
50+
throws JsonProcessingException {
51+
52+
Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);
53+
54+
Map<String, Object> fields =
55+
objectMapper.readValue(jsonPayload, new TypeReference<Map<String, Object>>() {});
56+
57+
map.put("params", fields);
58+
59+
return Script.parse(map);
60+
}
61+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2018 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.connect.elasticsearch.validator;
17+
18+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
19+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT;
20+
21+
import com.fasterxml.jackson.core.JsonProcessingException;
22+
import io.confluent.connect.elasticsearch.util.ScriptParser;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Map;
26+
import org.apache.kafka.common.config.ConfigDef;
27+
import org.apache.kafka.common.config.ConfigException;
28+
import org.elasticsearch.script.Script;
29+
30+
public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender {
31+
32+
@Override
33+
@SuppressWarnings("unchecked")
34+
public void ensureValid(String name, Object value) {
35+
36+
if (value == null) {
37+
return;
38+
}
39+
40+
String script = (String) value;
41+
42+
try {
43+
Script parsedScript = ScriptParser.parseScript(script);
44+
45+
if (parsedScript.getIdOrCode() == null) {
46+
throw new ConfigException(name, script, "The specified script is missing code");
47+
} else if (parsedScript.getLang() == null) {
48+
throw new ConfigException(name, script, "The specified script is missing lang");
49+
}
50+
51+
} catch (JsonProcessingException jsonProcessingException) {
52+
throw new ConfigException(
53+
name, script, "The specified script is not a valid Elasticsearch painless script");
54+
}
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return "A valid script that is able to be parsed";
60+
}
61+
62+
@Override
63+
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
64+
if (!parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT)) {
65+
return new ArrayList<>();
66+
}
67+
return null;
68+
}
69+
70+
@Override
71+
public boolean visible(String name, Map<String, Object> parsedConfig) {
72+
return parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name());
73+
}
74+
}

0 commit comments

Comments
 (0)