From 3391ec3e7836b64e1d99185830418e260bf0bc35 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 7 Jan 2026 14:54:31 +0800 Subject: [PATCH 1/3] fix the bug that chunk metadata mask is lost when deserializing from temporary .meta file --- .../java/org/apache/tsfile/write/writer/TsFileIOWriter.java | 2 ++ .../tsfile/write/writer/tsmiterator/DiskTSMIterator.java | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index cc59fd20d..08dc1fba9 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -707,6 +707,8 @@ private int writeChunkMetadataToTempFile( writtenSize += ReadWriteIOUtils.write( iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream()); + writtenSize += + ReadWriteIOUtils.write(iChunkMetadataList.get(0).getMask(), tempOutput.wrapAsStream()); } PublicBAOS buffer = new PublicBAOS(); int totalSize = 0; diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/DiskTSMIterator.java index 32a32340c..6bcbd4c61 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/DiskTSMIterator.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/DiskTSMIterator.java @@ -102,6 +102,7 @@ private Pair getTimeSerisMetadataFromFile() throws IOE String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream()); byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream()); TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte); + byte mask = ReadWriteIOUtils.readByte(input.wrapAsInputStream()); int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream()); ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize); int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer); @@ -115,7 +116,9 @@ private Pair getTimeSerisMetadataFromFile() throws IOE // deserialize chunk metadata from chunk buffer List chunkMetadataList = new ArrayList<>(); while (chunkBuffer.hasRemaining()) { - chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType)); + ChunkMetadata chunkMetadata = ChunkMetadata.deserializeFrom(chunkBuffer, dataType); + chunkMetadata.setMask(mask); + chunkMetadataList.add(chunkMetadata); } updateCurrentPos(); return new Pair<>( From 27254df4820aba386b6b7e65e8f697006c39057e Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 7 Jan 2026 18:38:18 +0800 Subject: [PATCH 2/3] add ut --- ...ileIOWriterFlushTempChunkMetadataTest.java | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java new file mode 100644 index 000000000..98ebe2e6a --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.constant.TestConstant; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; + +public class TsFileIOWriterFlushTempChunkMetadataTest { + + private static final String FILE_PATH1 = + TestConstant.BASE_OUTPUT_PATH.concat("TsFileIOWriterFlushTempChunkMetadataTest1.tsfile"); + private static final String FILE_PATH2 = + TestConstant.BASE_OUTPUT_PATH.concat("TsFileIOWriterFlushTempChunkMetadataTest2.tsfile"); + + @After + public void teardown() { + new File(FILE_PATH1).delete(); + new File(FILE_PATH2).delete(); + } + + @Test + public void testAligned() throws IOException, WriteProcessException { + TableSchema tableSchema = + new TableSchema( + "t1", + Arrays.asList( + new MeasurementSchema("device", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)), + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet1 = + new Tablet( + tableSchema.getTableName(), + Arrays.asList("device", "s1", "s2", "s3"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32, TSDataType.INT32, TSDataType.INT32), + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + for (int i = 0; i < 1000; i++) { + tablet1.addTimestamp(i, i); + tablet1.addValue("device", i, "d" + i); + tablet1.addValue("s1", i, 0); + tablet1.addValue("s2", i, 0); + tablet1.addValue("s3", i, 0); + } + try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH1), 1)) { + TsFileWriter tsFileWriter = new TsFileWriter(writer); + tsFileWriter.registerTableSchema(tableSchema); + tsFileWriter.writeTable(tablet1); + tsFileWriter.flush(); + writer.checkMetadataSizeAndMayFlush(); + tsFileWriter.close(); + } + try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH2))) { + TsFileWriter tsFileWriter = new TsFileWriter(writer); + tsFileWriter.registerTableSchema(tableSchema); + tsFileWriter.writeTable(tablet1); + tsFileWriter.flush(); + tsFileWriter.close(); + } + byte[] file1Contents = Files.readAllBytes(new File(FILE_PATH1).toPath()); + byte[] file2Contents = Files.readAllBytes(new File(FILE_PATH2).toPath()); + Assert.assertArrayEquals(file1Contents, file2Contents); + } + + @Test + public void testNonAligned() throws IOException, WriteProcessException { + try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH1), 1)) { + TsFileWriter tsFileWriter = new TsFileWriter(writer); + writeNonAlignedData(tsFileWriter, writer, true); + tsFileWriter.close(); + } + try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH2))) { + TsFileWriter tsFileWriter = new TsFileWriter(writer); + writeNonAlignedData(tsFileWriter, writer, false); + tsFileWriter.close(); + } + byte[] file1Contents = Files.readAllBytes(new File(FILE_PATH1).toPath()); + byte[] file2Contents = Files.readAllBytes(new File(FILE_PATH2).toPath()); + Assert.assertArrayEquals(file1Contents, file2Contents); + } + + private void writeNonAlignedData( + TsFileWriter tsFileWriter, + TsFileIOWriter tsFileIOWriter, + boolean flushChunkMetadataToTempFile) + throws WriteProcessException, IOException { + for (int i = 0; i < 10; i++) { + Tablet tablet = + new Tablet( + new StringArrayDeviceID("root.test.d" + i), + Arrays.asList("s1", "s2", "s3"), + Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32)); + for (int j = 0; j < 1000; j++) { + tablet.addTimestamp(j, j); + tablet.addValue("s1", j, 0); + tablet.addValue("s2", j, 0); + tablet.addValue("s3", j, 0); + } + tsFileWriter.registerTimeseries( + "root.test.d" + i, new MeasurementSchema("s1", TSDataType.INT32)); + tsFileWriter.registerTimeseries( + "root.test.d" + i, new MeasurementSchema("s2", TSDataType.INT32)); + tsFileWriter.registerTimeseries( + "root.test.d" + i, new MeasurementSchema("s3", TSDataType.INT32)); + tsFileWriter.writeTree(tablet); + tsFileWriter.flush(); + if (flushChunkMetadataToTempFile) { + tsFileIOWriter.checkMetadataSizeAndMayFlush(); + } + } + } +} From f4d93246afa1b81d062e4a0e87ac8f19e6d374f6 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 7 Jan 2026 18:50:33 +0800 Subject: [PATCH 3/3] fix compile --- ...ileIOWriterFlushTempChunkMetadataTest.java | 86 +++++++------------ 1 file changed, 31 insertions(+), 55 deletions(-) diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java index 98ebe2e6a..ab698e1e1 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileIOWriterFlushTempChunkMetadataTest.java @@ -20,11 +20,9 @@ package org.apache.tsfile.read; import org.apache.tsfile.constant.TestConstant; -import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.WriteProcessException; -import org.apache.tsfile.file.metadata.StringArrayDeviceID; -import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.Path; import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -54,49 +52,14 @@ public void teardown() { @Test public void testAligned() throws IOException, WriteProcessException { - TableSchema tableSchema = - new TableSchema( - "t1", - Arrays.asList( - new MeasurementSchema("device", TSDataType.STRING), - new MeasurementSchema("s1", TSDataType.INT32), - new MeasurementSchema("s2", TSDataType.INT32), - new MeasurementSchema("s3", TSDataType.INT32)), - Arrays.asList( - ColumnCategory.TAG, - ColumnCategory.FIELD, - ColumnCategory.FIELD, - ColumnCategory.FIELD)); - Tablet tablet1 = - new Tablet( - tableSchema.getTableName(), - Arrays.asList("device", "s1", "s2", "s3"), - Arrays.asList(TSDataType.STRING, TSDataType.INT32, TSDataType.INT32, TSDataType.INT32), - Arrays.asList( - ColumnCategory.TAG, - ColumnCategory.FIELD, - ColumnCategory.FIELD, - ColumnCategory.FIELD)); - for (int i = 0; i < 1000; i++) { - tablet1.addTimestamp(i, i); - tablet1.addValue("device", i, "d" + i); - tablet1.addValue("s1", i, 0); - tablet1.addValue("s2", i, 0); - tablet1.addValue("s3", i, 0); - } try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH1), 1)) { TsFileWriter tsFileWriter = new TsFileWriter(writer); - tsFileWriter.registerTableSchema(tableSchema); - tsFileWriter.writeTable(tablet1); - tsFileWriter.flush(); - writer.checkMetadataSizeAndMayFlush(); + writeData(tsFileWriter, writer, true, true); tsFileWriter.close(); } try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH2))) { TsFileWriter tsFileWriter = new TsFileWriter(writer); - tsFileWriter.registerTableSchema(tableSchema); - tsFileWriter.writeTable(tablet1); - tsFileWriter.flush(); + writeData(tsFileWriter, writer, false, true); tsFileWriter.close(); } byte[] file1Contents = Files.readAllBytes(new File(FILE_PATH1).toPath()); @@ -108,12 +71,12 @@ public void testAligned() throws IOException, WriteProcessException { public void testNonAligned() throws IOException, WriteProcessException { try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH1), 1)) { TsFileWriter tsFileWriter = new TsFileWriter(writer); - writeNonAlignedData(tsFileWriter, writer, true); + writeData(tsFileWriter, writer, true, false); tsFileWriter.close(); } try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH2))) { TsFileWriter tsFileWriter = new TsFileWriter(writer); - writeNonAlignedData(tsFileWriter, writer, false); + writeData(tsFileWriter, writer, false, false); tsFileWriter.close(); } byte[] file1Contents = Files.readAllBytes(new File(FILE_PATH1).toPath()); @@ -121,31 +84,44 @@ public void testNonAligned() throws IOException, WriteProcessException { Assert.assertArrayEquals(file1Contents, file2Contents); } - private void writeNonAlignedData( + private void writeData( TsFileWriter tsFileWriter, TsFileIOWriter tsFileIOWriter, - boolean flushChunkMetadataToTempFile) + boolean flushChunkMetadataToTempFile, + boolean aligned) throws WriteProcessException, IOException { for (int i = 0; i < 10; i++) { Tablet tablet = new Tablet( - new StringArrayDeviceID("root.test.d" + i), - Arrays.asList("s1", "s2", "s3"), - Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32)); + "root.test.d" + i, + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32))); for (int j = 0; j < 1000; j++) { tablet.addTimestamp(j, j); tablet.addValue("s1", j, 0); tablet.addValue("s2", j, 0); tablet.addValue("s3", j, 0); } - tsFileWriter.registerTimeseries( - "root.test.d" + i, new MeasurementSchema("s1", TSDataType.INT32)); - tsFileWriter.registerTimeseries( - "root.test.d" + i, new MeasurementSchema("s2", TSDataType.INT32)); - tsFileWriter.registerTimeseries( - "root.test.d" + i, new MeasurementSchema("s3", TSDataType.INT32)); - tsFileWriter.writeTree(tablet); - tsFileWriter.flush(); + if (aligned) { + tsFileWriter.registerAlignedTimeseries( + new Path("root.test.d" + i), + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32))); + tsFileWriter.writeAligned(tablet); + } else { + tsFileWriter.registerTimeseries( + new Path("root.test.d" + i), new MeasurementSchema("s1", TSDataType.INT32)); + tsFileWriter.registerTimeseries( + new Path("root.test.d" + i), new MeasurementSchema("s2", TSDataType.INT32)); + tsFileWriter.registerTimeseries( + new Path("root.test.d" + i), new MeasurementSchema("s3", TSDataType.INT32)); + tsFileWriter.write(tablet); + } + tsFileWriter.flushAllChunkGroups(); if (flushChunkMetadataToTempFile) { tsFileIOWriter.checkMetadataSizeAndMayFlush(); }