Skip to content

Commit f251767

Browse files
author
Hendrik Muhs
authored
[7.10][Transform] add support for unsigned_long data type (#63957)
add support for unsigned_long, which required a change in writing out integer results properly, because coerce is not supported for unsigned_long fixes #63871 backport #63940
1 parent 8d30766 commit f251767

File tree

9 files changed

+105
-39
lines changed

9 files changed

+105
-39
lines changed

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -554,15 +554,15 @@ public void testContinuousPivotHistogram() throws Exception {
554554

555555
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
556556
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
557-
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));
557+
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));
558558

559559
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
560560
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
561-
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
561+
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
562562

563563
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
564564
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
565-
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
565+
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
566566

567567
final StringBuilder bulk = new StringBuilder();
568568

@@ -606,15 +606,15 @@ public void testContinuousPivotHistogram() throws Exception {
606606

607607
searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
608608
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
609-
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));
609+
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));
610610

611611
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
612612
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
613-
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30.0));
613+
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30));
614614

615615
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
616616
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
617-
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
617+
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
618618

619619
}
620620

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/DateHistogramGroupByIT.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ public void testIteration(int iteration) throws IOException {
130130
);
131131
assertThat(
132132
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
133-
XContentMapValues.extractValue("count", source),
134-
equalTo(Double.valueOf(bucket.getDocCount()))
133+
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
134+
equalTo(bucket.getDocCount())
135135
);
136136

137137
// transform should only rewrite documents that require it
@@ -146,9 +146,11 @@ public void testIteration(int iteration) throws IOException {
146146
// we use a fixed_interval of `1s`, the transform runs every `1s` so it the bucket might be recalculated at the next run
147147
// but
148148
// should NOT be recalculated for the 2nd/3rd/... run
149-
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)) - (Double) XContentMapValues
150-
.extractValue(MAX_RUN_FIELD, source),
151-
is(lessThanOrEqualTo(1.0))
149+
(Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source) - (Integer) XContentMapValues.extractValue(
150+
MAX_RUN_FIELD,
151+
source
152+
),
153+
is(lessThanOrEqualTo(1))
152154
);
153155
}
154156
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TermsGroupByIT.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public TransformConfig createConfig() {
6060

6161
AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();
6262
addCommonAggregations(aggregations);
63-
aggregations.addAggregator(AggregationBuilders.max("metric.avg").field("metric"));
63+
aggregations.addAggregator(AggregationBuilders.avg("metric.avg").field("metric"));
6464

6565
pivotConfigBuilder.setAggregations(aggregations);
6666
transformConfigBuilder.setPivotConfig(pivotConfigBuilder.build());
@@ -83,7 +83,7 @@ public void testIteration(int iteration) throws IOException {
8383
// missing_bucket produces `null`, we can't use `null` in aggs, so we have to use a magic value, see gh#60043
8484
terms.missing(MISSING_BUCKET_KEY);
8585
}
86-
terms.subAggregation(AggregationBuilders.max("metric.avg").field("metric"));
86+
terms.subAggregation(AggregationBuilders.avg("metric.avg").field("metric"));
8787
sourceBuilderSource.aggregation(terms);
8888
searchRequestSource.source(sourceBuilderSource);
8989
SearchResponse responseSource = search(searchRequestSource);
@@ -129,8 +129,8 @@ public void testIteration(int iteration) throws IOException {
129129
);
130130
assertThat(
131131
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
132-
XContentMapValues.extractValue("count", source),
133-
equalTo(Double.valueOf(bucket.getDocCount()))
132+
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
133+
equalTo(bucket.getDocCount())
134134
);
135135

136136
SingleValue avgAgg = (SingleValue) bucket.getAggregations().get("metric.avg");
@@ -154,8 +154,7 @@ public void testIteration(int iteration) throws IOException {
154154
+ iteration
155155
+ " full source: "
156156
+ source,
157-
// TODO: aggs return double for MAX_RUN_FIELD, although it is an integer
158-
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)),
157+
XContentMapValues.extractValue(INGEST_RUN_FIELD, source),
159158
equalTo(XContentMapValues.extractValue(MAX_RUN_FIELD, source))
160159
);
161160
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ private void putIndex(String indexName, String dateType, boolean isDataStream) t
310310
.field("type", "keyword")
311311
.endObject()
312312
.startObject("metric")
313-
.field("type", "integer")
313+
.field("type", randomFrom("integer", "long", "unsigned_long"))
314314
.endObject()
315315
.startObject("location")
316316
.field("type", "geo_point")

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.stream.Collectors;
4141
import java.util.stream.Stream;
4242

43+
import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt;
4344
import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.isNumericType;
4445

