diff --git a/docs/changelog/96056.yaml b/docs/changelog/96056.yaml new file mode 100644 index 0000000000000..a32a6dd675bb9 --- /dev/null +++ b/docs/changelog/96056.yaml @@ -0,0 +1,5 @@ +pr: 96056 +summary: Add mappings for enrich fields +area: Ingest Node +type: enhancement +issues: [] diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index cffe9f638ddd9..38ad58b22699e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -39,7 +39,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -47,7 +48,6 @@ import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskCancelledException; -import org.elasticsearch.xcontent.ObjectPath; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; @@ -57,6 +57,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -180,9 +181,9 @@ static void validateMappings( } // Validate the key and values try { - validateField(mapping, policy.getMatchField(), true); + validateAndGetMappingTypeAndFormat(mapping, policy.getMatchField(), true); for (String valueFieldName : policy.getEnrichFields()) { - validateField(mapping, valueFieldName, false); + validateAndGetMappingTypeAndFormat(mapping, valueFieldName, false); } } catch (ElasticsearchException e) { throw new ElasticsearchException( @@ -194,11 +195,64 @@ static void validateMappings( } } - private static void validateField(Map properties, String fieldName, boolean fieldRequired) { + private record MappingTypeAndFormat(String type, String format) { + + } + + private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat( + String fieldName, + EnrichPolicy policy, + boolean strictlyRequired, + List> sourceMappings + ) { + var fieldMappings = sourceMappings.stream() + .map(mapping -> validateAndGetMappingTypeAndFormat(mapping, fieldName, strictlyRequired)) + .filter(Objects::nonNull) + .toList(); + Set types = fieldMappings.stream().map(tf -> tf.type).collect(Collectors.toSet()); + if (types.size() > 1) { + if (strictlyRequired) { + throw new ElasticsearchException( + "Multiple distinct mapping types for field '{}' - indices({}) types({})", + fieldName, + Strings.collectionToCommaDelimitedString(policy.getIndices()), + Strings.collectionToCommaDelimitedString(types) + ); + } + return null; + } + if (types.isEmpty()) { + return null; + } + Set formats = fieldMappings.stream().map(tf -> tf.format).filter(Objects::nonNull).collect(Collectors.toSet()); + if (formats.size() > 1) { + if (strictlyRequired) { + throw new ElasticsearchException( + "Multiple distinct formats specified for field '{}' - indices({}) format entries({})", + policy.getMatchField(), + Strings.collectionToCommaDelimitedString(policy.getIndices()), + Strings.collectionToCommaDelimitedString(formats) + ); + } + return null; + } + return new MappingTypeAndFormat(Iterables.get(types, 0), formats.isEmpty() ? null : Iterables.get(formats, 0)); + } + + @SuppressWarnings("unchecked") + private static T extractValues(Map properties, String path) { + return (T) properties.get(path); + } + + private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat( + Map properties, + String fieldName, + boolean fieldRequired + ) { assert Strings.isEmpty(fieldName) == false : "Field name cannot be null or empty"; String[] fieldParts = fieldName.split("\\."); StringBuilder parent = new StringBuilder(); - Map currentField = properties; + Map currentField = properties; boolean onRoot = true; for (String fieldPart : fieldParts) { // Ensure that the current field is of object type only (not a nested type or a non compound field) @@ -211,7 +265,7 @@ private static void validateField(Map properties, String fieldName, boolea type ); } - Map currentProperties = ((Map) currentField.get("properties")); + Map currentProperties = extractValues(currentField, "properties"); if (currentProperties == null) { if (fieldRequired) { throw new ElasticsearchException( @@ -220,10 +274,10 @@ private static void validateField(Map properties, String fieldName, boolea onRoot ? "root" : parent.toString() ); } else { - return; + return null; } } - currentField = ((Map) currentProperties.get(fieldPart)); + currentField = extractValues(currentProperties, fieldPart); if (currentField == null) { if (fieldRequired) { throw new ElasticsearchException( @@ -233,7 +287,7 @@ private static void validateField(Map properties, String fieldName, boolea onRoot ? "root" : parent.toString() ); } else { - return; + return null; } } if (onRoot) { @@ -243,95 +297,70 @@ private static void validateField(Map properties, String fieldName, boolea } parent.append(fieldPart); } - } - - private XContentBuilder resolveEnrichMapping(final EnrichPolicy enrichPolicy, final List> mappings) { - if (EnrichPolicy.MATCH_TYPE.equals(enrichPolicy.getType())) { - return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false)); - } else if (EnrichPolicy.RANGE_TYPE.equals(enrichPolicy.getType())) { - return createRangeEnrichMappingBuilder(enrichPolicy, mappings); - } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(enrichPolicy.getType())) { - return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape")); - } else { - throw new ElasticsearchException("Unrecognized enrich policy type [{}]", enrichPolicy.getType()); + if (currentField == null) { + return null; } + final String type = (String) currentField.getOrDefault("type", "object"); + final String format = (String) currentField.get("format"); + return new MappingTypeAndFormat(type, format); } - private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy enrichPolicy, List> mappings) { - String matchFieldPath = "properties." + enrichPolicy.getMatchField().replace(".", ".properties."); - List> matchFieldMappings = mappings.stream() - .map(map -> ObjectPath.>eval(matchFieldPath, map)) - .filter(Objects::nonNull) - .toList(); - - Set types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet()); - if (types.size() == 1) { - String type = types.iterator().next(); - if (type == null) { - // when no type is defined in a field mapping then it is of type object: - throw new ElasticsearchException( - "Field '{}' has type [object] which doesn't appear to be a range type", - enrichPolicy.getMatchField(), - type - ); - } + static final Set RANGE_TYPES = Set.of("integer_range", "float_range", "long_range", "double_range", "ip_range", "date_range"); - switch (type) { - case "integer_range": - case "float_range": - case "long_range": - case "double_range": - case "ip_range": - return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); - - // date_range types mappings allow for the format to be specified, should be preserved in the created index - case "date_range": - Set formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet()); - if (formatEntries.size() == 1) { - return createEnrichMappingBuilder((builder) -> { - builder.field("type", type).field("doc_values", false); - String format = formatEntries.iterator().next(); - if (format != null) { - builder.field("format", format); - } - return builder; - }); - } - if (formatEntries.isEmpty()) { - // no format specify rely on default - return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); - } - throw new ElasticsearchException( - "Multiple distinct date format specified for match field '{}' - indices({}) format entries({})", - enrichPolicy.getMatchField(), - Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()), - (formatEntries.contains(null) ? "(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries) - ); - - default: + static Map mappingForMatchField(EnrichPolicy policy, List> sourceMappings) { + MappingTypeAndFormat typeAndFormat = validateAndGetMappingTypeAndFormat(policy.getMatchField(), policy, true, sourceMappings); + if (typeAndFormat == null) { + throw new ElasticsearchException( + "Match field '{}' doesn't have a correct mapping type for policy type '{}'", + policy.getMatchField(), + policy.getType() + ); + } + return switch (policy.getType()) { + case EnrichPolicy.MATCH_TYPE -> Map.of("type", "keyword", "doc_values", false); + case EnrichPolicy.GEO_MATCH_TYPE -> Map.of("type", "geo_shape"); + case EnrichPolicy.RANGE_TYPE -> { + if (RANGE_TYPES.contains(typeAndFormat.type) == false) { throw new ElasticsearchException( "Field '{}' has type [{}] which doesn't appear to be a range type", - enrichPolicy.getMatchField(), - type + policy.getMatchField(), + typeAndFormat.type ); + } + Map mapping = Maps.newMapWithExpectedSize(3); + mapping.put("type", typeAndFormat.type); + mapping.put("doc_values", false); + if (typeAndFormat.format != null) { + mapping.put("format", typeAndFormat.format); + } + yield mapping; } - } - if (types.isEmpty()) { - throw new ElasticsearchException( - "No mapping type found for match field '{}' - indices({})", - enrichPolicy.getMatchField(), - Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()) - ); - } - throw new ElasticsearchException( - "Multiple distinct mapping types for match field '{}' - indices({}) types({})", - enrichPolicy.getMatchField(), - Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()), - Strings.collectionToCommaDelimitedString(types) - ); + default -> throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); + }; } - private XContentBuilder createEnrichMappingBuilder(CheckedFunction matchFieldMapping) { + private XContentBuilder createEnrichMapping(List> sourceMappings) { + Map> fieldMappings = new HashMap<>(); + Map mappingForMatchField = mappingForMatchField(policy, sourceMappings); + for (String enrichField : policy.getEnrichFields()) { + if (enrichField.equals(policy.getMatchField())) { + mappingForMatchField = new HashMap<>(mappingForMatchField); + mappingForMatchField.remove("doc_values"); // enable doc_values + } else { + var typeAndFormat = validateAndGetMappingTypeAndFormat(enrichField, policy, false, sourceMappings); + if (typeAndFormat != null) { + Map mapping = Maps.newMapWithExpectedSize(3); + mapping.put("type", typeAndFormat.type); + if (typeAndFormat.format != null) { + mapping.put("format", typeAndFormat.format); + } + mapping.put("index", false); // disable index + fieldMappings.put(enrichField, mapping); + } + } + } + fieldMappings.put(policy.getMatchField(), mappingForMatchField); + // Enable _source on enrich index. Explicitly mark key mapping type. try { XContentBuilder builder = JsonXContent.contentBuilder(); @@ -347,9 +376,7 @@ private XContentBuilder createEnrichMappingBuilder(CheckedFunction> mappings) { .put("index.warmer.enabled", false) .build(); CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings); - createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings)); + createEnrichIndexRequest.mapping(createEnrichMapping(mappings)); logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName); enrichOriginClient().admin() .indices() diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 67f7b8498ecd6..998faaaa5ad46 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -32,12 +32,14 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.FilterClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Strings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.Segment; @@ -53,6 +55,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; @@ -71,6 +74,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -153,15 +157,22 @@ public void testRunner() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -222,15 +233,17 @@ public void testRunnerGeoMatchType() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("location"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("geo_shape"))); - assertNull(field1.get("doc_values")); - + assertEnrichMapping(mapping, """ + { + "location": { + "type": "geo_shape" + }, + "zipcode": { + "type": "long", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -307,14 +320,18 @@ private void testNumberRangeMatchType(String rangeType) throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("range"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo(rangeType + "_range"))); - assertEquals(Boolean.FALSE, field1.get("doc_values")); + assertEnrichMapping(mapping, String.format(Locale.ROOT, """ + { + "range": { + "type": "%s", + "doc_values": false + }, + "zipcode": { + "type": "long", + "index": false + } + } + """, rangeType + "_range")); // Validate document structure SearchResponse enrichSearchResponse = client().search( @@ -394,15 +411,18 @@ public void testRunnerRangeTypeWithIpRange() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("subnet"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("ip_range"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "subnet": { + "type": "ip_range", + "doc_values": false + }, + "department": { + "type": "text", + "index": false + } + } + """); // Validate document structure and lookup of element in range SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source( @@ -483,15 +503,30 @@ public void testRunnerMultiSource() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map keyfield = (Map) properties.get("key"); - assertNotNull(keyfield); - assertThat(keyfield.get("type"), is(equalTo("keyword"))); - assertThat(keyfield.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "key": { + "type": "keyword", + "doc_values": false + }, + "idx": { + "type": "long", + "index": false + }, + "field1": { + "type": "text", + "index": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -578,15 +613,30 @@ public void testRunnerMultiSourceDocIdCollisions() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map keyfield = (Map) properties.get("key"); - assertNotNull(keyfield); - assertThat(keyfield.get("type"), is(equalTo("keyword"))); - assertThat(keyfield.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "key": { + "type": "keyword", + "doc_values": false + }, + "idx": { + "type": "long", + "index": false + }, + "field1": { + "type": "text", + "index": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -675,15 +725,30 @@ public void testRunnerMultiSourceEnrichKeyCollisions() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map keyfield = (Map) properties.get("key"); - assertNotNull(keyfield); - assertThat(keyfield.get("type"), is(equalTo("keyword"))); - assertThat(keyfield.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "key": { + "type": "keyword", + "doc_values": false + }, + "field1": { + "type": "text", + "index": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + }, + "idx": { + "type": "long", + "index": false + } + } + """); // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -971,21 +1036,22 @@ public void testRunnerObjectSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1082,21 +1148,22 @@ public void testRunnerExplicitObjectSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1194,21 +1261,22 @@ public void testRunnerExplicitObjectSourceMappingRangePolicy() throws Exception // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("subnet"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("ip_range"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "subnet": { + "type": "ip_range", + "doc_values": false + }, + "department": { + "type": "keyword", + "index": false + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source( SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("data.subnet", "10.0.0.1")) @@ -1315,26 +1383,26 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map fields = (Map) dataProperties.get("fields"); - assertNotNull(fields); - assertThat(fields.size(), is(equalTo(1))); - Map fieldsProperties = (Map) fields.get("properties"); - assertNotNull(fieldsProperties); - assertThat(fieldsProperties.size(), is(equalTo(1))); - Map field1 = (Map) fieldsProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "fields": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + } + } + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -1442,27 +1510,26 @@ public void testRunnerTwoObjectLevelsSourceMappingRangePolicy() throws Exception // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map fields = (Map) dataProperties.get("fields"); - assertNotNull(fields); - assertThat(fields.size(), is(equalTo(1))); - Map fieldsProperties = (Map) fields.get("properties"); - assertNotNull(fieldsProperties); - assertThat(fieldsProperties.size(), is(equalTo(1))); - Map field1 = (Map) fieldsProperties.get("subnet"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("ip_range"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "fields": { + "properties": { + "subnet": { + "type": "ip_range", + "doc_values": false + }, + "department": { + "type": "keyword", + "index": false + } + } + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1576,26 +1643,27 @@ public void testRunnerTwoObjectLevelsSourceMappingDateRangeWithFormat() throws E // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map fields = (Map) dataProperties.get("fields"); - assertNotNull(fields); - assertThat(fields.size(), is(equalTo(1))); - Map fieldsProperties = (Map) fields.get("properties"); - assertNotNull(fieldsProperties); - assertThat(fieldsProperties.size(), is(equalTo(1))); - Map field1 = (Map) fieldsProperties.get("period"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("date_range"))); - assertThat(field1.get("doc_values"), is(false)); + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "fields": { + "properties": { + "period": { + "type": "date_range", + "format": "yyyy'/'MM'/'dd' at 'HH':'mm||strict_date_time", + "doc_values": false + }, + "status": { + "type": "keyword", + "index": false + } + } + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source( @@ -1708,21 +1776,22 @@ public void testRunnerDottedKeyNameSourceMapping() throws Exception { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map data = (Map) properties.get("data"); - assertNotNull(data); - assertThat(data.size(), is(equalTo(1))); - Map dataProperties = (Map) data.get("properties"); - assertNotNull(dataProperties); - assertThat(dataProperties.size(), is(equalTo(1))); - Map field1 = (Map) dataProperties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "data": { + "properties": { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "integer", + "index": false + } + } + } + } + """); SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) ).actionGet(); @@ -1867,16 +1936,22 @@ protected void ensureSingleSegment(String destinationIndexName, int attempt) { // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); - validateMappingMetadata(mapping, policyName, policy); - assertThat(mapping.get("dynamic"), is("false")); - Map properties = (Map) mapping.get("properties"); - assertNotNull(properties); - assertThat(properties.size(), is(equalTo(1))); - Map field1 = (Map) properties.get("field1"); - assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); - assertThat(field1.get("doc_values"), is(false)); - + assertEnrichMapping(mapping, """ + { + "field1": { + "type": "keyword", + "doc_values": false + }, + "field2": { + "type": "long", + "index": false + }, + "field5": { + "type": "text", + "index": false + } + } + """); // Validate document structure SearchResponse allEnrichDocs = client().search( new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) @@ -2007,6 +2082,108 @@ public void testRunRangePolicyWithObjectFieldAsMatchField() throws Exception { assertThat(e.getMessage(), equalTo("Field 'field1' has type [object] which doesn't appear to be a range type")); } + public void testEnrichFieldsConflictMappingTypes() { + createIndex("source-1", Settings.EMPTY, "_doc", "user", "type=keyword", "name", "type=text", "zipcode", "type=long"); + client().prepareIndex("source-1") + .setSource("user", "u1", "name", "n", "zipcode", 90000) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + createIndex("source-2", Settings.EMPTY, "_doc", "user", "type=keyword", "zipcode", "type=long"); + + client().prepareIndex("source-2").setSource(""" + { + "user": "u2", + "name": { + "first": "f", + "last": "l" + }, + "zipcode": 90001 + } + """, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + EnrichPolicy policy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of("source-1", "source-2"), + "user", + List.of("name", "zipcode") + ); + String policyName = "test1"; + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + PlainActionFuture future = new PlainActionFuture<>(); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex); + enrichPolicyRunner.run(); + future.actionGet(); + + // Validate Index definition + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + assertEnrichMapping(mapping, """ + { + "user": { + "type": "keyword", + "doc_values": false + }, + "zipcode": { + "type": "long", + "index": false + } + } + """); + // Validate document structure + SearchResponse searchResponse = client().search(new SearchRequest(".enrich-test1")).actionGet(); + ElasticsearchAssertions.assertHitCount(searchResponse, 2L); + Map hit0 = searchResponse.getHits().getAt(0).getSourceAsMap(); + assertThat(hit0, equalTo(Map.of("user", "u1", "name", "n", "zipcode", 90000))); + Map hit1 = searchResponse.getHits().getAt(1).getSourceAsMap(); + assertThat(hit1, equalTo(Map.of("user", "u2", "name", Map.of("first", "f", "last", "l"), "zipcode", 90001))); + } + + public void testEnrichMappingConflictFormats() { + createIndex("source-1", Settings.EMPTY, "_doc", "user", "type=keyword", "date", "type=date,format=yyyy"); + client().prepareIndex("source-1") + .setSource("user", "u1", "date", "2023") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + createIndex("source-2", Settings.EMPTY, "_doc", "user", "type=keyword", "date", "type=date,format=yyyy-MM"); + + client().prepareIndex("source-2").setSource(""" + { + "user": "u2", + "date": "2023-05" + } + """, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-1", "source-2"), "user", List.of("date")); + String policyName = "test1"; + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + PlainActionFuture future = new PlainActionFuture<>(); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex); + enrichPolicyRunner.run(); + future.actionGet(); + + // Validate Index definition + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + assertEnrichMapping(mapping, """ + { + "user": { + "type": "keyword", + "doc_values": false + } + } + """); + // Validate document structure + SearchResponse searchResponse = client().search(new SearchRequest(".enrich-test1")).actionGet(); + ElasticsearchAssertions.assertHitCount(searchResponse, 2L); + Map hit0 = searchResponse.getHits().getAt(0).getSourceAsMap(); + assertThat(hit0, equalTo(Map.of("user", "u1", "date", "2023"))); + Map hit1 = searchResponse.getHits().getAt(1).getSourceAsMap(); + assertThat(hit1, equalTo(Map.of("user", "u2", "date", "2023-05"))); + } + private EnrichPolicyRunner createPolicyRunner( String policyName, EnrichPolicy policy, @@ -2124,4 +2301,11 @@ private void ensureEnrichIndexIsReadOnly(String createdEnrichIndex) { assertThat(expected.getMessage(), containsString("index [" + createdEnrichIndex + "] blocked by: [FORBIDDEN/8/index write (api)]")); } + + private static void assertEnrichMapping(Map actual, String expectedMapping) { + assertThat(actual.get("dynamic"), is("false")); + Object actualProperties = actual.get("properties"); + Map mappings = XContentHelper.convertToMap(JsonXContent.jsonXContent, expectedMapping, false); + assertThat(actualProperties, equalTo(mappings)); + } }