@@ -164,7 +164,7 @@ protected void flushRemainingData() throws IOException, MetadataException
/**
* Initializes a new PixelsWriter and associated File/Path for a given bucket ID.
*/
private PerBucketWriter initializeBucketWriter(int bucketId) throws IOException, MetadataException
private PerBucketWriter initializeBucketWriter(int bucketId) throws IOException, MetadataException, IndexException
{
// Use the Node Cache to find the responsible Retina Node
NodeProto.NodeInfo targetNode = bucketCache.getRetinaNodeInfoByBucketId(bucketId);
@@ -262,6 +262,7 @@ private void closePixelsFile(PerBucketWriter bucketWriter) throws IOException, M
flushRowBatch(bucketWriter);
}

bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(), bucketWriter.currFile.getId(), true);
closeWriterAndAddFile(bucketWriter.pixelsWriter, bucketWriter.currFile, bucketWriter.currTargetPath, bucketWriter.targetNode);
}

@@ -293,7 +294,7 @@ public PerBucketWriter(PixelsWriter writer, File file, Path path, NodeProto.Node
this.rowBatch = schema.createRowBatchWithHiddenColumn(pixelStride, TypeDescription.Mode.NONE);
this.indexService = indexServices.computeIfAbsent(node, nodeInfo ->
RPCIndexService.CreateInstance(nodeInfo.getAddress(), indexServerPort));
this.rowIdAllocator = new RowIdAllocator(index.getTableId(), 1000, this.indexService);
this.rowIdAllocator = new RowIdAllocator(index.getTableId(), maxRowNum, this.indexService);
}
}
}
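The two changes in this file work together: the writer now flushes the single-point index entries of a file before the file is closed, and the RowIdAllocator batch size is raised from a hard-coded 1000 to maxRowNum, the number of rows a file can hold, so one allocation batch covers a whole file. A hypothetical, self-contained sketch of batch row-id allocation (the class and the LongSupplier stand-in are illustrative only, not the real RowIdAllocator or IndexService API):

import java.util.function.LongSupplier;

// Hypothetical allocator mirroring how RowIdAllocator(tableId, maxRowNum, indexService) is used
// above; the LongSupplier stands in for the index-service RPC that reserves a contiguous batch.
final class BatchRowIdAllocator
{
    private final long batchSize;
    private final LongSupplier reserveBatchStart;
    private long next = 0;  // next row id to hand out
    private long end = 0;   // exclusive end of the currently reserved batch

    BatchRowIdAllocator(long batchSize, LongSupplier reserveBatchStart)
    {
        this.batchSize = batchSize;
        this.reserveBatchStart = reserveBatchStart;
    }

    synchronized long getRowId()
    {
        if (next >= end)
        {
            next = reserveBatchStart.getAsLong();  // reserve a fresh, contiguous batch
            end = next + batchSize;
        }
        return next++;
    }

    public static void main(String[] args)
    {
        long[] counter = {0};
        BatchRowIdAllocator allocator = new BatchRowIdAllocator(4, () -> {
            long start = counter[0];
            counter[0] += 4;
            return start;
        });
        for (int i = 0; i < 6; i++)
        {
            System.out.println(allocator.getRowId());  // 0..5, contiguous within batches
        }
    }
}

When the batch size equals the rows a file can hold, the row ids written into one file stay contiguous, which keeps the main-index ranges built at flush time compact.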
@@ -188,10 +188,10 @@ public List<RowIdRange> flush(long fileId) throws MainIndexException
currRangeBuilder.setRowIdEnd(prevRowId + 1);
currRangeBuilder.setRgRowOffsetEnd(prevRgRowOffset + 1);
ranges.add(currRangeBuilder.build());
last = true;
}
// start constructing a new row id range
first = false;
last = true;
currRangeBuilder.setRowIdStart(rowId);
currRangeBuilder.setFileId(fileId);
currRangeBuilder.setRgId(rgId);
@@ -202,13 +202,17 @@ public List<RowIdRange> flush(long fileId) throws MainIndexException
prevRgRowOffset = rgRowOffset;
}
// add the last range
if (!last)
if (last)
{
currRangeBuilder.setRowIdEnd(prevRowId + 1);
currRangeBuilder.setRgRowOffsetEnd(prevRgRowOffset + 1);
ranges.add(currRangeBuilder.build());
}
// release the flushed file index buffer
if(fileBuffer.size() != rowIds.length)
{
throw new MainIndexException("FileBuffer Changed while flush");
}
fileBuffer.clear();
this.indexBuffer.remove(fileId);
if (this.indexBuffer.size() <= CACHE_POP_ENABLE_THRESHOLD)
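The fix above moves last = true; inside the branch that closes a range, flips the final check from if (!last) to if (last), and additionally verifies that the file buffer was not mutated concurrently during the flush. Independently of that bookkeeping, the underlying idea is to collapse sorted (rowId, rgRowOffset) pairs into contiguous ranges with exclusive end bounds. A self-contained sketch of that idea with a simplified range representation (illustrative only, not the Pixels RowIdRange builder):

import java.util.ArrayList;
import java.util.List;

// Illustrative only: collapse sorted, parallel (rowId, rgRowOffset) arrays into contiguous
// ranges with exclusive end bounds, the same idea flush(fileId) implements with RowIdRange.
public class RangeCollapseExample
{
    // each result is {rowIdStart, rowIdEnd, rgRowOffsetStart, rgRowOffsetEnd}
    static List<long[]> collapse(long[] rowIds, int[] rgRowOffsets)
    {
        List<long[]> ranges = new ArrayList<>();
        if (rowIds.length == 0)
        {
            return ranges;
        }
        long rowIdStart = rowIds[0];
        long offsetStart = rgRowOffsets[0];
        for (int i = 1; i <= rowIds.length; i++)
        {
            boolean boundary = i == rowIds.length
                    || rowIds[i] != rowIds[i - 1] + 1
                    || rgRowOffsets[i] != rgRowOffsets[i - 1] + 1;
            if (boundary)
            {
                // close the current range; end bounds are prev + 1, i.e. exclusive
                ranges.add(new long[]{rowIdStart, rowIds[i - 1] + 1, offsetStart, rgRowOffsets[i - 1] + 1});
                if (i < rowIds.length)
                {
                    rowIdStart = rowIds[i];
                    offsetStart = rgRowOffsets[i];
                }
            }
        }
        return ranges;
    }

    public static void main(String[] args)
    {
        long[] rowIds = {100, 101, 102, 200, 201};
        int[] offsets = {0, 1, 2, 3, 4};
        for (long[] r : collapse(rowIds, offsets))
        {
            System.out.println("rowIds [" + r[0] + ", " + r[1] + "), rgRowOffsets [" + r[2] + ", " + r[3] + ")");
        }
        // prints: rowIds [100, 103), rgRowOffsets [0, 3) and rowIds [200, 202), rgRowOffsets [3, 5)
    }
}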
@@ -20,6 +20,7 @@
package io.pixelsdb.pixels.common.index;

import io.pixelsdb.pixels.common.exception.MainIndexException;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.index.IndexProto;

import java.io.Closeable;
@@ -40,12 +41,11 @@ public class MainIndexCache implements Closeable
/**
* As row ids in a table are generated sequentially, and the hot row ids are usually the recently generated ones, the
* conflict rate of row ids should not be very high. Hence, one hash bucket should be good to cache 1 million main
* index entries. And 101 buckets should be large enough to cache 101 million entries, which may consume several GBs
* of memory. 101 is a prime number. It helps avoid imbalance buckets.
* index entries.
*/
private static final int NUM_BUCKETS = 101;
private final int NUM_BUCKETS;

private static final int MAX_BUCKET_SIZE = 1024 * 1024 * 1024;
private static final int MAX_BUCKET_SIZE = 1024 * 1024;
/**
* tableRowId % NUM_BUCKETS -> {tableRowId -> rowLocation}.
* We use multiple hash maps (buckets) to reduce hash conflicts in a large cache.
@@ -56,10 +56,11 @@ public class MainIndexCache implements Closeable

public MainIndexCache()
{
this.NUM_BUCKETS = Integer.parseInt(ConfigFactory.Instance().getProperty("index.main.cache.bucket.num"));
this.entryCacheBuckets = new ArrayList<>(NUM_BUCKETS);
for (int i = 0; i < NUM_BUCKETS; ++i)
{
this.entryCacheBuckets.add(new HashMap<>());
this.entryCacheBuckets.add(new LinkedHashMap<>());
}
this.rangeCache = new TreeSet<>((o1, o2) -> {
/* Issue #1150:
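With this change the bucket count is read from index.main.cache.bucket.num instead of being fixed at 101, MAX_BUCKET_SIZE drops to about one million entries, and each bucket becomes a LinkedHashMap, whose predictable iteration order makes it easy to evict the oldest entries once a bucket fills up. A self-contained sketch of such a bucketed cache (the eviction policy and value type are illustrative, not necessarily what MainIndexCache does):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative bucketed cache: rowId -> location string, with per-bucket size-bounded eviction.
public class BucketedCacheExample
{
    private final int numBuckets;
    private final List<Map<Long, String>> buckets;

    public BucketedCacheExample(int numBuckets, int maxBucketSize)
    {
        this.numBuckets = numBuckets;
        this.buckets = new ArrayList<>(numBuckets);
        for (int i = 0; i < numBuckets; i++)
        {
            // LinkedHashMap keeps insertion order; drop the eldest entry when the bucket is full.
            this.buckets.add(new LinkedHashMap<Long, String>(16, 0.75f, false)
            {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Long, String> eldest)
                {
                    return size() > maxBucketSize;
                }
            });
        }
    }

    public void put(long rowId, String location)
    {
        buckets.get((int) Math.floorMod(rowId, numBuckets)).put(rowId, location);
    }

    public String get(long rowId)
    {
        return buckets.get((int) Math.floorMod(rowId, numBuckets)).get(rowId);
    }

    public static void main(String[] args)
    {
        BucketedCacheExample cache = new BucketedCacheExample(7, 2);
        for (long rowId = 0; rowId < 21; rowId++)
        {
            cache.put(rowId, "file-0:rg-0:offset-" + rowId);
        }
        // with maxBucketSize = 2, only the two most recent row ids per bucket remain
        System.out.println(cache.get(0));   // null, evicted
        System.out.println(cache.get(20));  // still cached
    }
}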
3 changes: 3 additions & 0 deletions pixels-common/src/main/resources/pixels.properties
@@ -341,6 +341,8 @@ index.rocksdb.max.bytes.for.level.multiplier=10
index.rocksdb.target.file.size.base=67108864
# rocksdb file size multiplier (default to 1)
index.rocksdb.target.file.size.multiplier=1
# rocksdb key fixed prefix length
index.rocksdb.prefix.length=12
# rocksdb max subcompactions
index.rocksdb.max.subcompactions=1
# rocksdb compression type (e.g. NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZ2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, ZSTD_COMPRESSION)
@@ -365,6 +367,7 @@ index.cache.expiration.seconds=3600
index.rocksdb.multicf=false
# the directory where the sqlite files of main index are stored, each main index is stored as a sqlite file
index.sqlite.path=/tmp/sqlite
index.main.cache.bucket.num=7
index.mapdb.path=/tmp/mapdb
index.server.host=localhost
index.server.port=18895
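Both new keys are read through ConfigFactory at startup, the same way the surrounding code reads the other index.* properties. A minimal sketch of consuming them (the class name is illustrative):

import io.pixelsdb.pixels.common.utils.ConfigFactory;

public class IndexConfigExample
{
    public static void main(String[] args)
    {
        ConfigFactory config = ConfigFactory.Instance();
        // number of hash buckets used by the main index cache (index.main.cache.bucket.num=7)
        int numBuckets = Integer.parseInt(config.getProperty("index.main.cache.bucket.num"));
        // fixed key-prefix length for the RocksDB prefix extractor (index.rocksdb.prefix.length=12)
        int prefixLength = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length"));
        System.out.println("buckets=" + numBuckets + ", prefixLength=" + prefixLength);
    }
}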
@@ -0,0 +1,101 @@
/*
* Copyright 2025 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.index.main.sqlite;

import io.pixelsdb.pixels.common.exception.MainIndexException;
import io.pixelsdb.pixels.common.exception.RowIdException;
import io.pixelsdb.pixels.common.index.MainIndex;
import io.pixelsdb.pixels.common.index.MainIndexFactory;
import io.pixelsdb.pixels.common.index.RowIdRange;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.index.IndexProto;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestSqliteMainIndexQuery
{
MainIndex mainIndex;
Long tableId =3035L;
Connection connection;
@BeforeEach
public void setUp() throws Exception
{
String sqlitePath = ConfigFactory.Instance().getProperty("index.sqlite.path");
if (!sqlitePath.endsWith("/"))
{
sqlitePath += "/";
}
mainIndex = MainIndexFactory.Instance().getMainIndex(tableId);
connection = DriverManager.getConnection("jdbc:sqlite:" + sqlitePath + tableId + ".main.index.db");
}

@Test
public void testQueryRowRanges() throws Exception
{
String query = "SELECT * FROM row_id_ranges order by row_id_start";
long fileid = 0;
try (PreparedStatement pst = this.connection.prepareStatement(query))
{
// pst.setLong(1, fileid);
try (ResultSet rs = pst.executeQuery())
{
while (rs.next())
{
long rowIdStart = rs.getLong("row_id_start");
long rowIdEnd = rs.getLong("row_id_end");
long fileId = rs.getLong("file_id");
int rgId = rs.getInt("rg_id");
int rgRowOffsetStart = rs.getInt("rg_row_offset_start");
int rgRowOffsetEnd = rs.getInt("rg_row_offset_end");
if (rowIdEnd - rowIdStart != rgRowOffsetEnd - rgRowOffsetStart)
{
throw new RowIdException("The width of row id range (" + rowIdStart + ", " +
rgRowOffsetEnd + ") does not match the width of row group row offset range (" +
rgRowOffsetStart + ", " + rgRowOffsetEnd + ")");
}
System.out.println(
"rowIdStart=" + rowIdStart +
", rowIdEnd=" + rowIdEnd +
", fileId=" + fileId +
", rgId=" + rgId +
", rgRowOffsetStart=" + rgRowOffsetStart +
", rgRowOffsetEnd=" + rgRowOffsetEnd
);
}
}
}

}
}
@@ -128,6 +128,8 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name)
.setFilterPolicy(new BloomFilter(10, false))
.setWholeKeyFiltering(false)
.setBlockSize(blockSize)
.setCacheIndexAndFilterBlocks(true)
.setPinL0FilterAndIndexBlocksInCache(true)
.setBlockCache(blockCache);

// ColumnFamily Options
@@ -141,6 +143,7 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name)
int maxBytesForLevelMultiplier = Integer.parseInt(config.getProperty("index.rocksdb.max.bytes.for.level.multiplier"));
long targetFileSizeBase = Long.parseLong(config.getProperty("index.rocksdb.target.file.size.base"));
int targetFileSizeMultiplier = Integer.parseInt(config.getProperty("index.rocksdb.target.file.size.multiplier"));
int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length"));
CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style"));

// Compression Options
@@ -160,7 +163,8 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name)
.setTargetFileSizeMultiplier(targetFileSizeMultiplier)
.setCompressionType(compressionType)
.setBottommostCompressionType(bottommostCompressionType)
.setCompactionStyle(compactionStyle);
.setCompactionStyle(compactionStyle)
.useFixedLengthPrefixExtractor(fixedLengthPrefix);

return new ColumnFamilyDescriptor(name, cfOptions);
}
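The options above attach a fixed-length prefix extractor of index.rocksdb.prefix.length bytes and pin L0 filter and index blocks in the block cache, which makes prefix seeks over index keys cheaper because the bloom filters are built per prefix rather than per whole key. A self-contained sketch of how such a setup is typically used with rocksdbjni (the path and key layout are illustrative, not the Pixels key encoding):

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PrefixSeekExample
{
    public static void main(String[] args) throws RocksDBException
    {
        RocksDB.loadLibrary();
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setFilterPolicy(new BloomFilter(10, false))
                .setWholeKeyFiltering(false)           // filters are built on prefixes, not whole keys
                .setCacheIndexAndFilterBlocks(true)
                .setPinL0FilterAndIndexBlocksInCache(true);
        try (Options options = new Options()
                .setCreateIfMissing(true)
                .setTableFormatConfig(tableConfig)
                .useFixedLengthPrefixExtractor(12);    // first 12 bytes of each key form the prefix
             RocksDB db = RocksDB.open(options, "/tmp/prefix-seek-example"))
        {
            // keys: a 12-byte prefix followed by an 8-byte row id
            byte[] prefix = "index-000001".getBytes(StandardCharsets.UTF_8); // 12 bytes
            for (long rowId = 0; rowId < 3; rowId++)
            {
                byte[] key = ByteBuffer.allocate(20).put(prefix).putLong(rowId).array();
                db.put(key, ("location-" + rowId).getBytes(StandardCharsets.UTF_8));
            }
            try (ReadOptions readOptions = new ReadOptions().setPrefixSameAsStart(true);
                 RocksIterator it = db.newIterator(readOptions))
            {
                // the iterator stays within keys sharing the seek prefix
                for (it.seek(prefix); it.isValid(); it.next())
                {
                    System.out.println(new String(it.value(), StandardCharsets.UTF_8));
                }
            }
        }
    }
}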
@@ -20,10 +20,13 @@
package io.pixelsdb.pixels.retina;

import io.pixelsdb.pixels.common.exception.IndexException;
import io.pixelsdb.pixels.common.exception.MetadataException;
import io.pixelsdb.pixels.common.exception.RetinaException;
import io.pixelsdb.pixels.common.index.IndexServiceProvider;
import io.pixelsdb.pixels.common.index.RowIdAllocator;
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.Path;
import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
@@ -116,11 +119,13 @@ public class PixelsWriteBuffer
private AtomicLong continuousFlushedId;
private final PriorityQueue<Long> outOfOrderFlushedIds;
private final Object flushLock = new Object();
private final Object rowLock = new Object();

private String retinaHostName;
private SinglePointIndex index;

public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrderedDirPath,
Path targetCompactDirPath, String retinaHostName) throws RetinaException
Path targetCompactDirPath, String retinaHostName) throws RetinaException
{
this.tableId = tableId;
this.schema = schema;
@@ -192,15 +197,25 @@ public long addRow(byte[][] values, long timestamp, IndexProto.RowLocation.Build

MemTable currentMemTable = null;
int rowOffset = -1;
long rowId = -1;
while (rowOffset < 0)
{
currentMemTable = this.activeMemTable;
try
{
rowOffset = currentMemTable.add(values, timestamp);
synchronized (rowLock)
{
// Ensure rgRowOffset and rowId are allocated synchronously to minimize
// fragmentation after MainIndex flush.
rowOffset = currentMemTable.add(values, timestamp);
rowId = rowIdAllocator.getRowId();
}
} catch (NullPointerException e)
{
continue;
} catch (IndexException e)
{
throw new RetinaException("Fail to get rowId from rowIdAllocator", e);
}

// active memTable is full
@@ -217,13 +232,7 @@
builder.setFileId(activeMemTable.getFileId())
.setRgId(0)
.setRgRowOffset(rgRowOffset);
try
{
return rowIdAllocator.getRowId();
} catch (IndexException e)
{
throw new RetinaException("Fail to get rowId from rowIdAllocator", e);
}
return rowId;
}

private void switchMemTable() throws RetinaException
@@ -362,6 +371,17 @@ private void startFlushObjectToFileScheduler(long intervalSeconds)
this.flushFileFuture = this.flushFileExecutor.scheduleWithFixedDelay(() -> {
try
{
if(index == null)
{
try
{
index = MetadataService.Instance().getPrimaryIndex(tableId);
} catch (MetadataException ignored)
{
logger.warn("There isn't primary index on table {}", tableId);
}
}

Iterator<FileWriterManager> iterator = this.fileWriterManagers.iterator();
while (iterator.hasNext())
{
@@ -387,6 +407,11 @@ private void startFlushObjectToFileScheduler(long intervalSeconds)
this.versionLock.writeLock().unlock();

finished.get();
if(index != null)
{
IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local)
.flushIndexEntriesOfFile(tableId, index.getId(), fileWriterManager.getFileId(), true);
}
for (ObjectEntry objectEntry : toRemove)
{
if (objectEntry.unref())
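The addRow() change above takes the memtable row offset and the global row id under one rowLock, so the two sequences advance in the same order; when the main index is later flushed, consecutive row ids map to consecutive row-group offsets and collapse into few RowIdRange entries. The scheduler change then flushes the index entries of a file once its object flush has finished. A self-contained sketch of the paired allocation (plain counters stand in for MemTable.add() and RowIdAllocator.getRowId(); the real classes are not used here):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative: the (rgRowOffset, rowId) pair must be taken atomically so both sequences stay aligned.
public class PairedAllocationExample
{
    private final AtomicLong rowOffsetCounter = new AtomicLong(); // stands in for MemTable.add()
    private final AtomicLong rowIdCounter = new AtomicLong();     // stands in for RowIdAllocator.getRowId()
    private final Object rowLock = new Object();
    private final List<long[]> locations = new ArrayList<>();     // (rowId, rgRowOffset) pairs

    public long addRow()
    {
        long rowOffset;
        long rowId;
        synchronized (rowLock)
        {
            // Allocate both under the same lock: a thread that gets offset k also gets row id k,
            // so the flushed main index can cover the whole file with a single contiguous range.
            rowOffset = rowOffsetCounter.getAndIncrement();
            rowId = rowIdCounter.getAndIncrement();
            locations.add(new long[]{rowId, rowOffset});
        }
        return rowId;
    }

    public static void main(String[] args) throws InterruptedException
    {
        PairedAllocationExample buffer = new PairedAllocationExample();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++)
        {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++)
                {
                    buffer.addRow();
                }
            });
            writers[t].start();
        }
        for (Thread writer : writers)
        {
            writer.join();
        }
        boolean aligned = buffer.locations.stream().allMatch(pair -> pair[0] == pair[1]);
        System.out.println("row ids aligned with row offsets: " + aligned);  // true
    }
}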
@@ -25,6 +25,7 @@
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.Column;
import io.pixelsdb.pixels.common.metadata.domain.Layout;
import io.pixelsdb.pixels.common.metadata.domain.Table;
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
@@ -556,6 +557,9 @@ public void addWriteBuffer(String schemaName, String tableName) throws RetinaExc
List<String> columnTypes = columns.stream().map(Column::getType).collect(Collectors.toList());
TypeDescription schema = TypeDescription.createSchemaFromStrings(columnNames, columnTypes);

// get primary index
Table table = metadataService.getTable(schemaName, tableName);

PixelsWriteBuffer pixelsWriteBuffer = new PixelsWriteBuffer(latestLayout.getTableId(),
schema, orderedPaths.get(0), compactPaths.get(0), retinaHostName);
String writeBufferKey = schemaName + "_" + tableName;