From 59eb4981b27d5485497553c820a4132e0b1a9524 Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Wed, 17 Dec 2025 12:57:44 +0000 Subject: [PATCH 1/6] Fix concurrency issues --- .../pixelsdb/pixels/core/vector/BinaryColumnVector.java | 9 +++++---- .../pixelsdb/pixels/core/vector/VectorizedRowBatch.java | 4 +++- .../pixels/core/vector/TestBinaryColumnVector.java | 9 +++++++-- .../io/pixelsdb/pixels/retina/PixelsWriteBuffer.java | 9 +++------ 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/BinaryColumnVector.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/BinaryColumnVector.java index e0bc902894..b75f50c720 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/BinaryColumnVector.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/BinaryColumnVector.java @@ -823,12 +823,13 @@ public byte getFlatBufferType() public int serialize(FlatBufferBuilder builder) { int baseOffset = super.serialize(builder); - int[] byteArrayOffsets = new int[writeIndex]; IdentityHashMap bufToIndex = new IdentityHashMap<>(); ArrayList physicalList = new ArrayList<>(); - for (int i = 0; i < writeIndex; ++i) + int serializedWriteIndex = writeIndex; + + for (int i = 0; i < serializedWriteIndex; ++i) { byte[] buf = vector[i]; if (buf == null) @@ -872,8 +873,8 @@ public int serialize(FlatBufferBuilder builder) physicalBufferOffset = BinaryColumnVectorFlat.createPhysicalBufferVector(builder, physicalOffsets); } - int[] vectorIndices = new int[writeIndex]; - for (int i = 0; i < writeIndex; ++i) + int[] vectorIndices = new int[serializedWriteIndex]; + for (int i = 0; i < serializedWriteIndex; ++i) { byte[] buf = vector[i]; if (buf == null) diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/VectorizedRowBatch.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/VectorizedRowBatch.java index 890797c926..40b9ee098f 100644 --- 
a/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/VectorizedRowBatch.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/VectorizedRowBatch.java @@ -341,6 +341,8 @@ public byte[] serialize() int[] columnVectorOffsets = new int[numCols]; byte[] columnTypeOffsets = new byte[numCols]; + int writeSize = size; + for (int i = 0; i < numCols; ++i) { columnVectorOffsets[i] = cols[i].serialize(builder); @@ -353,7 +355,7 @@ public byte[] serialize() VectorizedRowBatchFlat.addNumCols(builder, numCols); VectorizedRowBatchFlat.addCols(builder, colsOffset); VectorizedRowBatchFlat.addColsType(builder, colsTypeOffset); - VectorizedRowBatchFlat.addSize(builder, size); + VectorizedRowBatchFlat.addSize(builder, writeSize); VectorizedRowBatchFlat.addProjectionSize(builder, projectionSize); VectorizedRowBatchFlat.addMaxSize(builder, maxSize); VectorizedRowBatchFlat.addMemoryUsage(builder, memoryUsage); diff --git a/pixels-core/src/test/java/io/pixelsdb/pixels/core/vector/TestBinaryColumnVector.java b/pixels-core/src/test/java/io/pixelsdb/pixels/core/vector/TestBinaryColumnVector.java index ca73e3a9b9..55f1771484 100644 --- a/pixels-core/src/test/java/io/pixelsdb/pixels/core/vector/TestBinaryColumnVector.java +++ b/pixels-core/src/test/java/io/pixelsdb/pixels/core/vector/TestBinaryColumnVector.java @@ -30,16 +30,21 @@ public void testSerialize() VectorizedRowBatch vectorizedRowBatch = new VectorizedRowBatch(1, 10240); BinaryColumnVector columnVector = new BinaryColumnVector(10240); vectorizedRowBatch.cols[0] = columnVector; - for (int i = 0; i < 10000; ++i) + int writeNum = 10000; + for (int i = 0; i < writeNum; ++i) { columnVector.add("test" + i); } + vectorizedRowBatch.size = writeNum; byte[] data = vectorizedRowBatch.serialize(); int length = data.length; Assert.assertEquals(length < 4 * 1024 * 1024, true); VectorizedRowBatch newBatch = VectorizedRowBatch.deserialize(data); + + Assert.assertEquals(newBatch.size, writeNum); + BinaryColumnVector col = 
(BinaryColumnVector) newBatch.cols[0]; - for (int i = 0; i < 10000; ++i) + for (int i = 0; i < writeNum; ++i) { String decode = new String(col.vector[i], col.start[i], col.lens[i]); Assert.assertEquals(decode, "test" + i); diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java index 2ea3601509..e90fb2b76f 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java @@ -33,10 +33,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; @@ -108,7 +105,7 @@ public class PixelsWriteBuffer private final ReadWriteLock versionLock = new ReentrantReadWriteLock(); private int currentMemTableCount; - private final List fileWriterManagers; + private final Queue fileWriterManagers; private FileWriterManager currentFileWriterManager; private AtomicLong maxObjectKey; private String retinaHostName; @@ -144,7 +141,7 @@ public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrdere this.flushMinioExecutor = Executors.newSingleThreadExecutor(); this.flushDiskExecutor = Executors.newSingleThreadScheduledExecutor(); - this.fileWriterManagers = new ArrayList<>(); + this.fileWriterManagers = new ConcurrentLinkedQueue<>(); this.maxObjectKey = new AtomicLong(-1); this.retinaHostName = retinaHostName; From ba29302dfa3da5dc582eab832453791069af49df Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Fri, 19 Dec 2025 15:46:40 +0000 Subject: [PATCH 2/6] optimize rocksdb index and mainindex --- .../pixels/cli/load/IndexedPixelsConsumer.java | 2 ++ 
.../pixelsdb/pixels/common/index/MainIndexCache.java | 11 ++++++----- pixels-common/src/main/resources/pixels.properties | 3 +++ .../pixelsdb/pixels/index/rocksdb/RocksDBFactory.java | 6 +++++- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java index 30c4b4bdad..fcfac4dabc 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java @@ -252,6 +252,8 @@ private void flushRowBatch(PerBucketWriter bucketWriter) throws IOException, Ind // Push index entries to the corresponding IndexService (determined by targetNode address) bucketWriter.indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), bucketWriter.indexEntries); bucketWriter.indexEntries.clear(); + + bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(),bucketWriter.currFile.getId(), true); } private void closePixelsFile(PerBucketWriter bucketWriter) throws IOException, MetadataException, IndexException diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexCache.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexCache.java index 4a649adcaa..b481b6a7b0 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexCache.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexCache.java @@ -20,6 +20,7 @@ package io.pixelsdb.pixels.common.index; import io.pixelsdb.pixels.common.exception.MainIndexException; +import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.index.IndexProto; import java.io.Closeable; @@ -40,12 +41,11 @@ public class MainIndexCache implements Closeable /** * As row ids in a table is generated sequentially, and the hot row ids are usually the 
recent generated ones, the * conflict rate of row ids should not be very high. Hence, one hash bucket should be good to cache 1 million main - * index entries. And 101 buckets should be large enough to cache 101 million entries, which may consume several GBs - * of memory. 101 is a prime number. It helps avoid imbalance buckets. + * index entries. */ - private static final int NUM_BUCKETS = 101; + private final int NUM_BUCKETS; - private static final int MAX_BUCKET_SIZE = 1024 * 1024 * 1024; + private static final int MAX_BUCKET_SIZE = 1024 * 1024; /** * tableRowId % NUM_BUCKETS -> {tableRowId -> rowLocation}. * We use multiple hash maps (buckets) to reduce hash conflicts in a large cache. @@ -56,10 +56,11 @@ public class MainIndexCache implements Closeable public MainIndexCache() { + this.NUM_BUCKETS = Integer.parseInt(ConfigFactory.Instance().getProperty("index.main.cache.bucket.num")); this.entryCacheBuckets = new ArrayList<>(NUM_BUCKETS); for (int i = 0; i < NUM_BUCKETS; ++i) { - this.entryCacheBuckets.add(new HashMap<>()); + this.entryCacheBuckets.add(new LinkedHashMap<>()); } this.rangeCache = new TreeSet<>((o1, o2) -> { /* Issue #1150: diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index 6a25de454f..7b7e1fe590 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -339,6 +339,8 @@ index.rocksdb.max.bytes.for.level.multiplier=10 index.rocksdb.target.file.size.base=67108864 # rocksdb file size multiplier (default to 1) index.rocksdb.target.file.size.multiplier=1 +# rocksdb key fixed prefix length +index.rocksdb.prefix.length=12 # rocksdb max subcompactions index.rocksdb.max.subcompactions=1 # rocksdb compression type (e.g. 
NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZ2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, ZSTD_COMPRESSION) @@ -363,6 +365,7 @@ index.cache.expiration.seconds=3600 index.rocksdb.multicf=false # the directory where the sqlite files of main index are stored, each main index is stored as a sqlite file index.sqlite.path=/tmp/sqlite +index.main.cache.bucket.num=7 index.mapdb.path=/tmp/mapdb index.server.host=localhost index.server.port=18895 diff --git a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java index 118b178c03..ecaa729199 100644 --- a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java +++ b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java @@ -128,6 +128,8 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name) .setFilterPolicy(new BloomFilter(10, false)) .setWholeKeyFiltering(false) .setBlockSize(blockSize) + .setCacheIndexAndFilterBlocks(true) + .setPinL0FilterAndIndexBlocksInCache(true) .setBlockCache(blockCache); // ColumnFamily Options @@ -141,6 +143,7 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name) int maxBytesForLevelMultiplier = Integer.parseInt(config.getProperty("index.rocksdb.max.bytes.for.level.multiplier")); long targetFileSizeBase = Long.parseLong(config.getProperty("index.rocksdb.target.file.size.base")); int targetFileSizeMultiplier = Integer.parseInt(config.getProperty("index.rocksdb.target.file.size.multiplier")); + int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length")); CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style")); // Compression Options @@ -160,7 +163,8 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name) 
.setTargetFileSizeMultiplier(targetFileSizeMultiplier) .setCompressionType(compressionType) .setBottommostCompressionType(bottommostCompressionType) - .setCompactionStyle(compactionStyle); + .setCompactionStyle(compactionStyle) + .useFixedLengthPrefixExtractor(fixedLengthPrefix); return new ColumnFamilyDescriptor(name, cfOptions); } From 8926c2ffe3692f57fc820d82b6c4f798e2a1af22 Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Sat, 20 Dec 2025 12:47:37 +0000 Subject: [PATCH 3/6] Fix Flush Process --- .../cli/load/IndexedPixelsConsumer.java | 7 +- .../main/sqlite/TestSqliteMainIndexQuery.java | 101 ++++++++++++++++++ 2 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java index fcfac4dabc..1ee88c6880 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java @@ -164,7 +164,7 @@ protected void flushRemainingData() throws IOException, MetadataException /** * Initializes a new PixelsWriter and associated File/Path for a given bucket ID. 
*/ - private PerBucketWriter initializeBucketWriter(int bucketId) throws IOException, MetadataException + private PerBucketWriter initializeBucketWriter(int bucketId) throws IOException, MetadataException, IndexException { // Use the Node Cache to find the responsible Retina Node NodeProto.NodeInfo targetNode = bucketCache.getRetinaNodeInfoByBucketId(bucketId); @@ -252,8 +252,6 @@ private void flushRowBatch(PerBucketWriter bucketWriter) throws IOException, Ind // Push index entries to the corresponding IndexService (determined by targetNode address) bucketWriter.indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), bucketWriter.indexEntries); bucketWriter.indexEntries.clear(); - - bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(),bucketWriter.currFile.getId(), true); } private void closePixelsFile(PerBucketWriter bucketWriter) throws IOException, MetadataException, IndexException @@ -264,6 +262,7 @@ private void closePixelsFile(PerBucketWriter bucketWriter) throws IOException, M flushRowBatch(bucketWriter); } + bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(),bucketWriter.currFile.getId(), true); closeWriterAndAddFile(bucketWriter.pixelsWriter, bucketWriter.currFile, bucketWriter.currTargetPath, bucketWriter.targetNode); } @@ -295,7 +294,7 @@ public PerBucketWriter(PixelsWriter writer, File file, Path path, NodeProto.Node this.rowBatch = schema.createRowBatchWithHiddenColumn(pixelStride, TypeDescription.Mode.NONE); this.indexService = indexServices.computeIfAbsent(node, nodeInfo -> RPCIndexService.CreateInstance(nodeInfo.getAddress(), indexServerPort)); - this.rowIdAllocator = new RowIdAllocator(index.getTableId(), 1000, this.indexService); + this.rowIdAllocator = new RowIdAllocator(index.getTableId(), maxRowNum, this.indexService); } } } \ No newline at end of file diff --git 
a/pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java b/pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java new file mode 100644 index 0000000000..62ecfc56d7 --- /dev/null +++ b/pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java @@ -0,0 +1,101 @@ +/* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ +package io.pixelsdb.pixels.index.main.sqlite; + +import io.pixelsdb.pixels.common.exception.MainIndexException; +import io.pixelsdb.pixels.common.exception.RowIdException; +import io.pixelsdb.pixels.common.index.MainIndex; +import io.pixelsdb.pixels.common.index.MainIndexFactory; +import io.pixelsdb.pixels.common.index.RowIdRange; +import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.index.IndexProto; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +public class TestSqliteMainIndexQuery +{ + MainIndex mainIndex; + Long tableId =2876L; + Connection connection; + @BeforeEach + public void setUp() throws Exception + { + String sqlitePath = ConfigFactory.Instance().getProperty("index.sqlite.path"); + if (!sqlitePath.endsWith("/")) + { + sqlitePath += "/"; + } + mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); + connection = DriverManager.getConnection("jdbc:sqlite:" + sqlitePath + tableId + ".main.index.db"); + } + + @Test + public void testQueryRowRanges() throws Exception + { + String query = "SELECT * FROM row_id_ranges order by row_id_start"; + long fileid = 0; + try (PreparedStatement pst = this.connection.prepareStatement(query)) + { +// pst.setLong(1, fileid); + try (ResultSet rs = pst.executeQuery()) + { + while (rs.next()) + { + long rowIdStart = rs.getLong("row_id_start"); + long rowIdEnd = rs.getLong("row_id_end"); + long fileId = rs.getLong("file_id"); + int rgId = 
rs.getInt("rg_id"); + int rgRowOffsetStart = rs.getInt("rg_row_offset_start"); + int rgRowOffsetEnd = rs.getInt("rg_row_offset_end"); + if (rowIdEnd - rowIdStart != rgRowOffsetEnd - rgRowOffsetStart) + { + throw new RowIdException("The width of row id range (" + rowIdStart + ", " + + rgRowOffsetEnd + ") does not match the width of row group row offset range (" + + rgRowOffsetStart + ", " + rgRowOffsetEnd + ")"); + } + System.out.println( + "rowIdStart=" + rowIdStart + + ", rowIdEnd=" + rowIdEnd + + ", fileId=" + fileId + + ", rgId=" + rgId + + ", rgRowOffsetStart=" + rgRowOffsetStart + + ", rgRowOffsetEnd=" + rgRowOffsetEnd + ); + } + } + } + + } +} From 2f4297e155c710647ee5837603e65a97d9bdd645 Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Sat, 20 Dec 2025 13:34:45 +0000 Subject: [PATCH 4/6] Retina Flush Index --- .../pixelsdb/pixels/retina/PixelsWriteBuffer.java | 7 ++++++- .../pixels/retina/RetinaResourceManager.java | 15 ++++++++++++++- .../pixels/retina/TestPixelsWriteBuffer.java | 2 +- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java index e90fb2b76f..d239fdc4aa 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java @@ -24,6 +24,7 @@ import io.pixelsdb.pixels.common.index.IndexServiceProvider; import io.pixelsdb.pixels.common.index.RowIdAllocator; import io.pixelsdb.pixels.common.metadata.domain.Path; +import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.physical.StorageFactory; import io.pixelsdb.pixels.common.utils.ConfigFactory; @@ -109,9 +110,10 @@ public class PixelsWriteBuffer private FileWriterManager currentFileWriterManager; private AtomicLong maxObjectKey; private String 
retinaHostName; + private final SinglePointIndex index; public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrderedDirPath, - Path targetCompactDirPath, String retinaHostName) throws RetinaException + Path targetCompactDirPath, String retinaHostName, SinglePointIndex index) throws RetinaException { this.tableId = tableId; this.schema = schema; @@ -163,6 +165,7 @@ public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrdere // initialization adds reference counts to all data this.currentVersion = new SuperVersion(activeMemTable, immutableMemTables, objectEntries); this.rowIdAllocator = new RowIdAllocator(tableId, this.memTableSize, IndexServiceProvider.ServiceMode.local); + this.index = index; startFlushMinioToDiskScheduler(Long.parseLong(configFactory.getProperty("retina.buffer.flush.interval"))); } @@ -361,6 +364,8 @@ private void startFlushMinioToDiskScheduler(long intervalSeconds) this.versionLock.writeLock().unlock(); finished.get(); + IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local) + .flushIndexEntriesOfFile(tableId, index.getId(), fileWriterManager.getFileId(), true); for (ObjectEntry objectEntry : toRemove) { if (objectEntry.unref()) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index fe8003ea94..ca3dc08d5a 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -20,11 +20,14 @@ package io.pixelsdb.pixels.retina; import com.google.protobuf.ByteString; +import io.pixelsdb.pixels.common.exception.MetadataException; import io.pixelsdb.pixels.common.exception.RetinaException; import io.pixelsdb.pixels.common.exception.TransException; import io.pixelsdb.pixels.common.metadata.MetadataService; import 
io.pixelsdb.pixels.common.metadata.domain.Column; import io.pixelsdb.pixels.common.metadata.domain.Layout; +import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; +import io.pixelsdb.pixels.common.metadata.domain.Table; import io.pixelsdb.pixels.common.physical.PhysicalReader; import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil; import io.pixelsdb.pixels.common.physical.Storage; @@ -556,8 +559,18 @@ public void addWriteBuffer(String schemaName, String tableName) throws RetinaExc List columnTypes = columns.stream().map(Column::getType).collect(Collectors.toList()); TypeDescription schema = TypeDescription.createSchemaFromStrings(columnNames, columnTypes); + // get primary index + Table table = metadataService.getTable(schemaName, tableName); + SinglePointIndex index = null; + try + { + index = metadataService.getPrimaryIndex(table.getId()); + } catch (MetadataException ignored) + { + } + PixelsWriteBuffer pixelsWriteBuffer = new PixelsWriteBuffer(latestLayout.getTableId(), - schema, orderedPaths.get(0), compactPaths.get(0), retinaHostName); + schema, orderedPaths.get(0), compactPaths.get(0), retinaHostName, index); String writeBufferKey = schemaName + "_" + tableName; pixelsWriteBufferMap.put(writeBufferKey, pixelsWriteBuffer); } catch (Exception e) diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java index 4e1b3d18b5..4d01588984 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java @@ -58,7 +58,7 @@ public void setup() columnTypes.add("int"); schema = TypeDescription.createSchemaFromStrings(columnNames, columnTypes); - buffer = new PixelsWriteBuffer(0L, schema, targetOrderDirPath, targetCompactDirPath, "localhost"); // table id get from mysql `TBLS` table + buffer = new PixelsWriteBuffer(0L, 
schema, targetOrderDirPath, targetCompactDirPath, "localhost", null); // table id get from mysql `TBLS` table } catch (Exception e) { System.out.println("setup error: " + e); From 3ac1efbe0a92f72cf4e9f023b74eba79bd88480d Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Mon, 22 Dec 2025 06:21:32 +0000 Subject: [PATCH 5/6] log info --- .../java/io/pixelsdb/pixels/retina/RetinaResourceManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index ca3dc08d5a..12507e21ea 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -567,6 +567,7 @@ public void addWriteBuffer(String schemaName, String tableName) throws RetinaExc index = metadataService.getPrimaryIndex(table.getId()); } catch (MetadataException ignored) { + logger.warn("There isn't primary index on table {}", tableName); } PixelsWriteBuffer pixelsWriteBuffer = new PixelsWriteBuffer(latestLayout.getTableId(), From 46f99ddb1f3e17a0d9ae896d5e3dfc8eec0b6551 Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Mon, 22 Dec 2025 09:25:02 +0000 Subject: [PATCH 6/6] Fix IndexBuffer --- .../pixels/common/index/MainIndexBuffer.java | 8 +++- .../main/sqlite/TestSqliteMainIndexQuery.java | 2 +- .../pixels/retina/PixelsWriteBuffer.java | 46 +++++++++++++------ .../pixels/retina/RetinaResourceManager.java | 12 +---- .../pixels/retina/TestPixelsWriteBuffer.java | 2 +- 5 files changed, 42 insertions(+), 28 deletions(-) diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexBuffer.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexBuffer.java index 468932e689..60ae77903a 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexBuffer.java +++ 
b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/MainIndexBuffer.java @@ -188,10 +188,10 @@ public List flush(long fileId) throws MainIndexException currRangeBuilder.setRowIdEnd(prevRowId + 1); currRangeBuilder.setRgRowOffsetEnd(prevRgRowOffset + 1); ranges.add(currRangeBuilder.build()); - last = true; } // start constructing a new row id range first = false; + last = true; currRangeBuilder.setRowIdStart(rowId); currRangeBuilder.setFileId(fileId); currRangeBuilder.setRgId(rgId); @@ -202,13 +202,17 @@ public List flush(long fileId) throws MainIndexException prevRgRowOffset = rgRowOffset; } // add the last range - if (!last) + if (last) { currRangeBuilder.setRowIdEnd(prevRowId + 1); currRangeBuilder.setRgRowOffsetEnd(prevRgRowOffset + 1); ranges.add(currRangeBuilder.build()); } // release the flushed file index buffer + if(fileBuffer.size() != rowIds.length) + { + throw new MainIndexException("FileBuffer Changed while flush"); + } fileBuffer.clear(); this.indexBuffer.remove(fileId); if (this.indexBuffer.size() <= CACHE_POP_ENABLE_THRESHOLD) diff --git a/pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java b/pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java index 62ecfc56d7..7847fcd34c 100644 --- a/pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java +++ b/pixels-index/pixels-index-main-sqlite/src/test/java/io/pixelsdb/pixels/index/main/sqlite/TestSqliteMainIndexQuery.java @@ -47,7 +47,7 @@ public class TestSqliteMainIndexQuery { MainIndex mainIndex; - Long tableId =2876L; + Long tableId =3035L; Connection connection; @BeforeEach public void setUp() throws Exception diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java index 
63ff5e04af..3e94659ff1 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java @@ -20,9 +20,11 @@ package io.pixelsdb.pixels.retina; import io.pixelsdb.pixels.common.exception.IndexException; +import io.pixelsdb.pixels.common.exception.MetadataException; import io.pixelsdb.pixels.common.exception.RetinaException; import io.pixelsdb.pixels.common.index.IndexServiceProvider; import io.pixelsdb.pixels.common.index.RowIdAllocator; +import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.Path; import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; import io.pixelsdb.pixels.common.physical.Storage; @@ -117,12 +119,13 @@ public class PixelsWriteBuffer private AtomicLong continuousFlushedId; private final PriorityQueue outOfOrderFlushedIds; private final Object flushLock = new Object(); + private final Object rowLock = new Object(); private String retinaHostName; - private final SinglePointIndex index; + private SinglePointIndex index; public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrderedDirPath, - Path targetCompactDirPath, String retinaHostName, SinglePointIndex index) throws RetinaException + Path targetCompactDirPath, String retinaHostName) throws RetinaException { this.tableId = tableId; this.schema = schema; @@ -175,7 +178,6 @@ public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrdere // initialization adds reference counts to all data this.currentVersion = new SuperVersion(activeMemTable, immutableMemTables, objectEntries); this.rowIdAllocator = new RowIdAllocator(tableId, this.memTableSize, IndexServiceProvider.ServiceMode.local); - this.index = index; startFlushObjectToFileScheduler(Long.parseLong(configFactory.getProperty("retina.buffer.flush.interval"))); } @@ -195,15 +197,25 @@ public long addRow(byte[][] values, long timestamp, 
IndexProto.RowLocation.Build MemTable currentMemTable = null; int rowOffset = -1; + long rowId = -1; while (rowOffset < 0) { currentMemTable = this.activeMemTable; try { - rowOffset = currentMemTable.add(values, timestamp); + synchronized (rowLock) + { + // Ensure rgRowOffset and rowId are allocated synchronously to minimize + // fragmentation after MainIndex flush. + rowOffset = currentMemTable.add(values, timestamp); + rowId = rowIdAllocator.getRowId(); + } } catch (NullPointerException e) { continue; + } catch (IndexException e) + { + throw new RetinaException("Fail to get rowId from rowIdAllocator", e); } // active memTable is full @@ -220,13 +232,7 @@ public long addRow(byte[][] values, long timestamp, IndexProto.RowLocation.Build builder.setFileId(activeMemTable.getFileId()) .setRgId(0) .setRgRowOffset(rgRowOffset); - try - { - return rowIdAllocator.getRowId(); - } catch (IndexException e) - { - throw new RetinaException("Fail to get rowId from rowIdAllocator", e); - } + return rowId; } private void switchMemTable() throws RetinaException @@ -365,6 +371,17 @@ private void startFlushObjectToFileScheduler(long intervalSeconds) this.flushFileFuture = this.flushFileExecutor.scheduleWithFixedDelay(() -> { try { + if(index == null) + { + try + { + index = MetadataService.Instance().getPrimaryIndex(tableId); + } catch (MetadataException ignored) + { + logger.warn("There isn't primary index on table {}", tableId); + } + } + Iterator iterator = this.fileWriterManagers.iterator(); while (iterator.hasNext()) { @@ -390,8 +407,11 @@ private void startFlushObjectToFileScheduler(long intervalSeconds) this.versionLock.writeLock().unlock(); finished.get(); - IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local) - .flushIndexEntriesOfFile(tableId, index.getId(), fileWriterManager.getFileId(), true); + if(index != null) + { + IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local) + .flushIndexEntriesOfFile(tableId, index.getId(), 
fileWriterManager.getFileId(), true); + } for (ObjectEntry objectEntry : toRemove) { if (objectEntry.unref()) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 12507e21ea..34051e0f79 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -20,13 +20,11 @@ package io.pixelsdb.pixels.retina; import com.google.protobuf.ByteString; -import io.pixelsdb.pixels.common.exception.MetadataException; import io.pixelsdb.pixels.common.exception.RetinaException; import io.pixelsdb.pixels.common.exception.TransException; import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.Column; import io.pixelsdb.pixels.common.metadata.domain.Layout; -import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; import io.pixelsdb.pixels.common.metadata.domain.Table; import io.pixelsdb.pixels.common.physical.PhysicalReader; import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil; @@ -561,17 +559,9 @@ public void addWriteBuffer(String schemaName, String tableName) throws RetinaExc // get primary index Table table = metadataService.getTable(schemaName, tableName); - SinglePointIndex index = null; - try - { - index = metadataService.getPrimaryIndex(table.getId()); - } catch (MetadataException ignored) - { - logger.warn("There isn't primary index on table {}", tableName); - } PixelsWriteBuffer pixelsWriteBuffer = new PixelsWriteBuffer(latestLayout.getTableId(), - schema, orderedPaths.get(0), compactPaths.get(0), retinaHostName, index); + schema, orderedPaths.get(0), compactPaths.get(0), retinaHostName); String writeBufferKey = schemaName + "_" + tableName; pixelsWriteBufferMap.put(writeBufferKey, pixelsWriteBuffer); } catch (Exception e) diff --git 
a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java index 4d01588984..4e1b3d18b5 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriteBuffer.java @@ -58,7 +58,7 @@ public void setup() columnTypes.add("int"); schema = TypeDescription.createSchemaFromStrings(columnNames, columnTypes); - buffer = new PixelsWriteBuffer(0L, schema, targetOrderDirPath, targetCompactDirPath, "localhost", null); // table id get from mysql `TBLS` table + buffer = new PixelsWriteBuffer(0L, schema, targetOrderDirPath, targetCompactDirPath, "localhost"); // table id get from mysql `TBLS` table } catch (Exception e) { System.out.println("setup error: " + e);