Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/96056.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 96056
summary: Add mappings for enrich fields
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand All @@ -57,6 +57,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -180,9 +181,9 @@ static void validateMappings(
}
// Validate the key and values
try {
validateField(mapping, policy.getMatchField(), true);
validateAndGetMappingTypeAndFormat(mapping, policy.getMatchField(), true);
for (String valueFieldName : policy.getEnrichFields()) {
validateField(mapping, valueFieldName, false);
validateAndGetMappingTypeAndFormat(mapping, valueFieldName, false);
}
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
Expand All @@ -194,11 +195,64 @@ static void validateMappings(
}
}

private static void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
/**
 * Pairs the mapping {@code type} resolved for a field with its {@code format}.
 * <p>
 * {@code format} is {@code null} when the inspected mapping does not declare one.
 *
 * @param type the mapping type of the field
 * @param format the mapping format of the field, or {@code null} if none was declared
 */
private record MappingTypeAndFormat(String type, String format) {}

/**
 * Resolves one mapping type and format for {@code fieldName} across all of the given source mappings.
 * <p>
 * Every source mapping is inspected on its own via the single-mapping overload; mappings that yield no
 * result for the field are ignored. If the remaining mappings disagree on the type, or declare more than
 * one distinct format, the conflict is raised as an {@link ElasticsearchException} when
 * {@code strictlyRequired} is {@code true} and reported as a {@code null} result otherwise.
 *
 * @param fieldName the name of the field to look up; this is the name reported in conflict messages
 * @param policy the enrich policy; its indices are listed in conflict messages
 * @param strictlyRequired whether conflicts raise an exception; also forwarded to the single-mapping
 *                         lookup as its "field required" flag
 * @param sourceMappings the source mappings to inspect
 * @return the agreed type and format (format is {@code null} when no mapping declares one), or
 *         {@code null} when no mapping yields a type or a conflict was found in non-strict mode
 * @throws ElasticsearchException if {@code strictlyRequired} is set and the mappings declare multiple
 *                                distinct types or multiple distinct formats for the field
 */
