diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java new file mode 100644 index 0000000000..37438582ba --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.source; + +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; + +/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */ +public class FlussArrayAsPaimonArray implements InternalArray { + + private final org.apache.fluss.row.InternalArray flussArray; + private final DataType elementType; + + public FlussArrayAsPaimonArray( + org.apache.fluss.row.InternalArray flussArray, DataType elementType) { + this.flussArray = flussArray; + this.elementType = elementType; + } + + @Override + public int size() { + return flussArray.size(); + } + + @Override + public boolean isNullAt(int pos) { + return flussArray.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return flussArray.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return flussArray.getByte(pos); + } + + @Override + public short getShort(int pos) { + return flussArray.getShort(pos); + } + + @Override + public int getInt(int pos) { + return flussArray.getInt(pos); + } + + @Override + public long getLong(int pos) { + return flussArray.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return flussArray.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return flussArray.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return BinaryString.fromBytes(flussArray.getString(pos).toBytes()); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + org.apache.fluss.row.Decimal flussDecimal = flussArray.getDecimal(pos, precision, scale); + if (flussDecimal.isCompact()) { + return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale); + } else { + return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale); + } + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + // Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays + switch (elementType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (TimestampNtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + flussArray.getTimestampNtz(pos, precision).getMillisecond()); + } else { + TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond()); + } + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (TimestampLtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + flussArray.getTimestampLtz(pos, precision).getEpochMillisecond()); + } else { + TimestampLtz timestampLtz = flussArray.getTimestampLtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampLtz.getEpochMillisecond(), + timestampLtz.getNanoOfMillisecond()); + } + default: + throw new UnsupportedOperationException( + "Unsupported array element type for getTimestamp. " + + "Only TIMESTAMP_WITHOUT_TIME_ZONE and " + + "TIMESTAMP_WITH_LOCAL_TIME_ZONE are supported, but got: " + + elementType.getTypeRoot() + + " (" + + elementType + + ")."); + } + } + + @Override + public byte[] getBinary(int pos) { + return flussArray.getBytes(pos); + } + + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException( + "getVariant is not supported for Fluss array currently."); + } + + @Override + public InternalArray getArray(int pos) { + org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos); + return innerArray == null + ? null + : new FlussArrayAsPaimonArray( + innerArray, ((ArrayType) elementType).getElementType()); + } + + @Override + public InternalMap getMap(int pos) { + throw new UnsupportedOperationException( + "getMap is not supported for Fluss array currently."); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException( + "getRow is not supported for Fluss array currently."); + } + + @Override + public boolean[] toBooleanArray() { + return flussArray.toBooleanArray(); + } + + @Override + public byte[] toByteArray() { + return flussArray.toByteArray(); + } + + @Override + public short[] toShortArray() { + return flussArray.toShortArray(); + } + + @Override + public int[] toIntArray() { + return flussArray.toIntArray(); + } + + @Override + public long[] toLongArray() { + return flussArray.toLongArray(); + } + + @Override + public float[] toFloatArray() { + return flussArray.toFloatArray(); + } + + @Override + public double[] toDoubleArray() { + return flussArray.toDoubleArray(); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java index 52994569aa..e2df08cc3d 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -159,8 +160,12 @@ public Variant getVariant(int i) { @Override public InternalArray getArray(int pos) { - throw new UnsupportedOperationException( - "getArray is not support for Fluss record currently."); + org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos); + return flussArray == null + ? null + : new FlussArrayAsPaimonArray( + flussArray, + ((ArrayType) tableRowType.getField(pos).type()).getElementType()); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java index 7c8c2d698d..df57974bff 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -27,6 +27,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; @@ -132,7 +133,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { String partitionName = isPartitioned ? waitUntilPartitions(t1).values().iterator().next() : null; if (partitionName != null) { - queryFilterStr = queryFilterStr + " and c16= '" + partitionName + "'"; + queryFilterStr = queryFilterStr + " and c17= '" + partitionName + "'"; } List expectedRows = new ArrayList<>(); @@ -155,6 +156,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 0), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( @@ -174,6 +176,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); } } else { @@ -195,6 +198,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 0), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -212,6 +216,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); } tableResult = @@ -225,7 +230,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { row -> { boolean isMatch = row.getField(3).equals(30); if (partitionName != null) { - isMatch = isMatch && row.getField(15).equals(partitionName); + isMatch = isMatch && row.getField(16).equals(partitionName); } return isMatch; }) @@ -300,6 +305,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( @@ -319,6 +325,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, partition)); } } else { @@ -340,6 +347,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -357,6 +365,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, null)); } @@ -473,6 +482,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( Row.of( @@ -491,6 +501,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); } } else { @@ -512,6 +523,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -529,6 +541,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); } @@ -571,6 +584,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows2.add( Row.ofKind( @@ -590,6 +604,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, partition)); } } else { @@ -611,6 +626,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); expectedRows2.add( Row.ofKind( @@ -630,6 +646,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, null)); } @@ -962,21 +979,21 @@ private String buildExpectedResult(boolean isPartitioned, int record1, int recor + "2023-10-25T12:01:13.182005Z, " + "2023-10-25T12:01:13.183, " + "2023-10-25T12:01:13.183006, " - + "[1, 2, 3, 4], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); records.add( "+I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, " + "2023-10-25T12:01:13.200Z, " + "2023-10-25T12:01:13.200005Z, " + "2023-10-25T12:01:13.201, " + "2023-10-25T12:01:13.201006, " - + "[1, 2, 3, 4], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); records.add( "+I[true, 100, 200, 30, 400, 500.1, 600.0, another_string_2, 9.00, 1000, " + "2023-10-25T12:01:13.400Z, " + "2023-10-25T12:01:13.400007Z, " + "2023-10-25T12:01:13.501, " + "2023-10-25T12:01:13.501008, " - + "[5, 6, 7, 8], %s]"); + + "[5, 6, 7, 8], [2.1, 2.2, 2.3], %s]"); if (isPartitioned) { return String.format( @@ -1012,9 +1029,10 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean .column("c13", DataTypes.TIMESTAMP(3)) .column("c14", DataTypes.TIMESTAMP(6)) .column("c15", DataTypes.BINARY(4)) - .column("c16", DataTypes.STRING()); + .column("c16", DataTypes.ARRAY(DataTypes.FLOAT())) + .column("c17", DataTypes.STRING()); - return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c16"); + return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c17"); } protected long createSimplePkTable( @@ -1079,6 +1097,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new GenericArray(new float[] {2.1f, 2.2f, 2.3f}), partition)); writeRows(tablePath, rows, false); } @@ -1101,6 +1120,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition), row( true, @@ -1118,6 +1138,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition)); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java index c41b28dad1..fd9bfdeb23 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java @@ -24,10 +24,12 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.junit.jupiter.api.Test; @@ -140,4 +142,195 @@ void testPrimaryKeyTableRecord() { assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType).getRowKind()) .isEqualTo(RowKind.INSERT); } + + @Test + void testArrayWithAllTypes() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BooleanType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TinyIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.SmallIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BigIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.FloatType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DoubleType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarCharType(true, 30)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DecimalType(10, 2)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TimestampType(3)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.LocalZonedTimestampType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarBinaryType()), + // array> + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()))); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(13); + genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + genericRow.setField(3, new GenericArray(new Object[] {1000, 2000, 3000})); + genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + // String type + genericRow.setField( + 7, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + BinaryString.fromString("test") + })); + + // Decimal type + genericRow.setField( + 8, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("999.99"), 10, 2) + })); + + // Timestamp type + genericRow.setField( + 9, + new GenericArray( + new Object[] { + TimestampNtz.fromMillis(1698235273182L), + TimestampNtz.fromMillis(1698235274000L), + TimestampNtz.fromMillis(1698235275000L) + })); + + // TimestampLTZ type + genericRow.setField( + 10, + new GenericArray( + new Object[] { + TimestampLtz.fromEpochMillis(1698235273182L), + TimestampLtz.fromEpochMillis(1698235274000L), + TimestampLtz.fromEpochMillis(1698235275000L) + })); + + // Binary type + genericRow.setField( + 11, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7}, + new byte[] {8, 9, 10, 11, 12} + })); + + // array> type + genericRow.setField( + 12, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + // Test boolean array + InternalArray boolArray = flussRowAsPaimonRow.getArray(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); + + // Test byte array + InternalArray byteArray = flussRowAsPaimonRow.getArray(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array + InternalArray shortArray = flussRowAsPaimonRow.getArray(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); + + // Test int array + InternalArray intArray = flussRowAsPaimonRow.getArray(3); + assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); + + // Test long array + InternalArray longArray = flussRowAsPaimonRow.getArray(4); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); + + // Test float array + InternalArray floatArray = flussRowAsPaimonRow.getArray(5); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); + + // Test double array + InternalArray doubleArray = flussRowAsPaimonRow.getArray(6); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); + + // Test string array + InternalArray stringArray = flussRowAsPaimonRow.getArray(7); + assertThat(stringArray.size()).isEqualTo(3); + assertThat(stringArray.getString(0).toString()).isEqualTo("hello"); + assertThat(stringArray.getString(1).toString()).isEqualTo("world"); + assertThat(stringArray.getString(2).toString()).isEqualTo("test"); + + // Test decimal array + InternalArray decimalArray = flussRowAsPaimonRow.getArray(8); + assertThat(decimalArray.size()).isEqualTo(3); + assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("678.90")); + assertThat(decimalArray.getDecimal(2, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("999.99")); + + // Test timestamp array + InternalArray timestampArray = flussRowAsPaimonRow.getArray(9); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestamp(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // test timestamp_ltz array + timestampArray = flussRowAsPaimonRow.getArray(10); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestamp(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // Test binary array + InternalArray binaryArray = flussRowAsPaimonRow.getArray(11); + assertThat(binaryArray.size()).isEqualTo(3); + assertThat(binaryArray.getBinary(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBinary(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBinary(2)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + + // Also test array> (nested int array) + InternalArray outerArray = flussRowAsPaimonRow.getArray(12); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray innerArray1 = outerArray.getArray(0); + assertThat(innerArray1.toIntArray()).isEqualTo(new int[] {1, 2}); + + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.toIntArray()).isEqualTo(new int[] {3, 4, 5}); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java index e6aa2ce84b..79b548966c 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java @@ -21,10 +21,12 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.Timestamp; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -163,4 +165,384 @@ void testPrimaryKeyTableRecord() { flussRecordAsPaimonRow.setFlussRecord(logRecord); assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.DELETE); } + + @Test + void testArrayTypeWithIntElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 10; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(2); + genericRow.setField(0, 42); + genericRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + assertThat(flussRecordAsPaimonRow.getInt(0)).isEqualTo(42); + InternalArray array = flussRecordAsPaimonRow.getArray(1); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(5); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.getInt(1)).isEqualTo(2); + assertThat(array.getInt(2)).isEqualTo(3); + assertThat(array.getInt(3)).isEqualTo(4); + assertThat(array.getInt(4)).isEqualTo(5); + + // Verify system columns are still accessible + assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket); + assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset); + assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp); + } + + @Test + void testArrayTypeWithStringElements() { + int tableBucket = 1; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarCharType()), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 5; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + BinaryString.fromString("hello"), BinaryString.fromString("world") + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, INSERT, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getString(0).toString()).isEqualTo("hello"); + assertThat(array.getString(1).toString()).isEqualTo("world"); + } + + @Test + void testNestedArrayType() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType())), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray outerArray = flussRecordAsPaimonRow.getArray(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray innerArray1 = outerArray.getArray(0); + assertThat(innerArray1.size()).isEqualTo(2); + assertThat(innerArray1.getInt(0)).isEqualTo(1); + assertThat(innerArray1.getInt(1)).isEqualTo(2); + + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.size()).isEqualTo(3); + assertThat(innerArray2.getInt(0)).isEqualTo(3); + } + + @Test + void testArrayWithAllPrimitiveTypes() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BooleanType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TinyIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.SmallIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BigIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.FloatType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DoubleType()), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(7); + genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + // Test boolean array + InternalArray boolArray = flussRecordAsPaimonRow.getArray(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.getBoolean(0)).isTrue(); + assertThat(boolArray.getBoolean(1)).isFalse(); + assertThat(boolArray.getBoolean(2)).isTrue(); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); + + // Test byte array + InternalArray byteArray = flussRecordAsPaimonRow.getArray(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.getByte(0)).isEqualTo((byte) 1); + assertThat(byteArray.getByte(1)).isEqualTo((byte) 2); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array + InternalArray shortArray = flussRecordAsPaimonRow.getArray(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.getShort(0)).isEqualTo((short) 100); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); + + // Test int array + InternalArray intArray = flussRecordAsPaimonRow.getArray(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); + + // Test long array + InternalArray longArray = flussRecordAsPaimonRow.getArray(4); + assertThat(longArray.getLong(0)).isEqualTo(10000L); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); + + // Test float array + InternalArray floatArray = flussRecordAsPaimonRow.getArray(5); + assertThat(floatArray.getFloat(0)).isEqualTo(1.1f); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); + + // Test double array + InternalArray doubleArray = flussRecordAsPaimonRow.getArray(6); + assertThat(doubleArray.getDouble(0)).isEqualTo(1.11); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); + + // Verify system columns + assertThat(flussRecordAsPaimonRow.getInt(7)).isEqualTo(tableBucket); + assertThat(flussRecordAsPaimonRow.getLong(8)).isEqualTo(logOffset); + } + + @Test + void testArrayWithDecimalElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DecimalType(10, 2)), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getDecimal(0, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("123.45")); + assertThat(array.getDecimal(1, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("678.90")); + } + + @Test + void testArrayWithTimestampElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TimestampType(3)), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + TimestampNtz.fromMillis(1698235273182L), + TimestampNtz.fromMillis(1698235274000L) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(array.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + } + + @Test + void testArrayWithBinaryElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarBinaryType()), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, new GenericArray(new Object[] {new byte[] {1, 2, 3}, new byte[] {4, 5, 6, 7}})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getBinary(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(array.getBinary(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + } + + @Test + void testNullArray() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType(new org.apache.paimon.types.IntType()) + .nullable(), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, null); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + assertThat(flussRecordAsPaimonRow.isNullAt(0)).isTrue(); + } + + @Test + void testArrayWithNullableElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType().nullable()), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, new GenericArray(new Object[] {1, null, 3})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.isNullAt(1)).isTrue(); + assertThat(array.getInt(2)).isEqualTo(3); + } + + @Test + void testEmptyArray() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, new GenericArray(new int[] {})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(0); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java new file mode 100644 index 0000000000..3c1e2c5519 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.row.InternalArray; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonRowAsFlussRow}. */ +class PaimonRowAsFlussRowTest { + + @Test + void testArrayWithAllTypes() { + GenericRow paimonRow = new GenericRow(16); + // Primitive types + paimonRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + paimonRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + paimonRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + paimonRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + paimonRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + paimonRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + paimonRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + // String type + paimonRow.setField( + 7, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + BinaryString.fromString("test") + })); + // Decimal type + paimonRow.setField( + 8, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("999.99"), 10, 2) + })); + // Timestamp type + paimonRow.setField( + 9, + new GenericArray( + new Object[] { + Timestamp.fromEpochMillis(1698235273182L), + Timestamp.fromEpochMillis(1698235274000L), + Timestamp.fromEpochMillis(1698235275000L) + })); + // TimestampLTZ type + paimonRow.setField( + 10, + new GenericArray( + new Object[] { + Timestamp.fromEpochMillis(1698235273182L), + Timestamp.fromEpochMillis(1698235274000L), + Timestamp.fromEpochMillis(1698235275000L) + })); + // Binary type + paimonRow.setField( + 11, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7}, + new byte[] {8, 9, 10, 11, 12} + })); + // array> type + paimonRow.setField( + 12, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + // System columns: __bucket, __offset, __timestamp + paimonRow.setField(13, 0); + paimonRow.setField(14, 0L); + paimonRow.setField(15, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + // Test boolean array + InternalArray boolArray = flussRow.getArray(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); + + // Test byte array + InternalArray byteArray = flussRow.getArray(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array + InternalArray shortArray = flussRow.getArray(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); + + // Test int array + InternalArray intArray = flussRow.getArray(3); + assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); + + // Test long array + InternalArray longArray = flussRow.getArray(4); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); + + // Test float array + InternalArray floatArray = flussRow.getArray(5); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); + + // Test double array + InternalArray doubleArray = flussRow.getArray(6); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); + + // Test string array + InternalArray stringArray = flussRow.getArray(7); + assertThat(stringArray.size()).isEqualTo(3); + assertThat(stringArray.getString(0).toString()).isEqualTo("hello"); + assertThat(stringArray.getString(1).toString()).isEqualTo("world"); + assertThat(stringArray.getString(2).toString()).isEqualTo("test"); + + // Test decimal array + InternalArray decimalArray = flussRow.getArray(8); + assertThat(decimalArray.size()).isEqualTo(3); + assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("678.90")); + assertThat(decimalArray.getDecimal(2, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("999.99")); + + // Test timestamp array + InternalArray timestampArray = flussRow.getArray(9); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestampNtz(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestampNtz(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestampNtz(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // Test timestamp_ltz array + InternalArray timestampLtzArray = flussRow.getArray(10); + assertThat(timestampLtzArray.size()).isEqualTo(3); + assertThat(timestampLtzArray.getTimestampLtz(0, 3).getEpochMillisecond()) + .isEqualTo(1698235273182L); + assertThat(timestampLtzArray.getTimestampLtz(1, 3).getEpochMillisecond()) + .isEqualTo(1698235274000L); + assertThat(timestampLtzArray.getTimestampLtz(2, 3).getEpochMillisecond()) + .isEqualTo(1698235275000L); + + // Test binary array + InternalArray binaryArray = flussRow.getArray(11); + assertThat(binaryArray.size()).isEqualTo(3); + assertThat(binaryArray.getBinary(0, 3)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBinary(1, 4)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBinary(2, 5)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + // Also test getBytes method + assertThat(binaryArray.getBytes(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBytes(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBytes(2)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + + // Also test array> (nested int array) + InternalArray outerArray = flussRow.getArray(12); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray innerArray1 = outerArray.getArray(0); + assertThat(innerArray1.toIntArray()).isEqualTo(new int[] {1, 2}); + + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.toIntArray()).isEqualTo(new int[] {3, 4, 5}); + } +}