Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,45 @@ public RexNode visitEqualTo(EqualTo node, CalcitePlanContext context) {
return context.rexBuilder.equals(left, right);
}

static final int namespace_max_length = 1;

/** Resolve qualified name. Note, the name should be case-sensitive. */
@Override
public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context) {
  // Attempt resolution with progressively longer namespace prefixes:
  // 0 = plain field name, up to namespace_max_length qualifier parts.
  int namespaceLen = 0;
  while (namespaceLen <= namespace_max_length) {
    RexNode resolved = visitQualifiedNameInner(node, context, namespaceLen);
    if (resolved != null) {
      return resolved;
    }
    ++namespaceLen;
  }
  // No namespace length produced a match; report the available input fields.
  throw new IllegalArgumentException(
      String.format(
          "Cannot resolve field [%s]; input fields are: %s",
          node, context.relBuilder.peek().getRowType().getFieldNames()));
}

/**
 * Tries to resolve {@code node} assuming its first {@code namespaceLen} parts are a namespace
 * qualifier. Returns {@code null} when the root field cannot be resolved under that assumption.
 */
private RexNode visitQualifiedNameInner(
    QualifiedName node, CalcitePlanContext context, int namespaceLen) {
  List<String> parts = node.getParts();
  int rootIdentSize = namespaceLen + 1;
  // The first rootIdentSize parts form the root identifier (namespace + root field name).
  QualifiedName rootFieldName =
      QualifiedName.of(parts.stream().limit(rootIdentSize).toList());
  RexNode rootField = visitRootQualifiedName(rootFieldName, context);
  if (rootField == null) {
    // If the root field is not found, return null
    return null;
  }
  if (parts.size() <= rootIdentSize) {
    return rootField;
  }
  // Fold the remaining parts into a chain of field accesses on the root field.
  RexNode access =
      parts.stream()
          .skip(rootIdentSize)
          .reduce(rootField, context.relBuilder::dot, (rex1, rex2) -> rex1);
  // Provide alias for RexFieldAccess, otherwise it will be auto-generated name
  return context.relBuilder.alias(access, node.toString());
}

private RexNode visitRootQualifiedName(QualifiedName node, CalcitePlanContext context) {
// 1. resolve QualifiedName in join condition
if (context.isResolvingJoinCondition()) {
List<String> parts = node.getParts();
Expand Down Expand Up @@ -281,14 +317,13 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context
// field("nation2", "n_name"); // do nothing when fail (n_name is field of nation1)
}
}
throw new UnsupportedOperationException("Unsupported qualified name: " + node);
return null;
}
} else if (parts.size() == 3) {
throw new UnsupportedOperationException("Unsupported qualified name: " + node);
return null;
}
}