4546
public final class AggregationResultUtils {
@@ -198,7 +199,7 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
198199
// If the type is numeric or if the formatted string is the same as simply making the value a string,
199200
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
200201
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
201-
return aggregation.value();
202+
return dropFloatingPointComponentIfTypeRequiresIt(fieldType, aggregation.value());
202203
} else {
203204
return aggregation.getValueAsString();
204205
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,59 @@
2323
import org.elasticsearch.xpack.core.ClientHelper;
2424
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
2525

26+
import java.math.BigDecimal;
2627
import java.util.HashMap;
2728
import java.util.Map;
2829
import java.util.Objects;
29-
import java.util.Set;
3030
import java.util.stream.Collectors;
3131
import java.util.stream.Stream;
3232

3333
public final class SchemaUtil {
3434
private static final Logger logger = LogManager.getLogger(SchemaUtil.class);
3535

36-
// Full collection of numeric field type strings
37-
private static final Set<String> NUMERIC_FIELD_MAPPER_TYPES;
36+
// Full collection of numeric field type strings and whether they are floating point or not
37+
private static final Map<String, Boolean> NUMERIC_FIELD_MAPPER_TYPES;
3838
static {
39-
Set<String> types = Stream.of(NumberFieldMapper.NumberType.values())
40-
.map(NumberFieldMapper.NumberType::typeName)
41-
.collect(Collectors.toSet());
42-
types.add("scaled_float"); // have to add manually since scaled_float is in a module
39+
Map<String, Boolean> types = Stream.of(NumberFieldMapper.NumberType.values())
40+
.collect(Collectors.toMap(t -> t.typeName(), t -> t.numericType().isFloatingPoint()));
41+
42+
// have to add manually since they are in a module
43+
types.put("scaled_float", true);
44+
types.put("unsigned_long", false);
4345
NUMERIC_FIELD_MAPPER_TYPES = types;
4446
}
4547

4648
private SchemaUtil() {}
4749

4850
public static boolean isNumericType(String type) {
49-
return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
51+
return type != null && NUMERIC_FIELD_MAPPER_TYPES.containsKey(type);
52+
}
53+
54+
/**
55+
* Convert a numeric value to a whole number if it's not a floating point number.
56+
*
57+
* Implementation decision: We do not care about the concrete type, but only if its floating point or not.
58+
* Further checks (e.g. range) are done at indexing.
59+
*
60+
* If type is floating point but ends with `.0`, we still preserve `.0` in case
61+
* the destination index uses dynamic mappings as well as being json friendly.
62+
*
63+
* @param type the type of the value according to the schema we know
64+
* @param value the value as double (aggs return double for everything)
65+
* @return value if its floating point, long if value is smaller than Long.MAX_VALUE, BigInteger otherwise
66+
*/
67+
public static Object dropFloatingPointComponentIfTypeRequiresIt(String type, double value) {
68+
if (NUMERIC_FIELD_MAPPER_TYPES.getOrDefault(type, true) == false) {
69+
assert value % 1 == 0;
70+
if (value < Long.MAX_VALUE) {
71+
return (long) value;
72+
}
73+
74+
// special case for unsigned long
75+
return BigDecimal.valueOf(value).toBigInteger();
76+
}
77+
78+
return value;
5079
}
5180

5281
/**
@@ -188,8 +217,11 @@ private static Map<String, String> resolveMappings(
188217
} else if (destinationMapping != null) {
189218
targetMapping.put(targetFieldName, destinationMapping);
190219
} else {
191-
logger.warn("Failed to deduce mapping for [{}], fall back to dynamic mapping. " +
192-
"Create the destination index with complete mappings first to avoid deducing the mappings", targetFieldName);
220+
logger.warn(
221+
"Failed to deduce mapping for [{}], fall back to dynamic mapping. "
222+
+ "Create the destination index with complete mappings first to avoid deducing the mappings",
223+
targetFieldName
224+
);
193225
}
194226
});
195227

@@ -199,8 +231,11 @@ private static Map<String, String> resolveMappings(
199231
if (destinationMapping != null) {
200232
targetMapping.put(targetFieldName, destinationMapping);
201233
} else {
202-
logger.warn("Failed to deduce mapping for [{}], fall back to keyword. " +
203-
"Create the destination index with complete mappings first to avoid deducing the mappings", targetFieldName);
234+
logger.warn(
235+
"Failed to deduce mapping for [{}], fall back to keyword. "
236+
+ "Create the destination index with complete mappings first to avoid deducing the mappings",
237+
targetFieldName
238+
);
204239
targetMapping.put(targetFieldName, KeywordFieldMapper.CONTENT_TYPE);
205240
}
206241
});

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,12 @@ public void testSingleValueAggExtractor() {
654654
AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "string"), ""),
655655
equalTo("one_hundred")
656656
);
657+
658+
agg = createSingleMetricAgg("metric", 100.0, "one_hundred");
659+
assertThat(
660+
AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "unsigned_long"), ""),
661+
equalTo(100L)
662+
);
657663
}
658664

659665
private ScriptedMetric createScriptedMetric(Object returnValue) {
@@ -836,7 +842,7 @@ public void testSingleBucketAggExtractor() {
836842
);
837843
assertThat(
838844
AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba2.sub1", "long", "sba2.sub2", "float"), ""),
839-
equalTo(asMap("sub1", 100.0, "sub2", 33.33))
845+
equalTo(asMap("sub1", 100L, "sub2", 33.33))
840846
);
841847

842848
agg = createSingleBucketAgg(
@@ -848,7 +854,7 @@ public void testSingleBucketAggExtractor() {
848854
);
849855
assertThat(
850856
AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba3.sub1", "long", "sba3.sub2", "double"), ""),
851-
equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", 42L))
857+
equalTo(asMap("sub1", 100L, "sub2", 33.33, "sub3", 42L))
852858
);
853859

854860
agg = createSingleBucketAgg(
@@ -861,7 +867,7 @@ public void testSingleBucketAggExtractor() {
861867
assertThat(
862868
AggregationResultUtils.getExtractor(agg)
863869
.value(agg, asStringMap("sba4.sub3.subsub1", "double", "sba4.sub2", "float", "sba4.sub1", "long"), ""),
864-
equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", asMap("subsub1", 11.1)))
870+
equalTo(asMap("sub1", 100L, "sub2", 33.33, "sub3", asMap("subsub1", 11.1)))
865871
);
866872
}
867873

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,11 @@ public void testBasic() throws InterruptedException {
127127
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
128128
GroupConfig groupConfig = GroupConfigTests.randomGroupConfig();
129129
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null);
130-
long numGroupsWithoutScripts = groupConfig.getGroups().values().stream()
131-
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null).count();
130+
long numGroupsWithoutScripts = groupConfig.getGroups()
131+
.values()
132+
.stream()
133+
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null)
134+
.count();
132135

133136
this.<Map<String, String>>assertAsync(
134137
listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener),
@@ -191,8 +194,11 @@ public void testNested() throws InterruptedException {
191194
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
192195
GroupConfig groupConfig = GroupConfigTests.randomGroupConfig();
193196
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null);
194-
long numGroupsWithoutScripts = groupConfig.getGroups().values().stream()
195-
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null).count();
197+
long numGroupsWithoutScripts = groupConfig.getGroups()
198+
.values()
199+
.stream()
200+
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null)
201+
.count();
196202

197203
this.<Map<String, String>>assertAsync(
198204
listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener),
@@ -219,7 +225,7 @@ public void testNested() throws InterruptedException {
219225
23144,
220226
AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_2", 45.0, "forty_five")
221227
);
222-
assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(asMap("max_drinks_2", 45.0)));
228+
assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(asMap("max_drinks_2", 45L)));
223229

224230
agg = AggregationResultUtilsTests.createSingleBucketAgg(
225231
"filter_3",

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88

99
import org.elasticsearch.test.ESTestCase;
1010

11+
import java.math.BigInteger;
1112
import java.util.HashMap;
1213
import java.util.Map;
1314

15+
import static org.hamcrest.CoreMatchers.instanceOf;
16+
1417
public class SchemaUtilTests extends ESTestCase {
1518

1619
public void testInsertNestedObjectMappings() {
@@ -52,4 +55,18 @@ public void testInsertNestedObjectMappings() {
5255
assertFalse(fieldMappings.containsKey(""));
5356
}
5457

58+
public void testConvertToIntegerTypeIfNeeded() {
59+
assertEquals(33L, SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("unsigned_long", 33.0));
60+
assertEquals(33L, SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("long", 33.0));
61+
assertEquals(33.0, SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("double", 33.0));
62+
assertEquals(33.0, SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("half_float", 33.0));
63+
assertEquals(33.0, SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("unknown", 33.0));
64+
assertEquals(33.0, SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt(null, 33.0));
65+
66+
Object value = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("unsigned_long", 1.8446744073709551615E19);
67+
assertThat(value, instanceOf(BigInteger.class));
68+
69+
assertEquals(new BigInteger("18446744073709551615").doubleValue(), ((BigInteger) value).doubleValue(), 0.0);
70+
}
71+
5572
}

0 commit comments

Comments
 (0)