private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat(
    String fieldName,
    EnrichPolicy policy,
    boolean strictlyRequired,
    List<Map<String, Object>> sourceMappings
) {
    var fieldMappings = sourceMappings.stream()
        .map(mapping -> validateAndGetMappingTypeAndFormat(mapping, fieldName, strictlyRequired))
        .filter(Objects::nonNull)
        .toList();
    Set<String> types = fieldMappings.stream().map(tf -> tf.type).collect(Collectors.toSet());
    if (types.size() > 1) {
        if (strictlyRequired) {
            throw new ElasticsearchException(
                "Multiple distinct mapping types for field '{}' - indices({}) types({})",
                fieldName,
                Strings.collectionToCommaDelimitedString(policy.getIndices()),
                Strings.collectionToCommaDelimitedString(types)
            );
        }
        return null;
    }
    if (types.isEmpty()) {
        return null;
    }
    // Mappings without an explicit format are skipped, so only explicitly declared formats can conflict.
    Set<String> formats = fieldMappings.stream().map(tf -> tf.format).filter(Objects::nonNull).collect(Collectors.toSet());
    if (formats.size() > 1) {
        if (strictlyRequired) {
            // Report the field actually being validated: this method is not limited to the policy's
            // match field, so using policy.getMatchField() here would name the wrong field.
            throw new ElasticsearchException(
                "Multiple distinct formats specified for field '{}' - indices({}) format entries({})",
                fieldName,
                Strings.collectionToCommaDelimitedString(policy.getIndices()),
                Strings.collectionToCommaDelimitedString(formats)
            );
        }
        return null;
    }
Comment on lines +228 to +238
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Bug: Incorrect field name in error message for format conflicts.

Line 232 uses policy.getMatchField() in the error message, but this method can be called for any field (including enrich fields). This is inconsistent with the type conflict error at Line 217 which correctly uses fieldName.

🐛 Proposed fix
         if (formats.size() > 1) {
             if (strictlyRequired) {
                 throw new ElasticsearchException(
                     "Multiple distinct formats specified for field '{}' - indices({})  format entries({})",
-                    policy.getMatchField(),
+                    fieldName,
                     Strings.collectionToCommaDelimitedString(policy.getIndices()),
                     Strings.collectionToCommaDelimitedString(formats)
                 );
             }
🤖 Prompt for AI Agents
In
`@x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java`
around lines 228 - 238, The error message in EnrichPolicyRunner's formats
conflict branch uses policy.getMatchField() which is incorrect for non-match
fields; replace that usage with the local fieldName variable so the exception
shows the actual field being validated (update the thrown ElasticsearchException
message invocation in the if (formats.size() > 1) block to pass fieldName
instead of policy.getMatchField(), keeping the rest of the message/arguments the
same).

return new MappingTypeAndFormat(Iterables.get(types, 0), formats.isEmpty() ? null : Iterables.get(formats, 0));
}

/**
 * Reads the entry stored under {@code path} in {@code properties} and returns it cast to the type the
 * caller expects.
 * <p>
 * {@code path} is used as a single map key. The cast is unchecked, so a value of an unexpected type only
 * fails later, at the point where the caller uses it as {@code T}.
 *
 * @param properties the map to read from
 * @param path the key to look up
 * @param <T> the type the caller expects the value to have
 * @return the value stored under {@code path}, or {@code null} if the map has no such entry
 */
@SuppressWarnings("unchecked")
private static <T> T extractValues(Map<String, Object> properties, String path) {
    final Object value = properties.get(path);
    return (T) value;
}

private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat(
Map<String, Object> properties,
String fieldName,
boolean fieldRequired
) {
assert Strings.isEmpty(fieldName) == false : "Field name cannot be null or empty";
String[] fieldParts = fieldName.split("\\.");
StringBuilder parent = new StringBuilder();
Map<?, ?> currentField = properties;
Map<String, Object> currentField = properties;
boolean onRoot = true;
for (String fieldPart : fieldParts) {
// Ensure that the current field is of object type only (not a nested type or a non compound field)
Expand All @@ -211,7 +265,7 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea
type
);
}
Map<?, ?> currentProperties = ((Map<?, ?>) currentField.get("properties"));
Map<String, Object> currentProperties = extractValues(currentField, "properties");
if (currentProperties == null) {
if (fieldRequired) {
throw new ElasticsearchException(
Expand All @@ -220,10 +274,10 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea
onRoot ? "root" : parent.toString()
);
} else {
return;
return null;
}
}
currentField = ((Map<?, ?>) currentProperties.get(fieldPart));
currentField = extractValues(currentProperties, fieldPart);
if (currentField == null) {
if (fieldRequired) {
throw new ElasticsearchException(
Expand All @@ -233,7 +287,7 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea
onRoot ? "root" : parent.toString()
);
} else {
return;
return null;
}
}
if (onRoot) {
Expand All @@ -243,95 +297,70 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea
}
parent.append(fieldPart);
}
}

private XContentBuilder resolveEnrichMapping(final EnrichPolicy enrichPolicy, final List<Map<String, Object>> mappings) {
if (EnrichPolicy.MATCH_TYPE.equals(enrichPolicy.getType())) {
return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false));
} else if (EnrichPolicy.RANGE_TYPE.equals(enrichPolicy.getType())) {
return createRangeEnrichMappingBuilder(enrichPolicy, mappings);
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(enrichPolicy.getType())) {
return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape"));
} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", enrichPolicy.getType());
if (currentField == null) {
return null;
}
final String type = (String) currentField.getOrDefault("type", "object");
final String format = (String) currentField.get("format");
return new MappingTypeAndFormat(type, format);
}

private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy enrichPolicy, List<Map<String, Object>> mappings) {
String matchFieldPath = "properties." + enrichPolicy.getMatchField().replace(".", ".properties.");
List<Map<String, String>> matchFieldMappings = mappings.stream()
.map(map -> ObjectPath.<Map<String, String>>eval(matchFieldPath, map))
.filter(Objects::nonNull)
.toList();

Set<String> types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet());
if (types.size() == 1) {
String type = types.iterator().next();
if (type == null) {
// when no type is defined in a field mapping then it is of type object:
throw new ElasticsearchException(
"Field '{}' has type [object] which doesn't appear to be a range type",
enrichPolicy.getMatchField(),
type
);
}
static final Set<String> RANGE_TYPES = Set.of("integer_range", "float_range", "long_range", "double_range", "ip_range", "date_range");

switch (type) {
case "integer_range":
case "float_range":
case "long_range":
case "double_range":
case "ip_range":
return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));

