[TRANSFORM] Make transforms compatible with "subobjects" : false #88386

@droberts195

#86166 added the option for object fields in mappings to have a subobjects : false setting. This in turn allows field names containing dots to be stored directly inside the object, without the usual object/scalar clashes that arise when some scalar fields have more path components than others with the same prefix.

For example, subobjects : false makes the following document possible:

{
  "@timestamp" : "2022-06-23T00:00:00Z",
  "attributes" : {
    "service" : "elasticsearch",
    "service.administrator" : "dave",
    "service.category" : "search engine"
  },
  "metrics" : {
    "responsetime" : 100,
    "responsetime.min" : 10,
    "responsetime.max" : 900
  }
}

The mappings for such a document could look like this:

{
  "metrics2": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "attributes": {
          "subobjects": false,
          "properties": {
            "service": {
              "type": "keyword"
            },
            "service.administrator": {
              "type": "keyword"
            },
            "service.category": {
              "type": "keyword"
            }
          }
        },
        "metrics": {
          "subobjects": false,
          "properties": {
            "responsetime": {
              "type": "double"
            },
            "responsetime.max": {
              "type": "double"
            },
            "responsetime.min": {
              "type": "double"
            }
          }
        }
      }
    }
  }
}

Historically it would have been possible to store the document, but only by completely disabling mappings for the metrics and attributes objects. With subobjects : false, the dotted fields under metrics and attributes can all have mappings and participate in searches and aggregations.
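To illustrate why such a document clashes without subobjects : false, here is a minimal Python sketch (not Elasticsearch code) that expands dotted field names into nested objects, which is roughly how dotted names are interpreted by default. The function name expand_dotted is hypothetical, the values are assumed to be scalars, and the error wording merely imitates the message shown later in this issue.

```python
def expand_dotted(flat):
    """Expand {"a.b": 1} into {"a": {"b": 1}}; raise if a name must be
    both a scalar field and an object. Values are assumed to be scalars."""
    nested = {}
    for name, value in flat.items():
        node = nested
        *parents, leaf = name.split(".")
        path = []
        for parent in parents:
            path.append(parent)
            child = node.setdefault(parent, {})
            if not isinstance(child, dict):
                # An earlier field already stored a scalar at this path.
                raise ValueError(
                    f"field [{'.'.join(path)}] cannot be both an object and a field"
                )
            node = child
        if isinstance(node.get(leaf), dict):
            # An earlier field already turned this path into an object.
            raise ValueError(
                f"field [{name}] cannot be both an object and a field"
            )
        node[leaf] = value
    return nested


# The "metrics" object from the example document above.
metrics = {"responsetime": 100, "responsetime.min": 10, "responsetime.max": 900}

try:
    expand_dotted(metrics)
    clash_message = None
except ValueError as e:
    clash_message = str(e)

print(clash_message)
```

With subobjects : false the expansion step is skipped for that object, so "responsetime" and "responsetime.min" remain sibling leaf fields and no clash occurs.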

Creating a pivot transform that aggregates statistics for (formerly) clashing metrics fields while only grouping by one attribute works:

PUT _transform/service1
{
  "source": {
    "index": "metrics2"
  },
  "pivot": {
    "group_by": {
      "attributes.service.administrator": {
        "terms": {
          "field": "attributes.service.administrator"
        }
      }
    },
    "aggregations": {
      "avg_responsetime": {
        "avg": {
          "field": "metrics.responsetime"
        }
      },
      "max_responsetime": {
        "max": {
          "field": "metrics.responsetime.max"
        }
      }
    }
  },
  "dest": {
    "index": "metrics_transformed"
  }
}

Returns:

{
  "acknowledged": true
}

The mappings for the destination index are as follows:

{
  "metrics_transformed": {
    "mappings": {
      "_meta": {
        "created_by": "transform",
        "_transform": {
          "transform": "service1",
          "version": {
            "created": "8.4.0"
          },
          "creation_date_in_millis": 1657288120327
        }
      },
      "properties": {
        "attributes": {
          "properties": {
            "service": {
              "properties": {
                "administrator": {
                  "type": "keyword"
                }
              }
            }
          }
        },
        "avg_responsetime": {
          "type": "double"
        },
        "max_responsetime": {
          "type": "double"
        }
      }
    }
  }
}

But if we try to group by attribute fields that would clash without subobjects : false, the request is rejected with a validation error:

PUT _transform/service2
{
  "source": {
    "index": "metrics2"
  },
  "pivot": {
    "group_by": {
      "attributes.service": {
        "terms": {
          "field": "attributes.service"
        }
      },
      "attributes.service.administrator": {
        "terms": {
          "field": "attributes.service.administrator"
        }
      }
    },
    "aggregations": {
      "avg_responsetime": {
        "avg": {
          "field": "metrics.responsetime"
        }
      },
      "max_responsetime": {
        "max": {
          "field": "metrics.responsetime.max"
        }
      }
    }
  },
  "dest": {
    "index": "metrics_transformed"
  }
}

Returns:

{
  "error": {
    "root_cause": [
      {
        "type": "action_request_validation_exception",
        "reason": "Validation Failed: 1: field [attributes.service] cannot be both an object and a field;"
      }
    ],
    "type": "action_request_validation_exception",
    "reason": "Validation Failed: 1: field [attributes.service] cannot be both an object and a field;"
  },
  "status": 400
}
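The condition behind this validation error can be sketched in a few lines of Python. This is only an illustration of the idea, not the transform code itself: the function name find_clashes is hypothetical, and it assumes the check amounts to asking whether any output field name is a dotted-path prefix of another.

```python
def find_clashes(output_names):
    """Return the names that would have to be both a field and an object,
    i.e. names that are a dotted-path prefix of another name."""
    names = set(output_names)
    clashes = set()
    for name in names:
        parts = name.split(".")
        # Check every proper prefix: "a.b.c" -> "a", "a.b"
        for i in range(1, len(parts)):
            prefix = ".".join(parts[:i])
            if prefix in names:
                clashes.add(prefix)
    return sorted(clashes)


# Output field names of the group_by in the rejected request above.
service2_names = ["attributes.service", "attributes.service.administrator"]
# Output field names that share no dotted prefix.
renamed_names = ["service", "administrator"]

print(find_clashes(service2_names))  # ['attributes.service']
print(find_clashes(renamed_names))   # []
```

Under this reading, any set of output names with no prefix relationship between them passes, regardless of which source fields they read from.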

It's possible to make this work by choosing different output field names for the grouping terms:

PUT _transform/service3
{
  "source": {
    "index": "metrics2"
  },
  "pivot": {
    "group_by": {
      "service": {
        "terms": {
          "field": "attributes.service"
        }
      },
      "administrator": {
        "terms": {
          "field": "attributes.service.administrator"
        }
      }
    },
    "aggregations": {
      "avg_responsetime": {
        "avg": {
          "field": "metrics.responsetime"
        }
      },
      "max_responsetime": {
        "max": {
          "field": "metrics.responsetime.max"
        }
      }
    }
  },
  "dest": {
    "index": "metrics_transformed_again"
  }
}

A latest transform with clashing attributes, on the other hand, can be created without any error:

PUT _transform/service4
{
  "source": {
    "index": "metrics2"
  },
  "latest": {
    "sort" : "@timestamp",
    "unique_key" : [ "attributes.service", "attributes.service.category" ]
  },
  "dest": {
    "index": "metrics_latest"
  }
}

Returns:

{
  "acknowledged": true
}

However, when this latest transform is run, it fails with errors:

[2022-07-08T15:10:41,431][WARN ][o.e.x.t.t.TransformIndexer] [runTask-0] [service4] transform encountered an exception:
org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced [1] failures and at least 1 irrecoverable [can't merge a non object mapping [attributes.service] with an object mapping].
        at org.elasticsearch.xpack.transform.transforms.ClientTransformIndexer.handleBulkResponse(ClientTransformIndexer.java:224) ~[?:?]
        at org.elasticsearch.xpack.transform.transforms.ClientTransformIndexer.lambda$doNextBulk$1(ClientTransformIndexer.java:163) ~[?:?]
        at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:31) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.client.internal.node.NodeClient$SafelyWrappedActionListener.onResponse(NodeClient.java:160) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:171) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:165) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:31) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$applyInternal$2(SecurityActionFilter.java:165) ~[?:?]
        at org.elasticsearch.action.ActionListener$DelegatingFailureActionListener.onResponse(ActionListener.java:245) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionListener$RunBeforeActionListener.onResponse(ActionListener.java:415) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.finishHim(TransportBulkAction.java:612) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.onResponse(TransportBulkAction.java:593) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.onResponse(TransportBulkAction.java:582) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.client.internal.node.NodeClient$SafelyWrappedActionListener.onResponse(NodeClient.java:160) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:171) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:165) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:31) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$applyInternal$2(SecurityActionFilter.java:165) ~[?:?]
        at org.elasticsearch.action.ActionListener$DelegatingFailureActionListener.onResponse(ActionListener.java:245) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.finishOnSuccess(TransportReplicationAction.java:1066) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$1.handleResponse(TransportReplicationAction.java:978) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$1.handleResponse(TransportReplicationAction.java:969) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1355) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1440) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1420) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:41) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:39) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:20) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionListener$RunBeforeActionListener.onResponse(ActionListener.java:415) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$runWithPrimaryShardReference$3(TransportReplicationAction.java:494) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionListener$MappedActionListener.onResponse(ActionListener.java:127) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.ReplicationOperation.finish(ReplicationOperation.java:453) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.ReplicationOperation.decPendingAndFinishIfNeeded(ReplicationOperation.java:439) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.ReplicationOperation$1.lambda$onResponse$0(ReplicationOperation.java:162) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.ReplicationOperation.updateCheckPoints(ReplicationOperation.java:340) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.ReplicationOperation$1.onResponse(ReplicationOperation.java:158) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.ReplicationOperation$1.onResponse(ReplicationOperation.java:153) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportWriteAction$WritePrimaryResult$1.onSuccess(TransportWriteAction.java:301) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportWriteAction$AsyncAfterWriteAction.maybeFinish(TransportWriteAction.java:443) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportWriteAction$AsyncAfterWriteAction.run(TransportWriteAction.java:457) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportWriteAction$WritePrimaryResult.runPostReplicationActions(TransportWriteAction.java:308) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.ReplicationOperation.handlePrimaryResult(ReplicationOperation.java:153) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionListener$MappedActionListener.onResponse(ActionListener.java:127) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:473) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportShardBulkAction$2.finishRequest(TransportShardBulkAction.java:241) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:203) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:253) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:133) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:72) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.support.replication.TransportWriteAction$1.doRun(TransportWriteAction.java:211) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:769) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.IllegalArgumentException: can't merge a non object mapping [attributes.service] with an object mapping
        at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:431) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:474) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:439) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:419) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:32) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.ObjectMapper$Builder.buildMappers(ObjectMapper.java:153) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.RootObjectMapper$Builder.build(RootObjectMapper.java:110) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.DocumentParser.createDynamicUpdate(DocumentParser.java:243) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:97) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:78) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.shard.IndexShard.prepareIndex(IndexShard.java:999) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:948) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:892) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:321) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:187) ~[elasticsearch-8.4.0-SNAPSHOT.jar:?]
        ... 10 more
[2022-07-08T15:10:41,448][ERROR][o.e.x.t.t.TransformTask  ] [runTask-0] [service4] transform has failed; experienced: [Failed to index documents into destination index due to permanent error: [org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced [1] failures and at least 1 irrecoverable [can't merge a non object mapping [attributes.service] with an object mapping].; java.lang.IllegalArgumentException: can't merge a non object mapping [attributes.service] with an object mapping]].

For pivot transforms we have two options:

  1. Change nothing in the code - a source index with subobjects : false will work with pivot transforms if they use output fields that don't require subobjects : false, so we can document this.
  2. Change the pivot transform code to use subobjects : false if appropriate when deducing destination index mappings.
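As a rough idea of what option 2 could mean, the following Python sketch deduces destination mappings from output field names and falls back to subobjects : false only where names under the same top-level object clash. It is a sketch under stated assumptions, not a proposal for the actual implementation: the function names are hypothetical, field types are passed in rather than deduced, and only clashes below a top-level object are handled.

```python
def has_clash(names):
    """True if one name is a dotted-path prefix of another."""
    names = set(names)
    for name in names:
        parts = name.split(".")
        if any(".".join(parts[:i]) in names for i in range(1, len(parts))):
            return True
    return False


def nested_properties(fields):
    """Conventional nested "properties" for clash-free {dotted_name: type}."""
    props = {}
    for name, es_type in fields.items():
        node = props
        *parents, leaf = name.split(".")
        for parent in parents:
            node = node.setdefault(parent, {"properties": {}})["properties"]
        node[leaf] = {"type": es_type}
    return props


def deduce_mappings(fields):
    """Build destination mappings from {output_field_name: type}."""
    props, groups = {}, {}
    for name, es_type in fields.items():
        head, dot, rest = name.partition(".")
        if dot:
            groups.setdefault(head, {})[rest] = es_type
        else:
            props[name] = {"type": es_type}
    for head, children in groups.items():
        if head in props:
            # Clash at the root level: out of scope for this sketch.
            raise ValueError(f"top-level clash on [{head}] is not handled")
        if has_clash(children):
            # Keep the dotted names as leaf fields of a subobjects:false object.
            props[head] = {
                "subobjects": False,
                "properties": {n: {"type": t} for n, t in children.items()},
            }
        else:
            props[head] = {"properties": nested_properties(children)}
    return {"properties": props}


clashing = deduce_mappings({
    "attributes.service": "keyword",
    "attributes.service.administrator": "keyword",
    "avg_responsetime": "double",
})
print(clashing)
```

For clash-free output names the result has the same nested shape as the destination mappings shown earlier for the service1 transform, so existing transforms would be unaffected under this approach.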

For the latest transform we really have to make improvements for subobjects : false. Allowing the transform to be created and then suffering exceptions at runtime is not acceptable behaviour.