// TODO: Need to support nested fields https://github.com/opensearch-project/sql/issues/3459
// 2. resolve QualifiedName in non-join condition
String qualifiedName = node.toString();
if (context.getRexLambdaRefMap().containsKey(qualifiedName)) {
Expand Down Expand Up @@ -338,11 +373,8 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context
.peekCorrelVar()
.map(correlVar -> context.relBuilder.field(correlVar, qualifiedName))
.orElseGet(() -> context.relBuilder.field(qualifiedName));
} else {
throw new IllegalArgumentException(
String.format(
"field [%s] not found; input fields are: %s", qualifiedName, currentFields));
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ public static RelDataType convertExprTypeToRelDataType(ExprType fieldType, boole
return TYPE_FACTORY.createArrayType(
TYPE_FACTORY.createSqlType(SqlTypeName.ANY, nullable), -1);
case STRUCT:
// TODO: should use RelRecordType instead of MapSqlType here
// https://github.com/opensearch-project/sql/issues/3459
final RelDataType relKey = TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR);
return TYPE_FACTORY.createMapType(
relKey, TYPE_FACTORY.createSqlType(SqlTypeName.BINARY), nullable);
Expand Down Expand Up @@ -243,7 +241,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) {
INTERVAL_MINUTE_SECOND,
INTERVAL_SECOND -> INTERVAL;
case ARRAY -> ARRAY;
case MAP -> STRUCT;
case ROW, MAP -> STRUCT;
case GEOMETRY -> GEO_POINT;
case NULL, ANY, OTHER -> UNDEFINED;
default -> UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
Expand All @@ -19,18 +22,26 @@

/** Expression Tuple Value. */
@RequiredArgsConstructor
@AllArgsConstructor
public class ExprTupleValue extends AbstractExprValue {
// Only used for Calcite to keep the field names in the same order as the schema.
@Nullable private List<String> fieldNames;

private final LinkedHashMap<String, ExprValue> valueMap;

/**
 * Creates a tuple value from the given map without schema-ordered field names.
 *
 * <p>Delegates to the two-argument overload with {@code null} field names; presumably only safe
 * for non-Calcite callers, since {@code valueForCalcite} dereferences the field-name list —
 * TODO confirm with callers.
 */
public static ExprTupleValue fromExprValueMap(Map<String, ExprValue> map) {
return fromExprValueMap(null, map);
}

public static ExprTupleValue fromExprValueMap(
List<String> fieldNames, Map<String, ExprValue> map) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>(map);
return new ExprTupleValue(linkedHashMap);
return new ExprTupleValue(fieldNames, linkedHashMap);
}

public static ExprTupleValue empty() {
public static ExprTupleValue empty(List<String> fieldNames) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
return new ExprTupleValue(linkedHashMap);
return new ExprTupleValue(fieldNames, linkedHashMap);
}

@Override
Expand All @@ -44,11 +55,10 @@ public Object value() {

@Override
public Object valueForCalcite() {
LinkedHashMap<String, Object> resultMap = new LinkedHashMap<>();
for (Entry<String, ExprValue> entry : valueMap.entrySet()) {
resultMap.put(entry.getKey(), entry.getValue().valueForCalcite());
}
return resultMap;
// Needs to keep the values in the same sequence as the schema, and fill in missing values
return fieldNames.stream()
.map(fieldName -> valueMap.getOrDefault(fieldName, ExprMissingValue.of()).valueForCalcite())
.toArray();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DEEP_NESTED;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.io.IOException;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.sql.ppl.ObjectFieldOperateIT;

public class CalciteObjectFieldOperateIT extends ObjectFieldOperateIT {
Expand All @@ -14,4 +21,12 @@ public void init() throws Exception {
enableCalcite();
disallowCalciteFallback();
}

@Test
public void test() throws IOException {
JSONObject result =
executeQuery(
String.format("source=%s | fields city | sort city.name ", TEST_INDEX_DEEP_NESTED));
verifySchema(result, schema("city", "struct"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void accept(Map<String, OpenSearchDataType> subtree, String prefix) {
String entryKey = entry.getKey();
var nextPrefix =
prefix.isEmpty() ? entryKey : String.format("%s.%s", prefix, entryKey);
result.put(nextPrefix, entry.getValue().cloneEmpty());
result.put(nextPrefix, entry.getValue());
var nextSubtree = entry.getValue().getProperties();
if (!nextSubtree.isEmpty()) {
accept(nextSubtree, nextPrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,13 @@ private static ExprValue createOpenSearchDateType(Content value, ExprType type)
* @return Value parsed from content.
*/
private ExprValue parseStruct(Content content, String prefix, boolean supportArrays) {
ExprTupleValue result = ExprTupleValue.empty();
ExprTupleValue result = ExprTupleValue.empty(getFields(prefix));
content
.map()
.forEachRemaining(
entry ->
populateValueRecursive(
prefix,
result,
new JsonPath(entry.getKey()),
parse(
Expand All @@ -362,6 +363,12 @@ private ExprValue parseStruct(Content content, String prefix, boolean supportArr
return result;
}

/**
 * Returns the schema-ordered field names under the given prefix: top-level mapping keys for the
 * root path, otherwise the property names of the type registered at {@code prefix}.
 * NOTE(review): assumes {@code typeMapping} contains an entry for every non-root prefix —
 * a missing entry would NPE here; confirm against callers.
 */
private List<String> getFields(String prefix) {
  if (TOP_PATH.equals(prefix)) {
    return typeMapping.keySet().stream().toList();
  }
  return typeMapping.get(prefix).getProperties().keySet().stream().toList();
}

/**
* Populate the current ExprTupleValue recursively.
*
Expand All @@ -370,17 +377,24 @@ private ExprValue parseStruct(Content content, String prefix, boolean supportArr
*
* <p>If there is an existing value for the JsonPath, we need to merge the new value into the old.
*/
static void populateValueRecursive(ExprTupleValue result, JsonPath path, ExprValue value) {
void populateValueRecursive(
String prefix, ExprTupleValue result, JsonPath path, ExprValue value) {
if (path.getPaths().size() == 1) {
// Update the current ExprValue by using mergeTo if exists
result
.tupleValue()
.computeIfPresent(path.getRootPath(), (key, oldValue) -> value.mergeTo(oldValue));
result.tupleValue().putIfAbsent(path.getRootPath(), value);
} else {
result.tupleValue().putIfAbsent(path.getRootPath(), ExprTupleValue.empty());
String newPrefix = prefix + path.getRootPath();
result
.tupleValue()
.putIfAbsent(path.getRootPath(), ExprTupleValue.empty(getFields(newPrefix)));
populateValueRecursive(
(ExprTupleValue) result.tupleValue().get(path.getRootPath()), path.getChildPath(), value);
newPrefix,
(ExprTupleValue) result.tupleValue().get(path.getRootPath()),
path.getChildPath(),
value);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,10 @@ private void buildResultSet(
// Loop through each column
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
int sqlType = metaData.getColumnType(i);
RelDataType fieldType = fieldTypes.get(i - 1);
ExprValue exprValue =
JdbcOpenSearchDataTypeConvertor.getExprValueFromSqlType(
resultSet, i, sqlType, fieldType, columnName);
JdbcOpenSearchDataTypeConvertor.getExprValueFromRelDataType(
getObject(resultSet, i, columnName, fieldType.getSqlTypeName()), fieldType);
row.put(columnName, exprValue);
}
values.add(ExprTupleValue.fromExprValueMap(row));
Expand Down Expand Up @@ -259,6 +258,13 @@ private void buildResultSet(
listener.onResponse(response);
}

/**
 * Reads column {@code i} from the result set.
 *
 * <p>For GEOMETRY, use getObject by name instead of index to avoid Avatica's transformation on
 * the accessor. Otherwise, Avatica will transform Geometry to String.
 */
private static Object getObject(ResultSet rs, int i, String columnName, SqlTypeName type)
    throws SQLException {
  if (type == SqlTypeName.GEOMETRY) {
    return rs.getObject(columnName);
  }
  return rs.getObject(i);
}

/** Registers opensearch-dependent functions */
private void registerOpenSearchFunctions() {
PPLFuncImpTable.FunctionImp geoIpImpl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.Mappings;
import org.immutables.value.Value;
import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.Ident;
import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.IdentRexVisitorImpl;
import org.opensearch.sql.opensearch.planner.physical.RexPermuteIdentShuttle.IdentTargetMapping;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

Expand Down Expand Up @@ -50,25 +53,38 @@ protected void apply(RelOptRuleCall call, LogicalProject project, CalciteLogical
final RelOptTable table = scan.getTable();
requireNonNull(table.unwrap(OpenSearchIndex.class));

// TODO: support script pushdown for project instead of only reference
// TODO: support script pushdown for project instead of only reference or field access
// https://github.com/opensearch-project/sql/issues/3387
final List<Integer> selectedColumns = new ArrayList<>();
final Set<Ident> selectedColumns = new LinkedHashSet<>();
final IdentRexVisitorImpl identRexVisitor = new IdentRexVisitorImpl();
final RexVisitorImpl<Void> visitor =
new RexVisitorImpl<Void>(true) {
new RexVisitorImpl<>(true) {
@Override
public Void visitInputRef(RexInputRef inputRef) {
if (!selectedColumns.contains(inputRef.getIndex())) {
selectedColumns.add(inputRef.getIndex());
}
Ident ident = new Ident(null, inputRef.getIndex(), null);
selectedColumns.add(ident);
return null;
}

@Override
public Void visitFieldAccess(RexFieldAccess fieldAccess) {
Ident prefix = fieldAccess.getReferenceExpr().accept(identRexVisitor);
Ident ident =
new Ident(prefix, fieldAccess.getField().getIndex(), fieldAccess.getField());
selectedColumns.add(ident);
return null;
}
};
visitor.visitEach(project.getProjects());
// Only do push down when an actual projection happens
// TODO: fix this for FieldAccess
List<Ident> identList = selectedColumns.stream().toList();
if (!selectedColumns.isEmpty() && selectedColumns.size() != scan.getRowType().getFieldCount()) {
Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount());
CalciteLogicalIndexScan newScan = scan.pushDownProject(selectedColumns);
final List<RexNode> newProjectRexNodes = RexUtil.apply(mapping, project.getProjects());
IdentTargetMapping mapping =
IdentTargetMapping.create(identList, scan.getRowType().getFieldCount());
CalciteLogicalIndexScan newScan = scan.pushDownProject(identList, mapping);
final List<RexNode> newProjectRexNodes =
RexPermuteIdentShuttle.of(mapping, identRexVisitor).visitList(project.getProjects());

if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) {
call.transformTo(newScan);
Expand Down
Loading
Loading