// date_range types mappings allow for the format to be specified, should be preserved in the created index
case "date_range":
Set<String> formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet());
if (formatEntries.size() == 1) {
return createEnrichMappingBuilder((builder) -> {
builder.field("type", type).field("doc_values", false);
String format = formatEntries.iterator().next();
if (format != null) {
builder.field("format", format);
}
return builder;
});
}
if (formatEntries.isEmpty()) {
// no format specify rely on default
return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));
}
throw new ElasticsearchException(
"Multiple distinct date format specified for match field '{}' - indices({}) format entries({})",
enrichPolicy.getMatchField(),
Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()),
(formatEntries.contains(null) ? "(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries)
);

default:
static Map<String, Object> mappingForMatchField(EnrichPolicy policy, List<Map<String, Object>> sourceMappings) {
MappingTypeAndFormat typeAndFormat = validateAndGetMappingTypeAndFormat(policy.getMatchField(), policy, true, sourceMappings);
if (typeAndFormat == null) {
throw new ElasticsearchException(
"Match field '{}' doesn't have a correct mapping type for policy type '{}'",
policy.getMatchField(),
policy.getType()
);
}
return switch (policy.getType()) {
case EnrichPolicy.MATCH_TYPE -> Map.of("type", "keyword", "doc_values", false);
case EnrichPolicy.GEO_MATCH_TYPE -> Map.of("type", "geo_shape");
case EnrichPolicy.RANGE_TYPE -> {
if (RANGE_TYPES.contains(typeAndFormat.type) == false) {
throw new ElasticsearchException(
"Field '{}' has type [{}] which doesn't appear to be a range type",
enrichPolicy.getMatchField(),
type
policy.getMatchField(),
typeAndFormat.type
);
}
Map<String, Object> mapping = Maps.newMapWithExpectedSize(3);
mapping.put("type", typeAndFormat.type);
mapping.put("doc_values", false);
if (typeAndFormat.format != null) {
mapping.put("format", typeAndFormat.format);
}
yield mapping;
}
}
if (types.isEmpty()) {
throw new ElasticsearchException(
"No mapping type found for match field '{}' - indices({})",
enrichPolicy.getMatchField(),
Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices())
);
}
throw new ElasticsearchException(
"Multiple distinct mapping types for match field '{}' - indices({}) types({})",
enrichPolicy.getMatchField(),
Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()),
Strings.collectionToCommaDelimitedString(types)
);
default -> throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
};
}

private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping) {
private XContentBuilder createEnrichMapping(List<Map<String, Object>> sourceMappings) {
Map<String, Map<String, Object>> fieldMappings = new HashMap<>();
Map<String, Object> mappingForMatchField = mappingForMatchField(policy, sourceMappings);
for (String enrichField : policy.getEnrichFields()) {
if (enrichField.equals(policy.getMatchField())) {
mappingForMatchField = new HashMap<>(mappingForMatchField);
mappingForMatchField.remove("doc_values"); // enable doc_values
} else {
var typeAndFormat = validateAndGetMappingTypeAndFormat(enrichField, policy, false, sourceMappings);
if (typeAndFormat != null) {
Map<String, Object> mapping = Maps.newMapWithExpectedSize(3);
mapping.put("type", typeAndFormat.type);
if (typeAndFormat.format != null) {
mapping.put("format", typeAndFormat.format);
}
mapping.put("index", false); // disable index
fieldMappings.put(enrichField, mapping);
}
}
}
fieldMappings.put(policy.getMatchField(), mappingForMatchField);

// Enable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
Expand All @@ -347,9 +376,7 @@ private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuild
builder.endObject();
builder.startObject("properties");
{
builder.startObject(policy.getMatchField());
matchFieldMapping.apply(builder);
builder.endObject();
builder.mapContents(fieldMappings);
}
builder.endObject();
builder.startObject("_meta");
Expand Down Expand Up @@ -380,7 +407,7 @@ private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) {
.put("index.warmer.enabled", false)
.build();
CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings);
createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings));
createEnrichIndexRequest.mapping(createEnrichMapping(mappings));
logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName);
enrichOriginClient().admin()
.indices()
Expand Down
Loading