From 00afc5b5a228a708969a248d5acd8ed46dd43100 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 16 May 2023 09:55:30 -0700 Subject: [PATCH] Add mappings for enrich fields (#96056) We are developing a new component that performs lookup during query time. The idea is to utilize the existing enrich policies and indices used during indexing. However, to ensure proper functionality of the new component, we need the mapping types and doc_values of the enrich fields. With this change, populating the mapping for enrich fields is a best-effort process to maintain the current behavior. This means that any mapping conflicts arising between the enrich fields of the source indices will be ignored. I will follow up to address this validation issue. --- docs/changelog/96056.yaml | 5 + .../xpack/enrich/EnrichPolicyRunner.java | 213 ++++--- .../xpack/enrich/EnrichPolicyRunnerTests.java | 570 ++++++++++++------ 3 files changed, 502 insertions(+), 286 deletions(-) create mode 100644 docs/changelog/96056.yaml diff --git a/docs/changelog/96056.yaml b/docs/changelog/96056.yaml new file mode 100644 index 0000000000000..a32a6dd675bb9 --- /dev/null +++ b/docs/changelog/96056.yaml @@ -0,0 +1,5 @@ +pr: 96056 +summary: Add mappings for enrich fields +area: Ingest Node +type: enhancement +issues: [] diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index cffe9f638ddd9..38ad58b22699e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -39,7 +39,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.common.util.Maps; +import 
org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -47,7 +48,6 @@ import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskCancelledException; -import org.elasticsearch.xcontent.ObjectPath; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; @@ -57,6 +57,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -180,9 +181,9 @@ static void validateMappings( } // Validate the key and values try { - validateField(mapping, policy.getMatchField(), true); + validateAndGetMappingTypeAndFormat(mapping, policy.getMatchField(), true); for (String valueFieldName : policy.getEnrichFields()) { - validateField(mapping, valueFieldName, false); + validateAndGetMappingTypeAndFormat(mapping, valueFieldName, false); } } catch (ElasticsearchException e) { throw new ElasticsearchException( @@ -194,11 +195,64 @@ static void validateMappings( } } - private static void validateField(Map properties, String fieldName, boolean fieldRequired) { + private record MappingTypeAndFormat(String type, String format) { + + } + + private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat( + String fieldName, + EnrichPolicy policy, + boolean strictlyRequired, + List> sourceMappings + ) { + var fieldMappings = sourceMappings.stream() + .map(mapping -> validateAndGetMappingTypeAndFormat(mapping, fieldName, strictlyRequired)) + .filter(Objects::nonNull) + .toList(); + Set types = fieldMappings.stream().map(tf -> tf.type).collect(Collectors.toSet()); + if (types.size() > 1) { + if (strictlyRequired) { + throw new 
ElasticsearchException( + "Multiple distinct mapping types for field '{}' - indices({}) types({})", + fieldName, + Strings.collectionToCommaDelimitedString(policy.getIndices()), + Strings.collectionToCommaDelimitedString(types) + ); + } + return null; + } + if (types.isEmpty()) { + return null; + } + Set formats = fieldMappings.stream().map(tf -> tf.format).filter(Objects::nonNull).collect(Collectors.toSet()); + if (formats.size() > 1) { + if (strictlyRequired) { + throw new ElasticsearchException( + "Multiple distinct formats specified for field '{}' - indices({}) format entries({})", + fieldName, + Strings.collectionToCommaDelimitedString(policy.getIndices()), + Strings.collectionToCommaDelimitedString(formats) + ); + } + return null; + } + return new MappingTypeAndFormat(Iterables.get(types, 0), formats.isEmpty() ? null : Iterables.get(formats, 0)); + } + + @SuppressWarnings("unchecked") + private static T extractValues(Map properties, String path) { + return (T) properties.get(path); + } + + private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat( + Map properties, + String fieldName, + boolean fieldRequired + ) { assert Strings.isEmpty(fieldName) == false : "Field name cannot be null or empty"; String[] fieldParts = fieldName.split("\\."); StringBuilder parent = new StringBuilder(); - Map currentField = properties; + Map currentField = properties; boolean onRoot = true; for (String fieldPart : fieldParts) { // Ensure that the current field is of object type only (not a nested type or a non compound field) @@ -211,7 +265,7 @@ private static void validateField(Map properties, String fieldName, boolea type ); } - Map currentProperties = ((Map) currentField.get("properties")); + Map currentProperties = extractValues(currentField, "properties"); if (currentProperties == null) { if (fieldRequired) { throw new ElasticsearchException( @@ -220,10 +274,10 @@ private static void validateField(Map properties, String fieldName, boolea onRoot ? 
"root" : parent.toString() ); } else { - return; + return null; } } - currentField = ((Map) currentProperties.get(fieldPart)); + currentField = extractValues(currentProperties, fieldPart); if (currentField == null) { if (fieldRequired) { throw new ElasticsearchException( @@ -233,7 +287,7 @@ private static void validateField(Map properties, String fieldName, boolea onRoot ? "root" : parent.toString() ); } else { - return; + return null; } } if (onRoot) { @@ -243,95 +297,70 @@ private static void validateField(Map properties, String fieldName, boolea } parent.append(fieldPart); } - } - - private XContentBuilder resolveEnrichMapping(final EnrichPolicy enrichPolicy, final List> mappings) { - if (EnrichPolicy.MATCH_TYPE.equals(enrichPolicy.getType())) { - return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false)); - } else if (EnrichPolicy.RANGE_TYPE.equals(enrichPolicy.getType())) { - return createRangeEnrichMappingBuilder(enrichPolicy, mappings); - } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(enrichPolicy.getType())) { - return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape")); - } else { - throw new ElasticsearchException("Unrecognized enrich policy type [{}]", enrichPolicy.getType()); + if (currentField == null) { + return null; } + final String type = (String) currentField.getOrDefault("type", "object"); + final String format = (String) currentField.get("format"); + return new MappingTypeAndFormat(type, format); } - private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy enrichPolicy, List> mappings) { - String matchFieldPath = "properties." 
+ enrichPolicy.getMatchField().replace(".", ".properties."); - List> matchFieldMappings = mappings.stream() - .map(map -> ObjectPath.>eval(matchFieldPath, map)) - .filter(Objects::nonNull) - .toList(); - - Set types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet()); - if (types.size() == 1) { - String type = types.iterator().next(); - if (type == null) { - // when no type is defined in a field mapping then it is of type object: - throw new ElasticsearchException( - "Field '{}' has type [object] which doesn't appear to be a range type", - enrichPolicy.getMatchField(), - type - ); - } + static final Set RANGE_TYPES = Set.of("integer_range", "float_range", "long_range", "double_range", "ip_range", "date_range"); - switch (type) { - case "integer_range": - case "float_range": - case "long_range": - case "double_range": - case "ip_range": - return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); - - // date_range types mappings allow for the format to be specified, should be preserved in the created index - case "date_range": - Set formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet()); - if (formatEntries.size() == 1) { - return createEnrichMappingBuilder((builder) -> { - builder.field("type", type).field("doc_values", false); - String format = formatEntries.iterator().next(); - if (format != null) { - builder.field("format", format); - } - return builder; - }); - } - if (formatEntries.isEmpty()) { - // no format specify rely on default - return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); - } - throw new ElasticsearchException( - "Multiple distinct date format specified for match field '{}' - indices({}) format entries({})", - enrichPolicy.getMatchField(), - Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()), - (formatEntries.contains(null) ? 
"(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries) - ); - - default: + static Map mappingForMatchField(EnrichPolicy policy, List> sourceMappings) { + MappingTypeAndFormat typeAndFormat = validateAndGetMappingTypeAndFormat(policy.getMatchField(), policy, true, sourceMappings); + if (typeAndFormat == null) { + throw new ElasticsearchException( + "Match field '{}' doesn't have a correct mapping type for policy type '{}'", + policy.getMatchField(), + policy.getType() + ); + } + return switch (policy.getType()) { + case EnrichPolicy.MATCH_TYPE -> Map.of("type", "keyword", "doc_values", false); + case EnrichPolicy.GEO_MATCH_TYPE -> Map.of("type", "geo_shape"); + case EnrichPolicy.RANGE_TYPE -> { + if (RANGE_TYPES.contains(typeAndFormat.type) == false) { throw new ElasticsearchException( "Field '{}' has type [{}] which doesn't appear to be a range type", - enrichPolicy.getMatchField(), - type + policy.getMatchField(), + typeAndFormat.type ); + } + Map mapping = Maps.newMapWithExpectedSize(3); + mapping.put("type", typeAndFormat.type); + mapping.put("doc_values", false); + if (typeAndFormat.format != null) { + mapping.put("format", typeAndFormat.format); + } + yield mapping; } - } - if (types.isEmpty()) { - throw new ElasticsearchException( - "No mapping type found for match field '{}' - indices({})", - enrichPolicy.getMatchField(), - Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()) - ); - } - throw new ElasticsearchException( - "Multiple distinct mapping types for match field '{}' - indices({}) types({})", - enrichPolicy.getMatchField(), - Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()), - Strings.collectionToCommaDelimitedString(types) - ); + default -> throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); + }; } - private XContentBuilder createEnrichMappingBuilder(CheckedFunction matchFieldMapping) { + private XContentBuilder createEnrichMapping(List> sourceMappings) 
{ + Map> fieldMappings = new HashMap<>(); + Map mappingForMatchField = mappingForMatchField(policy, sourceMappings); + for (String enrichField : policy.getEnrichFields()) { + if (enrichField.equals(policy.getMatchField())) { + mappingForMatchField = new HashMap<>(mappingForMatchField); + mappingForMatchField.remove("doc_values"); // enable doc_values + } else { + var typeAndFormat = validateAndGetMappingTypeAndFormat(enrichField, policy, false, sourceMappings); + if (typeAndFormat != null) { + Map mapping = Maps.newMapWithExpectedSize(3); + mapping.put("type", typeAndFormat.type); + if (typeAndFormat.format != null) { + mapping.put("format", typeAndFormat.format); + } + mapping.put("index", false); // disable index + fieldMappings.put(enrichField, mapping); + } + } + } + fieldMappings.put(policy.getMatchField(), mappingForMatchField); + // Enable _source on enrich index. Explicitly mark key mapping type. try { XContentBuilder builder = JsonXContent.contentBuilder(); @@ -347,9 +376,7 @@ private XContentBuilder createEnrichMappingBuilder(CheckedFunction> mappings) { .put("index.warmer.enabled", false) .build(); CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings); - createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings)); + createEnrichIndexRequest.mapping(createEnrichMapping(mappings)); logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName); enrichOriginClient().admin() .indices() diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 67f7b8498ecd6..998faaaa5ad46 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -32,12 +32,14 @@ import 
org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.FilterClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Strings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.Segment; @@ -53,6 +55,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; @@ -71,6 +74,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -153,15 +157,22 @@ public void testRunner() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "field1": { + "type": 
"keyword", + "doc_values": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -222,15 +233,17 @@ public void testRunnerGeoMatchType() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("location"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("geo_shape"))); - assertNull(field1.get("doc_values")); - + assertEnrichMapping(mapping, """ + { + "location": { + "type": "geo_shape" + }, + "zipcode": { + "type": "long", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -307,14 +320,18 @@ private void testNumberRangeMatchType(String rangeType) throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("range"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo(rangeType + "_range"))); - assertEquals(Boolean.FALSE, field1.get("doc_values")); + assertEnrichMapping(mapping, String.format(Locale.ROOT, """ + { + "range": 
{ + "type": "%s", + "doc_values": false + }, + "zipcode": { + "type": "long", + "index": false + } + } + """, rangeType + "_range")); // Validate document structure SearchResponse enrichSearchResponse = client().search( @@ -394,15 +411,18 @@ public void testRunnerRangeTypeWithIpRange() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("subnet"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("ip_range"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "subnet": { + "type": "ip_range", + "doc_values": false + }, + "department": { + "type": "text", + "index": false + } + } + """); // Validate document structure and lookup of element in range SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source( @@ -483,15 +503,30 @@ public void testRunnerMultiSource() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map keyfield = (Map) properties.get("key"); - assertNotNull(keyfield); - assertThat(keyfield.get("type"), is(equalTo("keyword"))); - assertThat(keyfield.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "key": { + "type": "keyword", + "doc_values": false + }, + "idx": { + "type": "long", + "index": false + }, + "field1": { + "type": "text", + "index": false + }, + "field2": { + "type": "long", + "index": 
false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -578,15 +613,30 @@ public void testRunnerMultiSourceDocIdCollisions() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map keyfield = (Map) properties.get("key"); - assertNotNull(keyfield); - assertThat(keyfield.get("type"), is(equalTo("keyword"))); - assertThat(keyfield.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "key": { + "type": "keyword", + "doc_values": false + }, + "idx": { + "type": "long", + "index": false + }, + "field1": { + "type": "text", + "index": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -675,15 +725,30 @@ public void testRunnerMultiSourceEnrichKeyCollisions() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map keyfield = (Map) properties.get("key"); - assertNotNull(keyfield); - assertThat(keyfield.get("type"), is(equalTo("keyword"))); - assertThat(keyfield.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "key": { + "type": "keyword", + 
"doc_values": false + }, + "field1": { + "type": "text", + "index": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + }, + "idx": { + "type": "long", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -971,21 +1036,22 @@ public void testRunnerObjectSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1082,21 +1148,22 @@ public void testRunnerExplicitObjectSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - 
Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1194,21 +1261,22 @@ public void testRunnerExplicitObjectSourceMappingRangePolicy() throws Exception // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("subnet"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("ip_range"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "subnet": { + "type": "ip_range", + "doc_values": false + }, + "department": { + "type": "keyword", 
+ "index": false + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source( SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("data.subnet", "10.0.0.1")) @@ -1315,26 +1383,26 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map fields = (Map) dataProperties.get("fields"); - assertNotNull(fields); - assertThat(fields.size(), is(equalTo(1))); - Map fieldsProperties = (Map) fields.get("properties"); - assertNotNull(fieldsProperties); - assertThat(fieldsProperties.size(), is(equalTo(1))); - Map field1 = (Map) fieldsProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "fields": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + } + } + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -1442,27 +1510,26 @@ public void testRunnerTwoObjectLevelsSourceMappingRangePolicy() throws Exception // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); 
validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map fields = (Map) dataProperties.get("fields"); - assertNotNull(fields); - assertThat(fields.size(), is(equalTo(1))); - Map fieldsProperties = (Map) fields.get("properties"); - assertNotNull(fieldsProperties); - assertThat(fieldsProperties.size(), is(equalTo(1))); - Map field1 = (Map) fieldsProperties.get("subnet"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("ip_range"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "fields": { + "properties": { + "subnet": { + "type": "ip_range", + "doc_values": false + }, + "department": { + "type": "keyword", + "index": false + } + } + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1576,26 +1643,27 @@ public void testRunnerTwoObjectLevelsSourceMappingDateRangeWithFormat() throws E // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - 
assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map fields = (Map) dataProperties.get("fields"); - assertNotNull(fields); - assertThat(fields.size(), is(equalTo(1))); - Map fieldsProperties = (Map) fields.get("properties"); - assertNotNull(fieldsProperties); - assertThat(fieldsProperties.size(), is(equalTo(1))); - Map field1 = (Map) fieldsProperties.get("period"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("date_range"))); - assertThat(field1.get("doc_values"), is(false)); + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "fields": { + "properties": { + "period": { + "type": "date_range", + "format": "yyyy'/'MM'/'dd' at 'HH':'mm||strict_date_time", + "doc_values": false + }, + "status": { + "type": "keyword", + "index": false + } + } + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source( @@ -1708,21 +1776,22 @@ public void testRunnerDottedKeyNameSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + 
} + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1867,16 +1936,22 @@ protected void ensureSingleSegment(String destinationIndexName, int attempt) { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); - validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse allEnrichDocs = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -2007,6 +2082,108 @@ public void testRunRangePolicyWithObjectFieldAsMatchField() throws Exception { assertThat(e.getMessage(), equalTo("Field 'field1' has type [object] which doesn't appear to be a range type")); } + public void testEnrichFieldsConflictMappingTypes() { + createIndex("source-1", Settings.EMPTY, "_doc", "user", "type=keyword", "name", "type=text", "zipcode", "type=long"); + client().prepareIndex("source-1") + .setSource("user", "u1", "name", "n", "zipcode", 90000) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + createIndex("source-2", Settings.EMPTY, "_doc", "user", "type=keyword", "zipcode", "type=long"); + + client().prepareIndex("source-2").setSource(""" + { + "user": "u2", + "name": { + "first": "f", + 
"last": "l" + }, + "zipcode": 90001 + } + """, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + EnrichPolicy policy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of("source-1", "source-2"), + "user", + List.of("name", "zipcode") + ); + String policyName = "test1"; + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + PlainActionFuture future = new PlainActionFuture<>(); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex); + enrichPolicyRunner.run(); + future.actionGet(); + + // Validate Index definition + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + assertEnrichMapping(mapping, """ + { + "user": { + "type": "keyword", + "doc_values": false + }, + "zipcode": { + "type": "long", + "index": false + } + } + """); + // Validate document structure + SearchResponse searchResponse = client().search(new SearchRequest(".enrich-test1")).actionGet(); + ElasticsearchAssertions.assertHitCount(searchResponse, 2L); + Map hit0 = searchResponse.getHits().getAt(0).getSourceAsMap(); + assertThat(hit0, equalTo(Map.of("user", "u1", "name", "n", "zipcode", 90000))); + Map hit1 = searchResponse.getHits().getAt(1).getSourceAsMap(); + assertThat(hit1, equalTo(Map.of("user", "u2", "name", Map.of("first", "f", "last", "l"), "zipcode", 90001))); + } + + public void testEnrichMappingConflictFormats() { + createIndex("source-1", Settings.EMPTY, "_doc", "user", "type=keyword", "date", "type=date,format=yyyy"); + client().prepareIndex("source-1") + .setSource("user", "u1", "date", "2023") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + createIndex("source-2", Settings.EMPTY, "_doc", "user", "type=keyword", "date", "type=date,format=yyyy-MM"); + + client().prepareIndex("source-2").setSource(""" 
+ { + "user": "u2", + "date": "2023-05" + } + """, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-1", "source-2"), "user", List.of("date")); + String policyName = "test1"; + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + PlainActionFuture future = new PlainActionFuture<>(); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex); + enrichPolicyRunner.run(); + future.actionGet(); + + // Validate Index definition + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + assertEnrichMapping(mapping, """ + { + "user": { + "type": "keyword", + "doc_values": false + } + } + """); + // Validate document structure + SearchResponse searchResponse = client().search(new SearchRequest(".enrich-test1")).actionGet(); + ElasticsearchAssertions.assertHitCount(searchResponse, 2L); + Map hit0 = searchResponse.getHits().getAt(0).getSourceAsMap(); + assertThat(hit0, equalTo(Map.of("user", "u1", "date", "2023"))); + Map hit1 = searchResponse.getHits().getAt(1).getSourceAsMap(); + assertThat(hit1, equalTo(Map.of("user", "u2", "date", "2023-05"))); + } + private EnrichPolicyRunner createPolicyRunner( String policyName, EnrichPolicy policy, @@ -2124,4 +2301,11 @@ private void ensureEnrichIndexIsReadOnly(String createdEnrichIndex) { assertThat(expected.getMessage(), containsString("index [" + createdEnrichIndex + "] blocked by: [FORBIDDEN/8/index write (api)]")); } + + private static void assertEnrichMapping(Map actual, String expectedMapping) { + assertThat(actual.get("dynamic"), is("false")); + Object actualProperties = actual.get("properties"); + Map mappings = XContentHelper.convertToMap(JsonXContent.jsonXContent, expectedMapping, false); + 
assertThat(actualProperties, equalTo(mappings)); + } }