diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java index a9a32a707b6..1c6231cfab0 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java @@ -250,9 +250,45 @@ public RexNode visitEqualTo(EqualTo node, CalcitePlanContext context) { return context.rexBuilder.equals(left, right); } + static final int namespace_max_length = 1; + /** Resolve qualified name. Note, the name should be case-sensitive. */ @Override public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context) { + for (int i = 0; i <= namespace_max_length; ++i) { + RexNode result = visitQualifiedNameInner(node, context, i); + if (result != null) { + return result; + } + } + throw new IllegalArgumentException( + String.format( + "Cannot resolve field [%s]; input fields are: %s", + node, context.relBuilder.peek().getRowType().getFieldNames())); + } + + private RexNode visitQualifiedNameInner( + QualifiedName node, CalcitePlanContext context, int namespaceLen) { + int rootIdentSize = namespaceLen + 1; + QualifiedName rootFieldName = + QualifiedName.of(node.getParts().stream().limit(rootIdentSize).toList()); + RexNode rootField = visitRootQualifiedName(rootFieldName, context); + if (rootField == null) { + // If the root field is not found, return null + return null; + } + return node.getParts().size() > rootIdentSize + ? + // Provide alias for RexFieldAccess, otherwise it will be auto-generated name + context.relBuilder.alias( + node.getParts().stream() + .skip(rootIdentSize) + .reduce(rootField, context.relBuilder::dot, (rex1, rex2) -> rex1), + node.toString()) + : rootField; + } + + private RexNode visitRootQualifiedName(QualifiedName node, CalcitePlanContext context) { // 1. resolve QualifiedName in join condition if (context.isResolvingJoinCondition()) { List parts = node.getParts(); @@ -281,14 +317,13 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context // field("nation2", "n_name"); // do nothing when fail (n_name is field of nation1) } } - throw new UnsupportedOperationException("Unsupported qualified name: " + node); + return null; } } else if (parts.size() == 3) { - throw new UnsupportedOperationException("Unsupported qualified name: " + node); + return null; } } - // TODO: Need to support nested fields https://github.com/opensearch-project/sql/issues/3459 // 2. resolve QualifiedName in non-join condition String qualifiedName = node.toString(); if (context.getRexLambdaRefMap().containsKey(qualifiedName)) { @@ -338,11 +373,8 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context .peekCorrelVar() .map(correlVar -> context.relBuilder.field(correlVar, qualifiedName)) .orElseGet(() -> context.relBuilder.field(qualifiedName)); - } else { - throw new IllegalArgumentException( - String.format( - "field [%s] not found; input fields are: %s", qualifiedName, currentFields)); } + return null; } @Override diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java index 57e43d93897..730b16c9ba0 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java @@ -177,8 +177,6 @@ public static RelDataType convertExprTypeToRelDataType(ExprType fieldType, boole return TYPE_FACTORY.createArrayType( TYPE_FACTORY.createSqlType(SqlTypeName.ANY, nullable), -1); case STRUCT: - // TODO: should use RelRecordType instead of MapSqlType here - // https://github.com/opensearch-project/sql/issues/3459 final RelDataType relKey = TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR); return TYPE_FACTORY.createMapType( relKey, TYPE_FACTORY.createSqlType(SqlTypeName.BINARY), nullable); @@ -243,7 +241,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) { INTERVAL_MINUTE_SECOND, INTERVAL_SECOND -> INTERVAL; case ARRAY -> ARRAY; - case MAP -> STRUCT; + case ROW, MAP -> STRUCT; case GEOMETRY -> GEO_POINT; case NULL, ANY, OTHER -> UNDEFINED; default -> UNKNOWN; diff --git a/core/src/main/java/org/opensearch/sql/data/model/ExprTupleValue.java b/core/src/main/java/org/opensearch/sql/data/model/ExprTupleValue.java index b92fdb51bf2..e4217f243d7 100644 --- a/core/src/main/java/org/opensearch/sql/data/model/ExprTupleValue.java +++ b/core/src/main/java/org/opensearch/sql/data/model/ExprTupleValue.java @@ -7,10 +7,13 @@ import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.stream.Collectors; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; @@ -19,18 +22,26 @@ /** Expression Tuple Value. */ @RequiredArgsConstructor +@AllArgsConstructor public class ExprTupleValue extends AbstractExprValue { + // Only used for Calcite to keep the field names in the same order as the schema. + @Nullable private List fieldNames; private final LinkedHashMap valueMap; public static ExprTupleValue fromExprValueMap(Map map) { + return fromExprValueMap(null, map); + } + + public static ExprTupleValue fromExprValueMap( + List fieldNames, Map map) { LinkedHashMap linkedHashMap = new LinkedHashMap<>(map); - return new ExprTupleValue(linkedHashMap); + return new ExprTupleValue(fieldNames, linkedHashMap); } - public static ExprTupleValue empty() { + public static ExprTupleValue empty(List fieldNames) { LinkedHashMap linkedHashMap = new LinkedHashMap<>(); - return new ExprTupleValue(linkedHashMap); + return new ExprTupleValue(fieldNames, linkedHashMap); } @Override @@ -44,11 +55,10 @@ public Object value() { @Override public Object valueForCalcite() { - LinkedHashMap resultMap = new LinkedHashMap<>(); - for (Entry entry : valueMap.entrySet()) { - resultMap.put(entry.getKey(), entry.getValue().valueForCalcite()); - } - return resultMap; + // Needs to keep the value the same sequence as the schema, and fill up missing values + return fieldNames.stream() + .map(fieldName -> valueMap.getOrDefault(fieldName, ExprMissingValue.of()).valueForCalcite()) + .toArray(); } @Override diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteObjectFieldOperateIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteObjectFieldOperateIT.java index 6f181e37f77..49e1b2fe0ee 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteObjectFieldOperateIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteObjectFieldOperateIT.java @@ -5,6 +5,13 @@ package org.opensearch.sql.calcite.remote; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DEEP_NESTED; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Test; import org.opensearch.sql.ppl.ObjectFieldOperateIT; public class CalciteObjectFieldOperateIT extends ObjectFieldOperateIT { @@ -14,4 +21,12 @@ public void init() throws Exception { enableCalcite(); disallowCalciteFallback(); } + + @Test + public void test() throws IOException { + JSONObject result = + executeQuery( + String.format("source=%s | fields city | sort city.name ", TEST_INDEX_DEEP_NESTED)); + verifySchema(result, schema("city", "struct")); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/type/OpenSearchDataType.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/type/OpenSearchDataType.java index 51a4db60ffd..2eebd130b4a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/type/OpenSearchDataType.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/type/OpenSearchDataType.java @@ -293,7 +293,7 @@ public void accept(Map subtree, String prefix) { String entryKey = entry.getKey(); var nextPrefix = prefix.isEmpty() ? entryKey : String.format("%s.%s", prefix, entryKey); - result.put(nextPrefix, entry.getValue().cloneEmpty()); + result.put(nextPrefix, entry.getValue()); var nextSubtree = entry.getValue().getProperties(); if (!nextSubtree.isEmpty()) { accept(nextSubtree, nextPrefix); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java index a26f56ef702..59428d25043 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java @@ -346,12 +346,13 @@ private static ExprValue createOpenSearchDateType(Content value, ExprType type) * @return Value parsed from content. */ private ExprValue parseStruct(Content content, String prefix, boolean supportArrays) { - ExprTupleValue result = ExprTupleValue.empty(); + ExprTupleValue result = ExprTupleValue.empty(getFields(prefix)); content .map() .forEachRemaining( entry -> populateValueRecursive( + prefix, result, new JsonPath(entry.getKey()), parse( @@ -362,6 +363,12 @@ private ExprValue parseStruct(Content content, String prefix, boolean supportArr return result; } + private List getFields(String prefix) { + return TOP_PATH.equals(prefix) + ? typeMapping.keySet().stream().toList() + : typeMapping.get(prefix).getProperties().keySet().stream().toList(); + } + /** * Populate the current ExprTupleValue recursively. * @@ -370,7 +377,8 @@ private ExprValue parseStruct(Content content, String prefix, boolean supportArr * *

If there is existing vale for the JsonPath, we need to merge the new value to the old. */ - static void populateValueRecursive(ExprTupleValue result, JsonPath path, ExprValue value) { + void populateValueRecursive( + String prefix, ExprTupleValue result, JsonPath path, ExprValue value) { if (path.getPaths().size() == 1) { // Update the current ExprValue by using mergeTo if exists result @@ -378,9 +386,15 @@ static void populateValueRecursive(ExprTupleValue result, JsonPath path, ExprVal .computeIfPresent(path.getRootPath(), (key, oldValue) -> value.mergeTo(oldValue)); result.tupleValue().putIfAbsent(path.getRootPath(), value); } else { - result.tupleValue().putIfAbsent(path.getRootPath(), ExprTupleValue.empty()); + String newPrefix = prefix + path.getRootPath(); + result + .tupleValue() + .putIfAbsent(path.getRootPath(), ExprTupleValue.empty(getFields(newPrefix))); populateValueRecursive( - (ExprTupleValue) result.tupleValue().get(path.getRootPath()), path.getChildPath(), value); + newPrefix, + (ExprTupleValue) result.tupleValue().get(path.getRootPath()), + path.getChildPath(), + value); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index b0002f9ba52..847ceca5824 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -224,11 +224,10 @@ private void buildResultSet( // Loop through each column for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); - int sqlType = metaData.getColumnType(i); RelDataType fieldType = fieldTypes.get(i - 1); ExprValue exprValue = - JdbcOpenSearchDataTypeConvertor.getExprValueFromSqlType( - resultSet, i, sqlType, fieldType, columnName); + JdbcOpenSearchDataTypeConvertor.getExprValueFromRelDataType( + getObject(resultSet, i, columnName, fieldType.getSqlTypeName()), fieldType); row.put(columnName, exprValue); } values.add(ExprTupleValue.fromExprValueMap(row)); @@ -259,6 +258,13 @@ private void buildResultSet( listener.onResponse(response); } + private static Object getObject(ResultSet rs, int i, String columnName, SqlTypeName type) + throws SQLException { + // For GEOMETRY, use getObject by name instead of index to avoid Avatica's transformation on + // the accessor. Otherwise, Avatica will transform Geometry to String. + return type == SqlTypeName.GEOMETRY ? rs.getObject(columnName) : rs.getObject(i); + } + /** Registers opensearch-dependent functions */ private void registerOpenSearchFunctions() { PPLFuncImpTable.FunctionImp geoIpImpl = diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java index 2724c419e33..7b801355d3f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java @@ -6,19 +6,22 @@ import static java.util.Objects.requireNonNull; -import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexFieldAccess; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexVisitorImpl; -import org.apache.calcite.util.mapping.Mapping; -import org.apache.calcite.util.mapping.Mappings; import org.immutables.value.Value; +import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.Ident; +import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.IdentRexVisitorImpl; +import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.IdentTargetMapping; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -50,25 +53,38 @@ protected void apply(RelOptRuleCall call, LogicalProject project, CalciteLogical final RelOptTable table = scan.getTable(); requireNonNull(table.unwrap(OpenSearchIndex.class)); - // TODO: support script pushdown for project instead of only reference + // TODO: support script pushdown for project instead of only reference or field access // https://github.com/opensearch-project/sql/issues/3387 - final List selectedColumns = new ArrayList<>(); + final Set selectedColumns = new LinkedHashSet<>(); + final IdentRexVisitorImpl identRexVisitor = new IdentRexVisitorImpl(); final RexVisitorImpl visitor = - new RexVisitorImpl(true) { + new RexVisitorImpl<>(true) { @Override public Void visitInputRef(RexInputRef inputRef) { - if (!selectedColumns.contains(inputRef.getIndex())) { - selectedColumns.add(inputRef.getIndex()); - } + Ident ident = new Ident(null, inputRef.getIndex(), null); + selectedColumns.add(ident); + return null; + } + + @Override + public Void visitFieldAccess(RexFieldAccess fieldAccess) { + Ident prefix = fieldAccess.getReferenceExpr().accept(identRexVisitor); + Ident ident = + new Ident(prefix, fieldAccess.getField().getIndex(), fieldAccess.getField()); + selectedColumns.add(ident); return null; } }; visitor.visitEach(project.getProjects()); // Only do push down when an actual projection happens + // TODO: fix this for FieldAccess + List identList = selectedColumns.stream().toList(); if (!selectedColumns.isEmpty() && selectedColumns.size() != scan.getRowType().getFieldCount()) { - Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount()); - CalciteLogicalIndexScan newScan = scan.pushDownProject(selectedColumns); - final List newProjectRexNodes = RexUtil.apply(mapping, project.getProjects()); + IdentTargetMapping mapping = + IdentTargetMapping.create(identList, scan.getRowType().getFieldCount()); + CalciteLogicalIndexScan newScan = scan.pushDownProject(identList, mapping); + final List newProjectRexNodes = + RexPermuteIdentShuttle.of(mapping, identRexVisitor).visitList(project.getProjects()); if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) { call.transformTo(newScan); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/RexPermuteIdentShuttle.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/RexPermuteIdentShuttle.java new file mode 100644 index 00000000000..7a09e591792 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/RexPermuteIdentShuttle.java @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.sql.opensearch.planner.physical; + +import static java.util.Objects.hash; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexPermuteInputsShuttle; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.util.mapping.AbstractTargetMapping; + +public class RexPermuteIdentShuttle extends RexPermuteInputsShuttle { + private final IdentTargetMapping mapping; + private final RexVisitorImpl visitor; + + public RexPermuteIdentShuttle(IdentTargetMapping mapping, RexVisitorImpl visitor) { + super(mapping); + this.mapping = mapping; + this.visitor = visitor; + } + + public static RexPermuteIdentShuttle of( + IdentTargetMapping mapping, RexVisitorImpl visitor) { + return new RexPermuteIdentShuttle(mapping, visitor); + } + + @Override + public RexNode visitInputRef(RexInputRef local) { + return visitToTransform(local); + } + + @Override + public RexNode visitFieldAccess(RexFieldAccess fieldAccess) { + return visitToTransform(fieldAccess); + } + + private RexNode visitToTransform(RexNode node) { + Ident ident = node.accept(visitor); + int target = mapping.getTargetByIdent(ident); + return new RexInputRef(target, node.getType()); + } + + /** + * @param prefix The prefix of the Ident, null if the Ident is generated from RexInputRef + * @param index The index of the Ident in the schema + * @param typeField For the convenience of retrieving RelDataTypeField of RexFieldAccess; Should + * be NULL for RexInputRef since the field name has been set to index, which is useless. No + * usage in identification of Ident, so no need to be used in equals or hashCode. + */ + public record Ident( + RexPermuteIdentShuttle.Ident prefix, Integer index, RelDataTypeField typeField) { + + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof Ident otherIdent)) { + return false; + } + return Objects.equals(prefix, otherIdent.prefix) && Objects.equals(index, otherIdent.index); + } + + public int hashCode() { + return hash(prefix, index); + } + + public RelDataTypeField constructFieldDataType(List fieldList) { + // For RexInputRef, the typeField is null, so we use the retrieved field in the row type. + return prefix == null + ? fieldList.get(index) + : new RelDataTypeFieldImpl(constructFieldName(fieldList), index, typeField.getType()); + } + + private String constructFieldName(List fieldList) { + return prefix == null + ? fieldList.get(index).getName() + : prefix.constructFieldName(fieldList) + "." + typeField.getName(); + } + } + + public static class IdentTargetMapping extends AbstractTargetMapping { + private final Map identToTarget; + + private IdentTargetMapping(Map identToTarget, int sourceCount) { + super(sourceCount, identToTarget.size()); + this.identToTarget = identToTarget; + } + + public static IdentTargetMapping create(List selectedColumns, int sourceCount) { + final Map identToTarget = new HashMap<>(); + for (int i = 0; i < selectedColumns.size(); i++) { + identToTarget.put(selectedColumns.get(i), i); + } + return new IdentTargetMapping(identToTarget, sourceCount); + } + + @Override + public int getTargetOpt(int source) { + throw new UnsupportedOperationException(); + } + + public int getTargetByIdent(Ident ident) { + return identToTarget.getOrDefault(ident, -1); + } + } + + public static class IdentRexVisitorImpl extends RexVisitorImpl { + protected IdentRexVisitorImpl() { + super(true); + } + + @Override + public Ident visitInputRef(RexInputRef inputRef) { + return new Ident(null, inputRef.getIndex(), null); + } + + @Override + public Ident visitFieldAccess(RexFieldAccess fieldAccess) { + Ident prefix = fieldAccess.getReferenceExpr().accept(this); + return new Ident(prefix, fieldAccess.getField().getIndex(), fieldAccess.getField()); + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index aed1ab42839..eb70deec910 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -5,9 +5,14 @@ package org.opensearch.sql.opensearch.storage; +import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY; +import static org.opensearch.sql.data.type.ExprCoreType.*; + import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.function.Function; @@ -17,8 +22,11 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.calcite.plan.AbstractOpenSearchTable; +import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; @@ -147,6 +155,37 @@ public Map getFieldTypes() { return cachedFieldTypes; } + @Override + public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { + LinkedHashMap allFields = + new LinkedHashMap<>(getFieldOpenSearchTypes()); + getReservedFieldTypes().forEach((k, v) -> allFields.put(k, OpenSearchDataType.of(v))); + return constructStructType(allFields); + } + + private RelDataType constructStructType(Map osTypes) { + List fieldNameList = new ArrayList<>(); + List typeList = new ArrayList<>(); + for (Entry entry : osTypes.entrySet()) { + fieldNameList.add(entry.getKey()); + typeList.add(convertOpenSearchDataTypeToRelDataType(entry.getValue())); + } + return TYPE_FACTORY.createStructType(typeList, fieldNameList, true); + } + + private RelDataType convertOpenSearchDataTypeToRelDataType( + OpenSearchDataType openSearchDataType) { + // OpenSearchIndex will handle struct type itself to use Calcite's StructType. Then it can + // support resolving nested fields without relying on schema flattening. + if (openSearchDataType.getExprType().equals(STRUCT) + || openSearchDataType.getExprType().equals(ARRAY)) { + Map internalFields = openSearchDataType.getProperties(); + return constructStructType(internalFields); + } + return OpenSearchTypeFactory.convertExprTypeToRelDataType( + openSearchDataType.getExprType(), true); + } + @Override public Map getReservedFieldTypes() { return METADATAFIELD_TYPE_MAP; @@ -212,11 +251,8 @@ public TableScanBuilder createScanBuilder() { } private OpenSearchExprValueFactory createExprValueFactory() { - Map allFields = new HashMap<>(); - getReservedFieldTypes().forEach((k, v) -> allFields.put(k, OpenSearchDataType.of(v))); - allFields.putAll(getFieldOpenSearchTypes()); return new OpenSearchExprValueFactory( - allFields, settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE)); + getFieldOpenSearchTypes(), settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE)); } public boolean isFieldTypeTolerance() { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index a8edc18735d..280e5b26d8c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -46,6 +46,8 @@ import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; import org.opensearch.sql.opensearch.planner.physical.EnumerableIndexScanRule; import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules; +import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.Ident; +import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.IdentTargetMapping; import org.opensearch.sql.opensearch.request.AggregateAnalyzer; import org.opensearch.sql.opensearch.request.PredicateAnalyzer; import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; @@ -126,11 +128,12 @@ public CalciteLogicalIndexScan pushDownFilter(Filter filter) { * When pushing down a project, we need to create a new CalciteLogicalIndexScan with the updated * schema since we cannot override getRowType() which is defined to be final. */ - public CalciteLogicalIndexScan pushDownProject(List selectedColumns) { + public CalciteLogicalIndexScan pushDownProject( + List selectedColumns, IdentTargetMapping mapping) { final RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder(); final List fieldList = this.getRowType().getFieldList(); - for (int project : selectedColumns) { - builder.add(fieldList.get(project)); + for (Ident project : selectedColumns) { + builder.add(project.constructFieldDataType(fieldList)); } RelDataType newSchema = builder.build(); @@ -138,7 +141,7 @@ public CalciteLogicalIndexScan pushDownProject(List selectedColumns) { // E.g. When sorting age // `Project(age) - TableScan(schema=[name, age], collation=[$1 ASC])` should become // `TableScan(schema=[age], collation=[$0 ASC])` after pushing down project. - RelTraitSet traitSetWithReIndexedCollations = reIndexCollations(selectedColumns); + RelTraitSet traitSetWithReIndexedCollations = reIndexCollations(mapping); CalciteLogicalIndexScan newScan = new CalciteLogicalIndexScan( @@ -165,16 +168,19 @@ public CalciteLogicalIndexScan pushDownProject(List selectedColumns) { return newScan; } - private RelTraitSet reIndexCollations(List selectedColumns) { + private RelTraitSet reIndexCollations(IdentTargetMapping mapping) { RelTraitSet newTraitSet; RelCollation relCollation = getTraitSet().getCollation(); if (!Objects.isNull(relCollation) && !relCollation.getFieldCollations().isEmpty()) { List newCollations = relCollation.getFieldCollations().stream() - .filter(collation -> selectedColumns.contains(collation.getFieldIndex())) .map( - collation -> - collation.withFieldIndex(selectedColumns.indexOf(collation.getFieldIndex()))) + collation -> { + int targetIndex = + mapping.getTargetByIdent(new Ident(null, collation.getFieldIndex(), null)); + return targetIndex != -1 ? collation.withFieldIndex(targetIndex) : null; + }) + .filter(Objects::nonNull) .collect(Collectors.toList()); newTraitSet = getTraitSet().plus(RelCollations.of(newCollations)); } else { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcOpenSearchDataTypeConvertor.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcOpenSearchDataTypeConvertor.java index 554e5f4b108..e1bcdc75e2d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcOpenSearchDataTypeConvertor.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcOpenSearchDataTypeConvertor.java @@ -5,15 +5,15 @@ package org.opensearch.sql.opensearch.util; -import java.sql.Array; -import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Struct; import java.sql.Types; import java.util.Arrays; +import java.util.LinkedHashMap; import lombok.experimental.UtilityClass; import org.apache.calcite.avatica.util.ArrayImpl; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.locationtech.jts.geom.Point; @@ -22,6 +22,7 @@ import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprTimeValue; import org.opensearch.sql.data.model.ExprTimestampValue; +import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.data.type.ExprCoreType; @@ -61,77 +62,51 @@ public static ExprType getExprTypeFromSqlType(int sqlType) { } } - public static ExprValue getExprValueFromSqlType( - ResultSet rs, int i, int sqlType, RelDataType fieldType, String fieldName) + public static ExprValue getExprValueFromRelDataType(Object value, RelDataType fieldType) throws SQLException { - Object value = rs.getObject(i); if (value == null) { return ExprNullValue.of(); } if (fieldType instanceof ExprJavaType && value instanceof ExprValue) { return (ExprValue) value; - } else if (fieldType.getSqlTypeName() == SqlTypeName.GEOMETRY) { - // Use getObject by name instead of index to avoid Avatica's transformation on the accessor. - // Otherwise, Avatica will transform Geometry to String. - Point geoPoint = (Point) rs.getObject(fieldName); - return new OpenSearchExprGeoPointValue(geoPoint.getY(), geoPoint.getX()); } try { - switch (sqlType) { - case Types.VARCHAR: - case Types.CHAR: - case Types.LONGVARCHAR: - return ExprValueUtils.fromObjectValue(rs.getString(i)); - - case Types.INTEGER: - return ExprValueUtils.fromObjectValue(rs.getInt(i)); - - case Types.BIGINT: - return ExprValueUtils.fromObjectValue(rs.getLong(i)); - - case Types.FLOAT: - case Types.REAL: - return ExprValueUtils.fromObjectValue(rs.getFloat(i)); - - case Types.DECIMAL: - case Types.NUMERIC: - case Types.DOUBLE: - return ExprValueUtils.fromObjectValue(rs.getDouble(i)); - - case Types.DATE: - String dateStr = rs.getString(i); - return new ExprDateValue(dateStr); - - case Types.TIME: - String timeStr = rs.getString(i); - return new ExprTimeValue(timeStr); - - case Types.TIMESTAMP: - String timestampStr = rs.getString(i); - return new ExprTimestampValue(timestampStr); - - case Types.BOOLEAN: - return ExprValueUtils.fromObjectValue(rs.getBoolean(i)); - - case Types.ARRAY: - Array array = rs.getArray(i); - if (array instanceof ArrayImpl) { + switch (fieldType.getSqlTypeName()) { + case GEOMETRY: + Point geoPoint = (Point) value; + return new OpenSearchExprGeoPointValue(geoPoint.getY(), geoPoint.getX()); + case DATE: + return new ExprDateValue(value.toString()); + case TIME: + return new ExprTimeValue(value.toString()); + case TIMESTAMP: + return new ExprTimestampValue(value.toString()); + case ARRAY: + if (value instanceof ArrayImpl) { return ExprValueUtils.fromObjectValue( Arrays.asList((Object[]) ((ArrayImpl) value).getArray())); } - return ExprValueUtils.fromObjectValue(array); - + return ExprValueUtils.fromObjectValue(value); + case ROW: + Object[] attributes = ((Struct) value).getAttributes(); + java.util.Map tupleMap = new LinkedHashMap<>(); + for (int i = 0; i < attributes.length; i++) { + RelDataTypeField field = fieldType.getFieldList().get(i); + tupleMap.put( + field.getName(), getExprValueFromRelDataType(attributes[i], field.getType())); + } + return ExprTupleValue.fromExprValueMap(tupleMap); default: - LOG.debug( - "Unchecked sql type: {}, return Object type {}", - sqlType, - value.getClass().getTypeName()); return ExprValueUtils.fromObjectValue(value); } } catch (SQLException e) { - LOG.error("Error converting SQL type {}: {}", sqlType, e.getMessage()); + LOG.error( + "Error converting RelDataType {} with field value {}: {}", + fieldType, + value, + e.getMessage()); throw e; } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java index e8588fa778c..c99c3a28cef 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java @@ -996,16 +996,18 @@ public void factoryMappingsAreExtendableWithoutOverWrite() @Test public void testPopulateValueRecursive() { - ExprTupleValue tupleValue = ExprTupleValue.empty(); + ExprTupleValue tupleValue = ExprTupleValue.empty(null); + OpenSearchExprValueFactory factory = new OpenSearchExprValueFactory(null, true); - OpenSearchExprValueFactory.populateValueRecursive( - tupleValue, new JsonPath("log.json.time"), ExprValueUtils.integerValue(100)); + factory.populateValueRecursive( + "", tupleValue, new JsonPath("log.json.time"), ExprValueUtils.integerValue(100)); ExprValue expectedValue = ExprValueUtils.tupleValue( Map.of("log", Map.of("json", new LinkedHashMap<>(Map.of("time", 100))))); assertEquals(expectedValue, tupleValue); - OpenSearchExprValueFactory.populateValueRecursive( + factory.populateValueRecursive( + "", tupleValue, new JsonPath("log.json"), ExprValueUtils.tupleValue(new LinkedHashMap<>(Map.of("status", "SUCCESS")))); @@ -1024,8 +1026,8 @@ public void testPopulateValueRecursive() { assertEquals(expectedValue, tupleValue); // update the conflict value with the latest - OpenSearchExprValueFactory.populateValueRecursive( - tupleValue, new JsonPath("log.json.status"), ExprValueUtils.stringValue("FAILED")); + factory.populateValueRecursive( + "", tupleValue, new JsonPath("log.json.status"), ExprValueUtils.stringValue("FAILED")); expectedValue = ExprValueUtils.tupleValue( Map.of(