From 1b2580c721284601b976de9cc223ed83f58e828e Mon Sep 17 00:00:00 2001 From: binary-signal Date: Sat, 13 Dec 2025 12:46:54 +0100 Subject: [PATCH 1/4] add adapters for converting fluss arrays to paimon arrays and vice versa Signed-off-by: binary-signal --- .../source/FlussArrayAsPaimonArray.java | 175 +++++++++++++ .../paimon/source/FlussRowAsPaimonRow.java | 4 +- .../source/FlussRowAsPaimonRowTest.java | 169 ++++++++++++ .../tiering/FlussRecordAsPaimonRowTest.java | 119 +++++++++ .../paimon/utils/PaimonRowAsFlussRowTest.java | 242 ++++++++++++++++++ 5 files changed, 707 insertions(+), 2 deletions(-) create mode 100644 fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java create mode 100644 fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java new file mode 100644 index 0000000000..83fac62551 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.source; + +import org.apache.fluss.row.TimestampNtz; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; + +/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */ +public class FlussArrayAsPaimonArray implements InternalArray { + + private final org.apache.fluss.row.InternalArray flussArray; + + public FlussArrayAsPaimonArray(org.apache.fluss.row.InternalArray flussArray) { + this.flussArray = flussArray; + } + + @Override + public int size() { + return flussArray.size(); + } + + @Override + public boolean isNullAt(int pos) { + return flussArray.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return flussArray.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return flussArray.getByte(pos); + } + + @Override + public short getShort(int pos) { + return flussArray.getShort(pos); + } + + @Override + public int getInt(int pos) { + return flussArray.getInt(pos); + } + + @Override + public long getLong(int pos) { + return flussArray.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return flussArray.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return flussArray.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return BinaryString.fromBytes(flussArray.getString(pos).toBytes()); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + org.apache.fluss.row.Decimal flussDecimal = flussArray.getDecimal(pos, precision, scale); + if (flussDecimal.isCompact()) { + return 
Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale); + } else { + return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale); + } + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + // Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays + if (TimestampNtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + flussArray.getTimestampNtz(pos, precision).getMillisecond()); + } else { + TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond()); + } + } + + @Override + public byte[] getBinary(int pos) { + return flussArray.getBytes(pos); + } + + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException( + "getVariant is not supported for Fluss array currently."); + } + + @Override + public InternalArray getArray(int pos) { + org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos); + return innerArray == null ? 
null : new FlussArrayAsPaimonArray(innerArray); + } + + @Override + public InternalMap getMap(int pos) { + throw new UnsupportedOperationException( + "getMap is not supported for Fluss array currently."); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException( + "getRow is not supported for Fluss array currently."); + } + + @Override + public boolean[] toBooleanArray() { + return flussArray.toBooleanArray(); + } + + @Override + public byte[] toByteArray() { + return flussArray.toByteArray(); + } + + @Override + public short[] toShortArray() { + return flussArray.toShortArray(); + } + + @Override + public int[] toIntArray() { + return flussArray.toIntArray(); + } + + @Override + public long[] toLongArray() { + return flussArray.toLongArray(); + } + + @Override + public float[] toFloatArray() { + return flussArray.toFloatArray(); + } + + @Override + public double[] toDoubleArray() { + return flussArray.toDoubleArray(); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java index 52994569aa..21fabe0636 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java @@ -159,8 +159,8 @@ public Variant getVariant(int i) { @Override public InternalArray getArray(int pos) { - throw new UnsupportedOperationException( - "getArray is not support for Fluss record currently."); + org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos); + return flussArray == null ? 
null : new FlussArrayAsPaimonArray(flussArray); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java index c41b28dad1..b17894e409 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java @@ -24,10 +24,12 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.junit.jupiter.api.Test; @@ -140,4 +142,171 @@ void testPrimaryKeyTableRecord() { assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType).getRowKind()) .isEqualTo(RowKind.INSERT); } + + @Test + void testArrayTypeWithIntElements() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType())); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(2); + genericRow.setField(0, 42); + genericRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + assertThat(flussRowAsPaimonRow.getInt(0)).isEqualTo(42); + InternalArray array = flussRowAsPaimonRow.getArray(1); + assertThat(array).isNotNull(); + 
assertThat(array.size()).isEqualTo(5); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.getInt(1)).isEqualTo(2); + assertThat(array.getInt(2)).isEqualTo(3); + assertThat(array.getInt(3)).isEqualTo(4); + assertThat(array.getInt(4)).isEqualTo(5); + } + + @Test + void testArrayTypeWithStringElements() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.VarCharType(), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarCharType())); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(2); + genericRow.setField(0, BinaryString.fromString("name")); + genericRow.setField( + 1, + new GenericArray( + new Object[] { + BinaryString.fromString("a"), + BinaryString.fromString("b"), + BinaryString.fromString("c") + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + assertThat(flussRowAsPaimonRow.getString(0).toString()).isEqualTo("name"); + InternalArray array = flussRowAsPaimonRow.getArray(1); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getString(0).toString()).isEqualTo("a"); + assertThat(array.getString(1).toString()).isEqualTo("b"); + assertThat(array.getString(2).toString()).isEqualTo("c"); + } + + @Test + void testArrayTypeWithNullableElements() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType().nullable())); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, new GenericArray(new Object[] {1, null, 3})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + 
+ InternalArray array = flussRowAsPaimonRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.isNullAt(1)).isTrue(); + assertThat(array.getInt(2)).isEqualTo(3); + } + + @Test + void testNullArray() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType(new org.apache.paimon.types.IntType()) + .nullable()); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, null); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + assertThat(flussRowAsPaimonRow.isNullAt(0)).isTrue(); + } + + @Test + void testNestedArrayType() { + // Test ARRAY<ARRAY<INT>> + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()))); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + InternalArray outerArray = flussRowAsPaimonRow.getArray(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray innerArray1 = outerArray.getArray(0); + assertThat(innerArray1.size()).isEqualTo(2); + assertThat(innerArray1.getInt(0)).isEqualTo(1); + assertThat(innerArray1.getInt(1)).isEqualTo(2); + + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.size()).isEqualTo(3); + 
assertThat(innerArray2.getInt(0)).isEqualTo(3); + assertThat(innerArray2.getInt(1)).isEqualTo(4); + assertThat(innerArray2.getInt(2)).isEqualTo(5); + } + + @Test + void testEmptyArray() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType())); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, new GenericArray(new int[] {})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + InternalArray array = flussRowAsPaimonRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(0); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java index e6aa2ce84b..b2d17f613d 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java @@ -21,10 +21,12 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.Timestamp; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -163,4 +165,121 @@ void testPrimaryKeyTableRecord() { flussRecordAsPaimonRow.setFlussRecord(logRecord); assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.DELETE); } + + 
@Test + void testArrayTypeWithIntElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 10; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(2); + genericRow.setField(0, 42); + genericRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + assertThat(flussRecordAsPaimonRow.getInt(0)).isEqualTo(42); + InternalArray array = flussRecordAsPaimonRow.getArray(1); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(5); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.getInt(1)).isEqualTo(2); + assertThat(array.getInt(2)).isEqualTo(3); + assertThat(array.getInt(3)).isEqualTo(4); + assertThat(array.getInt(4)).isEqualTo(5); + + // Verify system columns are still accessible + assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket); + assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset); + assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp); + } + + @Test + void testArrayTypeWithStringElements() { + int tableBucket = 1; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarCharType()), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + 
FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 5; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + BinaryString.fromString("hello"), BinaryString.fromString("world") + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, INSERT, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getString(0).toString()).isEqualTo("hello"); + assertThat(array.getString(1).toString()).isEqualTo("world"); + } + + @Test + void testNestedArrayType() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType())), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray outerArray = flussRecordAsPaimonRow.getArray(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray innerArray1 = outerArray.getArray(0); + assertThat(innerArray1.size()).isEqualTo(2); + 
assertThat(innerArray1.getInt(0)).isEqualTo(1); + assertThat(innerArray1.getInt(1)).isEqualTo(2); + + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.size()).isEqualTo(3); + assertThat(innerArray2.getInt(0)).isEqualTo(3); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java new file mode 100644 index 0000000000..598b46723e --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.row.InternalArray; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonRowAsFlussRow}. 
*/ +class PaimonRowAsFlussRowTest { + + @Test + void testArrayTypeWithIntElements() { + // Create a Paimon row with INT and ARRAY<INT> columns + // Note: PaimonRowAsFlussRow expects system columns at the end, so we add 3 dummy system + // columns + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField(0, 42); + paimonRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); + // System columns: __bucket, __offset, __timestamp + paimonRow.setField(2, 0); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + assertThat(flussRow.getInt(0)).isEqualTo(42); + InternalArray array = flussRow.getArray(1); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(5); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.getInt(1)).isEqualTo(2); + assertThat(array.getInt(2)).isEqualTo(3); + assertThat(array.getInt(3)).isEqualTo(4); + assertThat(array.getInt(4)).isEqualTo(5); + } + + @Test + void testArrayTypeWithStringElements() { + GenericRow paimonRow = new GenericRow(5); + paimonRow.setField(0, BinaryString.fromString("name")); + paimonRow.setField( + 1, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("a"), + BinaryString.fromString("b"), + BinaryString.fromString("c") + })); + // System columns: __bucket, __offset, __timestamp + paimonRow.setField(2, 0); + paimonRow.setField(3, 0L); + paimonRow.setField(4, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + assertThat(flussRow.getString(0).toString()).isEqualTo("name"); + InternalArray array = flussRow.getArray(1); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getString(0).toString()).isEqualTo("a"); + assertThat(array.getString(1).toString()).isEqualTo("b"); + assertThat(array.getString(2).toString()).isEqualTo("c"); + } + + @Test + void testArrayTypeWithNullableElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField(0, new 
GenericArray(new Object[] {1, null, 3})); + // System columns: __bucket, __offset, __timestamp + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.isNullAt(1)).isTrue(); + assertThat(array.getInt(2)).isEqualTo(3); + } + + @Test + void testNullArray() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField(0, null); + // System columns: __bucket, __offset, __timestamp + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + assertThat(flussRow.isNullAt(0)).isTrue(); + } + + @Test + void testNestedArrayType() { + // Test ARRAY<ARRAY<INT>> + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField( + 0, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + // System columns: __bucket, __offset, __timestamp + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray outerArray = flussRow.getArray(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + InternalArray innerArray1 = outerArray.getArray(0); + assertThat(innerArray1.size()).isEqualTo(2); + assertThat(innerArray1.getInt(0)).isEqualTo(1); + assertThat(innerArray1.getInt(1)).isEqualTo(2); + + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.size()).isEqualTo(3); + assertThat(innerArray2.getInt(0)).isEqualTo(3); + assertThat(innerArray2.getInt(1)).isEqualTo(4); + assertThat(innerArray2.getInt(2)).isEqualTo(5); + } + + @Test + void testEmptyArray() { + GenericRow paimonRow = 
new GenericRow(4); + paimonRow.setField(0, new GenericArray(new int[] {})); + // System columns: __bucket, __offset, __timestamp + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(0); + } + + @Test + void testReplaceRow() { + GenericRow paimonRow1 = new GenericRow(4); + paimonRow1.setField(0, new GenericArray(new int[] {1, 2, 3})); + paimonRow1.setField(1, 0); + paimonRow1.setField(2, 0L); + paimonRow1.setField(3, 0L); + + GenericRow paimonRow2 = new GenericRow(4); + paimonRow2.setField(0, new GenericArray(new int[] {4, 5})); + paimonRow2.setField(1, 0); + paimonRow2.setField(2, 0L); + paimonRow2.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow1); + assertThat(flussRow.getArray(0).size()).isEqualTo(3); + + flussRow.replaceRow(paimonRow2); + assertThat(flussRow.getArray(0).size()).isEqualTo(2); + } + + @Test + void testArrayWithLongElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField(0, new GenericArray(new long[] {100L, 200L, 300L})); + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getLong(0)).isEqualTo(100L); + assertThat(array.getLong(1)).isEqualTo(200L); + assertThat(array.getLong(2)).isEqualTo(300L); + } + + @Test + void testArrayWithDoubleElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField(0, new GenericArray(new double[] {1.1, 2.2, 3.3})); + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + 
InternalArray array = flussRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getDouble(0)).isEqualTo(1.1); + assertThat(array.getDouble(1)).isEqualTo(2.2); + assertThat(array.getDouble(2)).isEqualTo(3.3); + } + + @Test + void testArrayWithBooleanElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getBoolean(0)).isTrue(); + assertThat(array.getBoolean(1)).isFalse(); + assertThat(array.getBoolean(2)).isTrue(); + } +} From 51a1a8f858bfe047f5f200088c1fef0341cf42ce Mon Sep 17 00:00:00 2001 From: binary-signal Date: Sat, 13 Dec 2025 18:06:03 +0100 Subject: [PATCH 2/4] add more tests with array primitives Signed-off-by: binary-signal --- .../source/FlussRowAsPaimonRowTest.java | 175 ++++++++++++++++++ .../paimon/utils/PaimonRowAsFlussRowTest.java | 142 ++++++++++++++ 2 files changed, 317 insertions(+) diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java index b17894e409..e8a89d8aa8 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java @@ -309,4 +309,179 @@ void testEmptyArray() { assertThat(array).isNotNull(); assertThat(array.size()).isEqualTo(0); } + + @Test + void testArrayWithAllPrimitiveTypes() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new 
org.apache.paimon.types.BooleanType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TinyIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.SmallIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BigIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.FloatType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DoubleType())); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(7); + genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + // Test boolean array + InternalArray boolArray = flussRowAsPaimonRow.getArray(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.getBoolean(0)).isTrue(); + assertThat(boolArray.getBoolean(1)).isFalse(); + assertThat(boolArray.getBoolean(2)).isTrue(); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); + + // Test byte array + InternalArray byteArray = flussRowAsPaimonRow.getArray(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.getByte(0)).isEqualTo((byte) 1); + assertThat(byteArray.getByte(1)).isEqualTo((byte) 2); 
+ assertThat(byteArray.getByte(2)).isEqualTo((byte) 3); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array + InternalArray shortArray = flussRowAsPaimonRow.getArray(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.getShort(0)).isEqualTo((short) 100); + assertThat(shortArray.getShort(1)).isEqualTo((short) 200); + assertThat(shortArray.getShort(2)).isEqualTo((short) 300); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); + + // Test int array + InternalArray intArray = flussRowAsPaimonRow.getArray(3); + assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.getInt(0)).isEqualTo(1000); + assertThat(intArray.getInt(1)).isEqualTo(2000); + assertThat(intArray.getInt(2)).isEqualTo(3000); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); + + // Test long array + InternalArray longArray = flussRowAsPaimonRow.getArray(4); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.getLong(0)).isEqualTo(10000L); + assertThat(longArray.getLong(1)).isEqualTo(20000L); + assertThat(longArray.getLong(2)).isEqualTo(30000L); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); + + // Test float array + InternalArray floatArray = flussRowAsPaimonRow.getArray(5); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.getFloat(0)).isEqualTo(1.1f); + assertThat(floatArray.getFloat(1)).isEqualTo(2.2f); + assertThat(floatArray.getFloat(2)).isEqualTo(3.3f); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); + + // Test double array + InternalArray doubleArray = flussRowAsPaimonRow.getArray(6); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.getDouble(0)).isEqualTo(1.11); + assertThat(doubleArray.getDouble(1)).isEqualTo(2.22); + assertThat(doubleArray.getDouble(2)).isEqualTo(3.33); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] 
{1.11, 2.22, 3.33}); + } + + @Test + void testArrayWithDecimalElements() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DecimalType(10, 2))); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + InternalArray array = flussRowAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getDecimal(0, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("123.45")); + assertThat(array.getDecimal(1, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("678.90")); + } + + @Test + void testArrayWithTimestampElements() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TimestampType(3))); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + TimestampNtz.fromMillis(1698235273182L), + TimestampNtz.fromMillis(1698235274000L) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + InternalArray array = flussRowAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(array.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + } + + @Test + void testArrayWithBinaryElements() { + RowType tableRowType = + 
RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarBinaryType())); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7} + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + InternalArray array = flussRowAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getBinary(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(array.getBinary(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java index 598b46723e..20d34f83d0 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java @@ -239,4 +239,146 @@ void testArrayWithBooleanElements() { assertThat(array.getBoolean(1)).isFalse(); assertThat(array.getBoolean(2)).isTrue(); } + + @Test + void testArrayWithAllPrimitiveTypes() { + GenericRow paimonRow = new GenericRow(10); + paimonRow.setField(0, new GenericArray(new boolean[] {true, false})); + paimonRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + paimonRow.setField(2, new GenericArray(new short[] {100, 200})); + paimonRow.setField(3, new GenericArray(new int[] {1000, 2000})); + paimonRow.setField(4, new GenericArray(new long[] {10000L, 20000L})); + paimonRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f})); + paimonRow.setField(6, new GenericArray(new double[] {1.11, 2.22})); + // 
System columns + paimonRow.setField(7, 0); + paimonRow.setField(8, 0L); + paimonRow.setField(9, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + // Test boolean array with toArray + InternalArray boolArray = flussRow.getArray(0); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false}); + + // Test byte array with toArray + InternalArray byteArray = flussRow.getArray(1); + assertThat(byteArray.getByte(0)).isEqualTo((byte) 1); + assertThat(byteArray.getByte(1)).isEqualTo((byte) 2); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array with toArray + InternalArray shortArray = flussRow.getArray(2); + assertThat(shortArray.getShort(0)).isEqualTo((short) 100); + assertThat(shortArray.getShort(1)).isEqualTo((short) 200); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200}); + + // Test int array with toArray + InternalArray intArray = flussRow.getArray(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000}); + + // Test long array with toArray + InternalArray longArray = flussRow.getArray(4); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L}); + + // Test float array with toArray + InternalArray floatArray = flussRow.getArray(5); + assertThat(floatArray.getFloat(0)).isEqualTo(1.1f); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f}); + + // Test double array with toArray + InternalArray doubleArray = flussRow.getArray(6); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22}); + } + + @Test + void testArrayWithDecimalElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField( + 0, + new GenericArray( + new Object[] { + org.apache.paimon.data.Decimal.fromBigDecimal( + new java.math.BigDecimal("123.45"), 10, 2), + org.apache.paimon.data.Decimal.fromBigDecimal( + new java.math.BigDecimal("678.90"), 10, 2) + })); + paimonRow.setField(1, 0); + 
paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new java.math.BigDecimal("123.45")); + assertThat(array.getDecimal(1, 10, 2).toBigDecimal()) + .isEqualTo(new java.math.BigDecimal("678.90")); + } + + @Test + void testArrayWithTimestampElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField( + 0, + new GenericArray( + new Object[] { + org.apache.paimon.data.Timestamp.fromEpochMillis(1698235273182L), + org.apache.paimon.data.Timestamp.fromEpochMillis(1698235274000L) + })); + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getTimestampNtz(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(array.getTimestampLtz(1, 3).getEpochMillisecond()).isEqualTo(1698235274000L); + } + + @Test + void testArrayWithBinaryElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField( + 0, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7} + })); + paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getBinary(0, 3)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(array.getBytes(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + } + + @Test + void testArrayWithCharElements() { + GenericRow paimonRow = new GenericRow(4); + paimonRow.setField( + 0, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("abc"), BinaryString.fromString("def") + })); + 
paimonRow.setField(1, 0); + paimonRow.setField(2, 0L); + paimonRow.setField(3, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + InternalArray array = flussRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getChar(0, 3).toString()).isEqualTo("abc"); + assertThat(array.getString(1).toString()).isEqualTo("def"); + } } From cd7f61507a784bd1ec075f81bf472000c7fc05ff Mon Sep 17 00:00:00 2001 From: binary-signal Date: Sat, 13 Dec 2025 18:26:39 +0100 Subject: [PATCH 3/4] more tests for fluss record paimon rows in tiering Signed-off-by: binary-signal --- .../tiering/FlussRecordAsPaimonRowTest.java | 263 ++++++++++++++++++ 1 file changed, 263 insertions(+) diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java index b2d17f613d..79b548966c 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java @@ -282,4 +282,267 @@ void testNestedArrayType() { assertThat(innerArray2.size()).isEqualTo(3); assertThat(innerArray2.getInt(0)).isEqualTo(3); } + + @Test + void testArrayWithAllPrimitiveTypes() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BooleanType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TinyIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.SmallIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BigIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.FloatType()), 
+ new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DoubleType()), + // system columns: __bucket, __offset, __timestamp + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(7); + genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + // Test boolean array + InternalArray boolArray = flussRecordAsPaimonRow.getArray(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.getBoolean(0)).isTrue(); + assertThat(boolArray.getBoolean(1)).isFalse(); + assertThat(boolArray.getBoolean(2)).isTrue(); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); + + // Test byte array + InternalArray byteArray = flussRecordAsPaimonRow.getArray(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.getByte(0)).isEqualTo((byte) 1); + assertThat(byteArray.getByte(1)).isEqualTo((byte) 2); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array + InternalArray shortArray = flussRecordAsPaimonRow.getArray(2); + assertThat(shortArray.size()).isEqualTo(3); + 
assertThat(shortArray.getShort(0)).isEqualTo((short) 100); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); + + // Test int array + InternalArray intArray = flussRecordAsPaimonRow.getArray(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); + + // Test long array + InternalArray longArray = flussRecordAsPaimonRow.getArray(4); + assertThat(longArray.getLong(0)).isEqualTo(10000L); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); + + // Test float array + InternalArray floatArray = flussRecordAsPaimonRow.getArray(5); + assertThat(floatArray.getFloat(0)).isEqualTo(1.1f); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); + + // Test double array + InternalArray doubleArray = flussRecordAsPaimonRow.getArray(6); + assertThat(doubleArray.getDouble(0)).isEqualTo(1.11); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); + + // Verify system columns + assertThat(flussRecordAsPaimonRow.getInt(7)).isEqualTo(tableBucket); + assertThat(flussRecordAsPaimonRow.getLong(8)).isEqualTo(logOffset); + } + + @Test + void testArrayWithDecimalElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DecimalType(10, 2)), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) + })); + + LogRecord logRecord = new GenericRecord(logOffset, 
timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getDecimal(0, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("123.45")); + assertThat(array.getDecimal(1, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("678.90")); + } + + @Test + void testArrayWithTimestampElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TimestampType(3)), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + TimestampNtz.fromMillis(1698235273182L), + TimestampNtz.fromMillis(1698235274000L) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(array.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + } + + @Test + void testArrayWithBinaryElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarBinaryType()), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new 
FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, new GenericArray(new Object[] {new byte[] {1, 2, 3}, new byte[] {4, 5, 6, 7}})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.getBinary(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(array.getBinary(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + } + + @Test + void testNullArray() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType(new org.apache.paimon.types.IntType()) + .nullable(), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, null); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + assertThat(flussRecordAsPaimonRow.isNullAt(0)).isTrue(); + } + + @Test + void testArrayWithNullableElements() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType().nullable()), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = 
System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, new GenericArray(new Object[] {1, null, 3})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getInt(0)).isEqualTo(1); + assertThat(array.isNullAt(1)).isTrue(); + assertThat(array.getInt(2)).isEqualTo(3); + } + + @Test + void testEmptyArray() { + int tableBucket = 0; + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, new GenericArray(new int[] {})); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + InternalArray array = flussRecordAsPaimonRow.getArray(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(0); + } } From 245e87d620d1e7ff4dbdf13c078e6a05105593fc Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 25 Dec 2025 11:54:55 +0800 Subject: [PATCH 4/4] improve test --- .../source/FlussArrayAsPaimonArray.java | 50 +- .../paimon/source/FlussRowAsPaimonRow.java | 7 +- .../FlinkUnionReadPrimaryKeyTableITCase.java | 35 +- .../source/FlussRowAsPaimonRowTest.java | 395 +++++---------- .../paimon/utils/PaimonRowAsFlussRowTest.java | 454 ++++++------------ 5 files changed, 331 insertions(+), 610 deletions(-) diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java index 83fac62551..37438582ba 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java @@ -17,6 +17,7 @@ package org.apache.fluss.lake.paimon.source; +import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.paimon.data.BinaryString; @@ -26,14 +27,19 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; /** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */ public class FlussArrayAsPaimonArray implements InternalArray { private final org.apache.fluss.row.InternalArray flussArray; + private final DataType elementType; - public FlussArrayAsPaimonArray(org.apache.fluss.row.InternalArray flussArray) { + public FlussArrayAsPaimonArray( + org.apache.fluss.row.InternalArray flussArray, DataType elementType) { this.flussArray = flussArray; + this.elementType = elementType; } @Override @@ -99,13 +105,36 @@ public Decimal getDecimal(int pos, int precision, int scale) { @Override public Timestamp getTimestamp(int pos, int precision) { // Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays - if (TimestampNtz.isCompact(precision)) { - return Timestamp.fromEpochMillis( - flussArray.getTimestampNtz(pos, precision).getMillisecond()); - } else { - TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision); - return Timestamp.fromEpochMillis( - timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond()); + switch 
(elementType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (TimestampNtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + flussArray.getTimestampNtz(pos, precision).getMillisecond()); + } else { + TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond()); + } + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (TimestampLtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + flussArray.getTimestampLtz(pos, precision).getEpochMillisecond()); + } else { + TimestampLtz timestampLtz = flussArray.getTimestampLtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampLtz.getEpochMillisecond(), + timestampLtz.getNanoOfMillisecond()); + } + default: + throw new UnsupportedOperationException( + "Unsupported array element type for getTimestamp. " + + "Only TIMESTAMP_WITHOUT_TIME_ZONE and " + + "TIMESTAMP_WITH_LOCAL_TIME_ZONE are supported, but got: " + + elementType.getTypeRoot() + + " (" + + elementType + + ")."); } } @@ -123,7 +152,10 @@ public Variant getVariant(int pos) { @Override public InternalArray getArray(int pos) { org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos); - return innerArray == null ? null : new FlussArrayAsPaimonArray(innerArray); + return innerArray == null + ? 
null + : new FlussArrayAsPaimonArray( + innerArray, ((ArrayType) elementType).getElementType()); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java index 21fabe0636..e2df08cc3d 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -160,7 +161,11 @@ public Variant getVariant(int i) { @Override public InternalArray getArray(int pos) { org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos); - return flussArray == null ? null : new FlussArrayAsPaimonArray(flussArray); + return flussArray == null + ? 
null + : new FlussArrayAsPaimonArray( + flussArray, + ((ArrayType) tableRowType.getField(pos).type()).getElementType()); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java index 7c8c2d698d..df57974bff 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -27,6 +27,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; @@ -132,7 +133,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { String partitionName = isPartitioned ? 
waitUntilPartitions(t1).values().iterator().next() : null; if (partitionName != null) { - queryFilterStr = queryFilterStr + " and c16= '" + partitionName + "'"; + queryFilterStr = queryFilterStr + " and c17= '" + partitionName + "'"; } List expectedRows = new ArrayList<>(); @@ -155,6 +156,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 0), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( @@ -174,6 +176,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); } } else { @@ -195,6 +198,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 0), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -212,6 +216,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); } tableResult = @@ -225,7 +230,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { row -> { boolean isMatch = row.getField(3).equals(30); if (partitionName != null) { - isMatch = isMatch && row.getField(15).equals(partitionName); + isMatch = isMatch && row.getField(16).equals(partitionName); } return isMatch; }) @@ -300,6 +305,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( @@ -319,6 +325,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception 
{ TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, partition)); } } else { @@ -340,6 +347,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -357,6 +365,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, null)); } @@ -473,6 +482,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( Row.of( @@ -491,6 +501,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); } } else { @@ -512,6 +523,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -529,6 +541,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); } @@ -571,6 +584,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); 
expectedRows2.add( Row.ofKind( @@ -590,6 +604,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, partition)); } } else { @@ -611,6 +626,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); expectedRows2.add( Row.ofKind( @@ -630,6 +646,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, null)); } @@ -962,21 +979,21 @@ private String buildExpectedResult(boolean isPartitioned, int record1, int recor + "2023-10-25T12:01:13.182005Z, " + "2023-10-25T12:01:13.183, " + "2023-10-25T12:01:13.183006, " - + "[1, 2, 3, 4], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); records.add( "+I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, " + "2023-10-25T12:01:13.200Z, " + "2023-10-25T12:01:13.200005Z, " + "2023-10-25T12:01:13.201, " + "2023-10-25T12:01:13.201006, " - + "[1, 2, 3, 4], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); records.add( "+I[true, 100, 200, 30, 400, 500.1, 600.0, another_string_2, 9.00, 1000, " + "2023-10-25T12:01:13.400Z, " + "2023-10-25T12:01:13.400007Z, " + "2023-10-25T12:01:13.501, " + "2023-10-25T12:01:13.501008, " - + "[5, 6, 7, 8], %s]"); + + "[5, 6, 7, 8], [2.1, 2.2, 2.3], %s]"); if (isPartitioned) { return String.format( @@ -1012,9 +1029,10 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean .column("c13", DataTypes.TIMESTAMP(3)) .column("c14", DataTypes.TIMESTAMP(6)) .column("c15", DataTypes.BINARY(4)) - .column("c16", DataTypes.STRING()); + .column("c16", 
DataTypes.ARRAY(DataTypes.FLOAT())) + .column("c17", DataTypes.STRING()); - return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c16"); + return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c17"); } protected long createSimplePkTable( @@ -1079,6 +1097,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new GenericArray(new float[] {2.1f, 2.2f, 2.3f}), partition)); writeRows(tablePath, rows, false); } @@ -1101,6 +1120,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition), row( true, @@ -1118,6 +1138,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition)); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java index e8a89d8aa8..fd9bfdeb23 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java @@ -144,174 +144,7 @@ void testPrimaryKeyTableRecord() { } @Test - void testArrayTypeWithIntElements() { - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.IntType(), - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.IntType())); - - long logOffset = 0; - long timeStamp = 
System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(2); - genericRow.setField(0, 42); - genericRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); - - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - assertThat(flussRowAsPaimonRow.getInt(0)).isEqualTo(42); - InternalArray array = flussRowAsPaimonRow.getArray(1); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(5); - assertThat(array.getInt(0)).isEqualTo(1); - assertThat(array.getInt(1)).isEqualTo(2); - assertThat(array.getInt(2)).isEqualTo(3); - assertThat(array.getInt(3)).isEqualTo(4); - assertThat(array.getInt(4)).isEqualTo(5); - } - - @Test - void testArrayTypeWithStringElements() { - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.VarCharType(), - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.VarCharType())); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(2); - genericRow.setField(0, BinaryString.fromString("name")); - genericRow.setField( - 1, - new GenericArray( - new Object[] { - BinaryString.fromString("a"), - BinaryString.fromString("b"), - BinaryString.fromString("c") - })); - - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - assertThat(flussRowAsPaimonRow.getString(0).toString()).isEqualTo("name"); - InternalArray array = flussRowAsPaimonRow.getArray(1); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(3); - assertThat(array.getString(0).toString()).isEqualTo("a"); - assertThat(array.getString(1).toString()).isEqualTo("b"); - assertThat(array.getString(2).toString()).isEqualTo("c"); - } - - @Test - void testArrayTypeWithNullableElements() { 
- RowType tableRowType = - RowType.of( - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.IntType().nullable())); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(1); - genericRow.setField(0, new GenericArray(new Object[] {1, null, 3})); - - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - InternalArray array = flussRowAsPaimonRow.getArray(0); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(3); - assertThat(array.getInt(0)).isEqualTo(1); - assertThat(array.isNullAt(1)).isTrue(); - assertThat(array.getInt(2)).isEqualTo(3); - } - - @Test - void testNullArray() { - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.ArrayType(new org.apache.paimon.types.IntType()) - .nullable()); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(1); - genericRow.setField(0, null); - - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - assertThat(flussRowAsPaimonRow.isNullAt(0)).isTrue(); - } - - @Test - void testNestedArrayType() { - // Test ARRAY> - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.IntType()))); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(1); - genericRow.setField( - 0, - new GenericArray( - new Object[] { - new GenericArray(new int[] {1, 2}), - new GenericArray(new int[] {3, 4, 5}) - })); - - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new 
FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - InternalArray outerArray = flussRowAsPaimonRow.getArray(0); - assertThat(outerArray).isNotNull(); - assertThat(outerArray.size()).isEqualTo(2); - - InternalArray innerArray1 = outerArray.getArray(0); - assertThat(innerArray1.size()).isEqualTo(2); - assertThat(innerArray1.getInt(0)).isEqualTo(1); - assertThat(innerArray1.getInt(1)).isEqualTo(2); - - InternalArray innerArray2 = outerArray.getArray(1); - assertThat(innerArray2.size()).isEqualTo(3); - assertThat(innerArray2.getInt(0)).isEqualTo(3); - assertThat(innerArray2.getInt(1)).isEqualTo(4); - assertThat(innerArray2.getInt(2)).isEqualTo(5); - } - - @Test - void testEmptyArray() { - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.IntType())); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(1); - genericRow.setField(0, new GenericArray(new int[] {})); - - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - InternalArray array = flussRowAsPaimonRow.getArray(0); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(0); - } - - @Test - void testArrayWithAllPrimitiveTypes() { + void testArrayWithAllTypes() { RowType tableRowType = RowType.of( new org.apache.paimon.types.ArrayType( @@ -327,18 +160,90 @@ void testArrayWithAllPrimitiveTypes() { new org.apache.paimon.types.ArrayType( new org.apache.paimon.types.FloatType()), new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.DoubleType())); + new org.apache.paimon.types.DoubleType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarCharType(true, 30)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DecimalType(10, 2)), + new org.apache.paimon.types.ArrayType( + new 
org.apache.paimon.types.TimestampType(3)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.LocalZonedTimestampType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarBinaryType()), + // array> + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()))); long logOffset = 0; long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(7); + GenericRow genericRow = new GenericRow(13); genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); - genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + genericRow.setField(3, new GenericArray(new Object[] {1000, 2000, 3000})); genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + // String type + genericRow.setField( + 7, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + BinaryString.fromString("test") + })); + + // Decimal type + genericRow.setField( + 8, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("999.99"), 10, 2) + })); + + // Timestamp type + genericRow.setField( + 9, + new GenericArray( + new Object[] { + TimestampNtz.fromMillis(1698235273182L), + TimestampNtz.fromMillis(1698235274000L), + TimestampNtz.fromMillis(1698235275000L) + })); + + // TimestampLTZ type + genericRow.setField( + 10, + new GenericArray( + new Object[] { + TimestampLtz.fromEpochMillis(1698235273182L), + TimestampLtz.fromEpochMillis(1698235274000L), + 
TimestampLtz.fromEpochMillis(1698235275000L) + })); + + // Binary type + genericRow.setField( + 11, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7}, + new byte[] {8, 9, 10, 11, 12} + })); + + // array> type + genericRow.setField( + 12, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); FlussRowAsPaimonRow flussRowAsPaimonRow = @@ -347,141 +252,85 @@ void testArrayWithAllPrimitiveTypes() { // Test boolean array InternalArray boolArray = flussRowAsPaimonRow.getArray(0); assertThat(boolArray.size()).isEqualTo(3); - assertThat(boolArray.getBoolean(0)).isTrue(); - assertThat(boolArray.getBoolean(1)).isFalse(); - assertThat(boolArray.getBoolean(2)).isTrue(); assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); // Test byte array InternalArray byteArray = flussRowAsPaimonRow.getArray(1); assertThat(byteArray.size()).isEqualTo(3); - assertThat(byteArray.getByte(0)).isEqualTo((byte) 1); - assertThat(byteArray.getByte(1)).isEqualTo((byte) 2); - assertThat(byteArray.getByte(2)).isEqualTo((byte) 3); assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); // Test short array InternalArray shortArray = flussRowAsPaimonRow.getArray(2); assertThat(shortArray.size()).isEqualTo(3); - assertThat(shortArray.getShort(0)).isEqualTo((short) 100); - assertThat(shortArray.getShort(1)).isEqualTo((short) 200); - assertThat(shortArray.getShort(2)).isEqualTo((short) 300); assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); // Test int array InternalArray intArray = flussRowAsPaimonRow.getArray(3); assertThat(intArray.size()).isEqualTo(3); - assertThat(intArray.getInt(0)).isEqualTo(1000); - assertThat(intArray.getInt(1)).isEqualTo(2000); - assertThat(intArray.getInt(2)).isEqualTo(3000); 
assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); // Test long array InternalArray longArray = flussRowAsPaimonRow.getArray(4); assertThat(longArray.size()).isEqualTo(3); - assertThat(longArray.getLong(0)).isEqualTo(10000L); - assertThat(longArray.getLong(1)).isEqualTo(20000L); - assertThat(longArray.getLong(2)).isEqualTo(30000L); assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); // Test float array InternalArray floatArray = flussRowAsPaimonRow.getArray(5); assertThat(floatArray.size()).isEqualTo(3); - assertThat(floatArray.getFloat(0)).isEqualTo(1.1f); - assertThat(floatArray.getFloat(1)).isEqualTo(2.2f); - assertThat(floatArray.getFloat(2)).isEqualTo(3.3f); assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); // Test double array InternalArray doubleArray = flussRowAsPaimonRow.getArray(6); assertThat(doubleArray.size()).isEqualTo(3); - assertThat(doubleArray.getDouble(0)).isEqualTo(1.11); - assertThat(doubleArray.getDouble(1)).isEqualTo(2.22); - assertThat(doubleArray.getDouble(2)).isEqualTo(3.33); assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); - } - - @Test - void testArrayWithDecimalElements() { - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.DecimalType(10, 2))); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(1); - genericRow.setField( - 0, - new GenericArray( - new Object[] { - Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), - Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) - })); - - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - InternalArray array = flussRowAsPaimonRow.getArray(0); - assertThat(array.size()).isEqualTo(2); - 
assertThat(array.getDecimal(0, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("123.45")); - assertThat(array.getDecimal(1, 10, 2).toBigDecimal()).isEqualTo(new BigDecimal("678.90")); - } - - @Test - void testArrayWithTimestampElements() { - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.TimestampType(3))); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(1); - genericRow.setField( - 0, - new GenericArray( - new Object[] { - TimestampNtz.fromMillis(1698235273182L), - TimestampNtz.fromMillis(1698235274000L) - })); - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); - - InternalArray array = flussRowAsPaimonRow.getArray(0); - assertThat(array.size()).isEqualTo(2); - assertThat(array.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); - assertThat(array.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); - } - - @Test - void testArrayWithBinaryElements() { - RowType tableRowType = - RowType.of( - new org.apache.paimon.types.ArrayType( - new org.apache.paimon.types.VarBinaryType())); - - long logOffset = 0; - long timeStamp = System.currentTimeMillis(); - GenericRow genericRow = new GenericRow(1); - genericRow.setField( - 0, - new GenericArray( - new Object[] { - new byte[] {1, 2, 3}, - new byte[] {4, 5, 6, 7} - })); + // Test string array + InternalArray stringArray = flussRowAsPaimonRow.getArray(7); + assertThat(stringArray.size()).isEqualTo(3); + assertThat(stringArray.getString(0).toString()).isEqualTo("hello"); + assertThat(stringArray.getString(1).toString()).isEqualTo("world"); + assertThat(stringArray.getString(2).toString()).isEqualTo("test"); + + // Test decimal array + InternalArray decimalArray = flussRowAsPaimonRow.getArray(8); + 
assertThat(decimalArray.size()).isEqualTo(3); + assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("678.90")); + assertThat(decimalArray.getDecimal(2, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("999.99")); + + // Test timestamp array + InternalArray timestampArray = flussRowAsPaimonRow.getArray(9); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestamp(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // test timestamp_ltz array + timestampArray = flussRowAsPaimonRow.getArray(10); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestamp(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // Test binary array + InternalArray binaryArray = flussRowAsPaimonRow.getArray(11); + assertThat(binaryArray.size()).isEqualTo(3); + assertThat(binaryArray.getBinary(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBinary(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBinary(2)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + + // Also test array> (nested int array) + InternalArray outerArray = flussRowAsPaimonRow.getArray(12); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); - LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); - FlussRowAsPaimonRow flussRowAsPaimonRow = - new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + InternalArray innerArray1 = outerArray.getArray(0); + 
assertThat(innerArray1.toIntArray()).isEqualTo(new int[] {1, 2}); - InternalArray array = flussRowAsPaimonRow.getArray(0); - assertThat(array.size()).isEqualTo(2); - assertThat(array.getBinary(0)).isEqualTo(new byte[] {1, 2, 3}); - assertThat(array.getBinary(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.toIntArray()).isEqualTo(new int[] {3, 4, 5}); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java index 20d34f83d0..3c1e2c5519 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java @@ -20,365 +20,179 @@ import org.apache.fluss.row.InternalArray; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; import org.junit.jupiter.api.Test; +import java.math.BigDecimal; + import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link PaimonRowAsFlussRow}. 
*/ class PaimonRowAsFlussRowTest { @Test - void testArrayTypeWithIntElements() { - // Create a Paimon row with INT and ARRAY columns - // Note: PaimonRowAsFlussRow expects system columns at the end, so we add 3 dummy system - // columns - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField(0, 42); - paimonRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); - // System columns: __bucket, __offset, __timestamp - paimonRow.setField(2, 0); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - assertThat(flussRow.getInt(0)).isEqualTo(42); - InternalArray array = flussRow.getArray(1); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(5); - assertThat(array.getInt(0)).isEqualTo(1); - assertThat(array.getInt(1)).isEqualTo(2); - assertThat(array.getInt(2)).isEqualTo(3); - assertThat(array.getInt(3)).isEqualTo(4); - assertThat(array.getInt(4)).isEqualTo(5); - } - - @Test - void testArrayTypeWithStringElements() { - GenericRow paimonRow = new GenericRow(5); - paimonRow.setField(0, BinaryString.fromString("name")); + void testArrayWithAllTypes() { + GenericRow paimonRow = new GenericRow(16); + // Primitive types + paimonRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + paimonRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + paimonRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + paimonRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + paimonRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + paimonRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + paimonRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + // String type paimonRow.setField( - 1, + 7, new GenericArray( new BinaryString[] { - BinaryString.fromString("a"), - BinaryString.fromString("b"), - BinaryString.fromString("c") + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + 
BinaryString.fromString("test") })); - // System columns: __bucket, __offset, __timestamp - paimonRow.setField(2, 0); - paimonRow.setField(3, 0L); - paimonRow.setField(4, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - assertThat(flussRow.getString(0).toString()).isEqualTo("name"); - InternalArray array = flussRow.getArray(1); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(3); - assertThat(array.getString(0).toString()).isEqualTo("a"); - assertThat(array.getString(1).toString()).isEqualTo("b"); - assertThat(array.getString(2).toString()).isEqualTo("c"); - } - - @Test - void testArrayTypeWithNullableElements() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField(0, new GenericArray(new Object[] {1, null, 3})); - // System columns: __bucket, __offset, __timestamp - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray array = flussRow.getArray(0); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(3); - assertThat(array.getInt(0)).isEqualTo(1); - assertThat(array.isNullAt(1)).isTrue(); - assertThat(array.getInt(2)).isEqualTo(3); - } - - @Test - void testNullArray() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField(0, null); - // System columns: __bucket, __offset, __timestamp - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - assertThat(flussRow.isNullAt(0)).isTrue(); - } - - @Test - void testNestedArrayType() { - // Test ARRAY> - GenericRow paimonRow = new GenericRow(4); + // Decimal type + paimonRow.setField( + 8, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("999.99"), 10, 2) + })); + // 
Timestamp type + paimonRow.setField( + 9, + new GenericArray( + new Object[] { + Timestamp.fromEpochMillis(1698235273182L), + Timestamp.fromEpochMillis(1698235274000L), + Timestamp.fromEpochMillis(1698235275000L) + })); + // TimestampLTZ type + paimonRow.setField( + 10, + new GenericArray( + new Object[] { + Timestamp.fromEpochMillis(1698235273182L), + Timestamp.fromEpochMillis(1698235274000L), + Timestamp.fromEpochMillis(1698235275000L) + })); + // Binary type paimonRow.setField( - 0, + 11, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7}, + new byte[] {8, 9, 10, 11, 12} + })); + // array> type + paimonRow.setField( + 12, new GenericArray( new Object[] { new GenericArray(new int[] {1, 2}), new GenericArray(new int[] {3, 4, 5}) })); // System columns: __bucket, __offset, __timestamp - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray outerArray = flussRow.getArray(0); - assertThat(outerArray).isNotNull(); - assertThat(outerArray.size()).isEqualTo(2); - - InternalArray innerArray1 = outerArray.getArray(0); - assertThat(innerArray1.size()).isEqualTo(2); - assertThat(innerArray1.getInt(0)).isEqualTo(1); - assertThat(innerArray1.getInt(1)).isEqualTo(2); - - InternalArray innerArray2 = outerArray.getArray(1); - assertThat(innerArray2.size()).isEqualTo(3); - assertThat(innerArray2.getInt(0)).isEqualTo(3); - assertThat(innerArray2.getInt(1)).isEqualTo(4); - assertThat(innerArray2.getInt(2)).isEqualTo(5); - } - - @Test - void testEmptyArray() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField(0, new GenericArray(new int[] {})); - // System columns: __bucket, __offset, __timestamp - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray array = flussRow.getArray(0); - 
assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(0); - } - - @Test - void testReplaceRow() { - GenericRow paimonRow1 = new GenericRow(4); - paimonRow1.setField(0, new GenericArray(new int[] {1, 2, 3})); - paimonRow1.setField(1, 0); - paimonRow1.setField(2, 0L); - paimonRow1.setField(3, 0L); - - GenericRow paimonRow2 = new GenericRow(4); - paimonRow2.setField(0, new GenericArray(new int[] {4, 5})); - paimonRow2.setField(1, 0); - paimonRow2.setField(2, 0L); - paimonRow2.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow1); - assertThat(flussRow.getArray(0).size()).isEqualTo(3); - - flussRow.replaceRow(paimonRow2); - assertThat(flussRow.getArray(0).size()).isEqualTo(2); - } - - @Test - void testArrayWithLongElements() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField(0, new GenericArray(new long[] {100L, 200L, 300L})); - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray array = flussRow.getArray(0); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(3); - assertThat(array.getLong(0)).isEqualTo(100L); - assertThat(array.getLong(1)).isEqualTo(200L); - assertThat(array.getLong(2)).isEqualTo(300L); - } - - @Test - void testArrayWithDoubleElements() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField(0, new GenericArray(new double[] {1.1, 2.2, 3.3})); - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray array = flussRow.getArray(0); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(3); - assertThat(array.getDouble(0)).isEqualTo(1.1); - assertThat(array.getDouble(1)).isEqualTo(2.2); - assertThat(array.getDouble(2)).isEqualTo(3.3); - } - - @Test - void testArrayWithBooleanElements() { - GenericRow paimonRow = 
new GenericRow(4); - paimonRow.setField(0, new GenericArray(new boolean[] {true, false, true})); - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); + paimonRow.setField(13, 0); + paimonRow.setField(14, 0L); + paimonRow.setField(15, 0L); PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - InternalArray array = flussRow.getArray(0); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(3); - assertThat(array.getBoolean(0)).isTrue(); - assertThat(array.getBoolean(1)).isFalse(); - assertThat(array.getBoolean(2)).isTrue(); - } - - @Test - void testArrayWithAllPrimitiveTypes() { - GenericRow paimonRow = new GenericRow(10); - paimonRow.setField(0, new GenericArray(new boolean[] {true, false})); - paimonRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); - paimonRow.setField(2, new GenericArray(new short[] {100, 200})); - paimonRow.setField(3, new GenericArray(new int[] {1000, 2000})); - paimonRow.setField(4, new GenericArray(new long[] {10000L, 20000L})); - paimonRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f})); - paimonRow.setField(6, new GenericArray(new double[] {1.11, 2.22})); - // System columns - paimonRow.setField(7, 0); - paimonRow.setField(8, 0L); - paimonRow.setField(9, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - // Test boolean array with toArray + // Test boolean array InternalArray boolArray = flussRow.getArray(0); - assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false}); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); - // Test byte array with toArray + // Test byte array InternalArray byteArray = flussRow.getArray(1); - assertThat(byteArray.getByte(0)).isEqualTo((byte) 1); - assertThat(byteArray.getByte(1)).isEqualTo((byte) 2); + assertThat(byteArray.size()).isEqualTo(3); assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 
3}); - // Test short array with toArray + // Test short array InternalArray shortArray = flussRow.getArray(2); - assertThat(shortArray.getShort(0)).isEqualTo((short) 100); - assertThat(shortArray.getShort(1)).isEqualTo((short) 200); - assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200}); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); - // Test int array with toArray + // Test int array InternalArray intArray = flussRow.getArray(3); - assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000}); + assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); - // Test long array with toArray + // Test long array InternalArray longArray = flussRow.getArray(4); - assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L}); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); - // Test float array with toArray + // Test float array InternalArray floatArray = flussRow.getArray(5); - assertThat(floatArray.getFloat(0)).isEqualTo(1.1f); - assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f}); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); - // Test double array with toArray + // Test double array InternalArray doubleArray = flussRow.getArray(6); - assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22}); - } - - @Test - void testArrayWithDecimalElements() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField( - 0, - new GenericArray( - new Object[] { - org.apache.paimon.data.Decimal.fromBigDecimal( - new java.math.BigDecimal("123.45"), 10, 2), - org.apache.paimon.data.Decimal.fromBigDecimal( - new java.math.BigDecimal("678.90"), 10, 2) - })); - paimonRow.setField(1, 0); - 
paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray array = flussRow.getArray(0); - assertThat(array.size()).isEqualTo(2); - assertThat(array.getDecimal(0, 10, 2).toBigDecimal()) - .isEqualTo(new java.math.BigDecimal("123.45")); - assertThat(array.getDecimal(1, 10, 2).toBigDecimal()) - .isEqualTo(new java.math.BigDecimal("678.90")); - } - - @Test - void testArrayWithTimestampElements() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField( - 0, - new GenericArray( - new Object[] { - org.apache.paimon.data.Timestamp.fromEpochMillis(1698235273182L), - org.apache.paimon.data.Timestamp.fromEpochMillis(1698235274000L) - })); - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray array = flussRow.getArray(0); - assertThat(array.size()).isEqualTo(2); - assertThat(array.getTimestampNtz(0, 3).getMillisecond()).isEqualTo(1698235273182L); - assertThat(array.getTimestampLtz(1, 3).getEpochMillisecond()).isEqualTo(1698235274000L); - } - - @Test - void testArrayWithBinaryElements() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField( - 0, - new GenericArray( - new Object[] { - new byte[] {1, 2, 3}, - new byte[] {4, 5, 6, 7} - })); - paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); - - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); - - InternalArray array = flussRow.getArray(0); - assertThat(array.size()).isEqualTo(2); - assertThat(array.getBinary(0, 3)).isEqualTo(new byte[] {1, 2, 3}); - assertThat(array.getBytes(1)).isEqualTo(new byte[] {4, 5, 6, 7}); - } - - @Test - void testArrayWithCharElements() { - GenericRow paimonRow = new GenericRow(4); - paimonRow.setField( - 0, - new GenericArray( - new BinaryString[] { - BinaryString.fromString("abc"), BinaryString.fromString("def") - })); - 
paimonRow.setField(1, 0); - paimonRow.setField(2, 0L); - paimonRow.setField(3, 0L); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); + + // Test string array + InternalArray stringArray = flussRow.getArray(7); + assertThat(stringArray.size()).isEqualTo(3); + assertThat(stringArray.getString(0).toString()).isEqualTo("hello"); + assertThat(stringArray.getString(1).toString()).isEqualTo("world"); + assertThat(stringArray.getString(2).toString()).isEqualTo("test"); + + // Test decimal array + InternalArray decimalArray = flussRow.getArray(8); + assertThat(decimalArray.size()).isEqualTo(3); + assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("678.90")); + assertThat(decimalArray.getDecimal(2, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("999.99")); + + // Test timestamp array + InternalArray timestampArray = flussRow.getArray(9); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestampNtz(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestampNtz(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestampNtz(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // Test timestamp_ltz array + InternalArray timestampLtzArray = flussRow.getArray(10); + assertThat(timestampLtzArray.size()).isEqualTo(3); + assertThat(timestampLtzArray.getTimestampLtz(0, 3).getEpochMillisecond()) + .isEqualTo(1698235273182L); + assertThat(timestampLtzArray.getTimestampLtz(1, 3).getEpochMillisecond()) + .isEqualTo(1698235274000L); + assertThat(timestampLtzArray.getTimestampLtz(2, 3).getEpochMillisecond()) + .isEqualTo(1698235275000L); + + // Test binary array + InternalArray binaryArray = flussRow.getArray(11); + assertThat(binaryArray.size()).isEqualTo(3); + 
assertThat(binaryArray.getBinary(0, 3)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBinary(1, 4)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBinary(2, 5)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + // Also test getBytes method + assertThat(binaryArray.getBytes(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBytes(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBytes(2)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + + // Also test array> (nested int array) + InternalArray outerArray = flussRow.getArray(12); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); - PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + InternalArray innerArray1 = outerArray.getArray(0); + assertThat(innerArray1.toIntArray()).isEqualTo(new int[] {1, 2}); - InternalArray array = flussRow.getArray(0); - assertThat(array.size()).isEqualTo(2); - assertThat(array.getChar(0, 3).toString()).isEqualTo("abc"); - assertThat(array.getString(1).toString()).isEqualTo("def"); + InternalArray innerArray2 = outerArray.getArray(1); + assertThat(innerArray2.toIntArray()).isEqualTo(new int[] {3, 4, 5}); } }