diff --git a/interface.go b/interface.go index 2ab9c86..0329895 100644 --- a/interface.go +++ b/interface.go @@ -71,7 +71,7 @@ type TableStore interface { GetColumnDefs() []ColumnDef AddRow(elem Row) (*RowLoc, error) GetRow(loc RowLoc) (map[string]any, error) - DeleteRow(key []byte) error + DeleteRow(loc RowLoc, id []byte) error Fetch(inputs chan Index, workers int) <-chan BulkResponse Remove(inputs chan Index, workers int) <-chan BulkResponse diff --git a/jsontable/driver.go b/jsontable/driver.go index a127a66..7e09837 100644 --- a/jsontable/driver.go +++ b/jsontable/driver.go @@ -28,7 +28,7 @@ const ROW_OFFSET_HSIZE = 8 type JSONDriver struct { base string Lock sync.RWMutex - PebbleLock sync.Mutex + PebbleLock sync.RWMutex db *pebble.DB Pb *pebblebulk.PebbleKV @@ -65,7 +65,7 @@ func NewJSONDriver(path string) (benchtop.TableDriver, error) { }), Fields: map[string]map[string]struct{}{}, Lock: sync.RWMutex{}, - PebbleLock: sync.Mutex{}, + PebbleLock: sync.RWMutex{}, LabelLookup: map[uint16]string{}, } @@ -107,7 +107,7 @@ func LoadJSONDriver(path string) (benchtop.TableDriver, error) { }, Fields: map[string]map[string]struct{}{}, Lock: sync.RWMutex{}, - PebbleLock: sync.Mutex{}, + PebbleLock: sync.RWMutex{}, PageCache: otter.Must(&otter.Options[string, benchtop.RowLoc]{ MaximumSize: 10000000, }), @@ -396,6 +396,8 @@ func (dr *JSONDriver) Delete(name string) error { return nil } +// BulkLoad +// tx: set null to initialize pebble bulk write context // BulkLoad // tx: set null to initialize pebble bulk write context func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleBulk) error { @@ -405,11 +407,20 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB } var wg sync.WaitGroup tableChannels := make(map[string]chan *benchtop.Row) + + // New struct to hold the individual elements of a field key + type fieldKeyElements struct { + field string + tableName string + val any + rowId string + } + metadataChan := make(chan struct { - table *JSONTable - fieldIndexKeys [][]byte - metadata map[string]benchtop.RowLoc - err error + table *JSONTable + fieldIndexKeyElements []fieldKeyElements // Changed to the new struct + metadata map[string]benchtop.RowLoc + err error }, 100) startTableGoroutine := func(tableName string) { @@ -423,7 +434,7 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB snapshot.Close() wg.Done() }() - var fieldIndexKeys [][]byte + var fieldIndexKeyElements []fieldKeyElements // Changed variable name metadata := make(map[string]benchtop.RowLoc) var localErr *multierror.Error @@ -435,10 +446,10 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB if err != nil { localErr = multierror.Append(localErr, fmt.Errorf("failed to create table %s: %v", tableName, err)) metadataChan <- struct { - table *JSONTable - fieldIndexKeys [][]byte - metadata map[string]benchtop.RowLoc - err error + table *JSONTable + fieldIndexKeyElements []fieldKeyElements + metadata map[string]benchtop.RowLoc + err error }{nil, nil, nil, localErr.ErrorOrNil()} return } @@ -467,7 +478,13 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB if fieldsExist { for field := range dr.Fields[tableName] { if val := PathLookup(row.Data, field); val != nil { - fieldIndexKeys = append(fieldIndexKeys, benchtop.FieldKey(field, tableName, val, row.Id)) + // Append the individual key elements to the new slice + fieldIndexKeyElements = append(fieldIndexKeyElements, fieldKeyElements{ + field: field, + tableName: tableName, + val: val, + rowId: string(row.Id), + }) } } } @@ -535,11 +552,11 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB } metadataChan <- struct { - table *JSONTable - fieldIndexKeys [][]byte - metadata map[string]benchtop.RowLoc - err error - }{table, fieldIndexKeys, metadata, localErr.ErrorOrNil()} + table *JSONTable + fieldIndexKeyElements []fieldKeyElements + metadata map[string]benchtop.RowLoc + err error + }{table, fieldIndexKeyElements, metadata, localErr.ErrorOrNil()} }() } @@ -569,8 +586,22 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB if meta.table == nil { continue } - for _, key := range meta.fieldIndexKeys { - err := tx.Set(key, []byte{}, nil) + + for _, keyElements := range meta.fieldIndexKeyElements { + forwardKey := benchtop.FieldKey(keyElements.field, keyElements.tableName, keyElements.val, []byte(keyElements.rowId)) + err := tx.Set(forwardKey, []byte{}, nil) + if err != nil { + errs = multierror.Append(errs, err) + } + + BVal, err := sonic.ConfigFastest.Marshal(keyElements.val) + if err != nil { + errs = multierror.Append(errs, err) + } + err = tx.Set(benchtop.RFieldKey( + keyElements.tableName, keyElements.field, keyElements.rowId, + ), + BVal, nil) if err != nil { errs = multierror.Append(errs, err) } diff --git a/jsontable/fields.go b/jsontable/fields.go index c89c882..d9376b8 100644 --- a/jsontable/fields.go +++ b/jsontable/fields.go @@ -6,6 +6,7 @@ import ( "github.com/bmeg/benchtop" "github.com/bmeg/grip/log" + "github.com/bytedance/sonic" "github.com/bmeg/benchtop/filters" "github.com/bmeg/benchtop/pebblebulk" @@ -29,18 +30,36 @@ func (dr *JSONDriver) AddField(label, field string) error { log.Errorf("Err attempting to add field %v", err) return err } + err = dr.db.Set( + bytes.Join([][]byte{ + benchtop.RFieldPrefix, + []byte(label), + []byte(field), + }, benchtop.FieldSep), + []byte{}, + nil, + ) + if err != nil { + log.Errorf("Err attempting to add field %v", err) + return err + } + } else { log.Debugf("Found table %s writing indices for field %s", label, field) err := dr.Pb.BulkWrite(func(tx *pebblebulk.PebbleBulk) error { var filter benchtop.RowFilter = nil for r := range foundTable.Scan(true, filter) { + fieldValue := PathLookup(r.(map[string]any), field) + rowId, ok := r.(map[string]any)["_id"].(string) + if !ok { + return fmt.Errorf("_id field not found or is not string in map %s", r) + } err := tx.Set( benchtop.FieldKey( field, label, - PathLookup( - r.(map[string]any), field), - []byte(r.(map[string]any)["_id"].(string)), + fieldValue, + []byte(rowId), ), []byte{}, nil, @@ -48,6 +67,16 @@ func (dr *JSONDriver) AddField(label, field string) error { if err != nil { return err } + if fieldValue != nil { + byteFV, err := sonic.ConfigFastest.Marshal(fieldValue) + if err != nil { + return err + } + err = tx.Set(benchtop.RFieldKey(label, field, rowId), byteFV, nil) + if err != nil { + return err + } + } } return nil }) @@ -65,7 +94,7 @@ func (dr *JSONDriver) AddField(label, field string) error { return fmt.Errorf("index label '%s' field '%s' already exists", label, field) } innerMap[field] = struct{}{} - log.Debugln("Fields: ", dr.Fields) + log.Debugln("List Fields: ", dr.Fields) return nil } @@ -81,15 +110,25 @@ func (dr *JSONDriver) RemoveField(label string, field string) error { } } - key := benchtop.FieldLabelKey(field, label) + FieldPrefix := benchtop.FieldLabelKey(field, label) + RFieldKeyPrefix := bytes.Join([][]byte{ + benchtop.RFieldPrefix, + []byte(label), + []byte(field), + }, benchtop.FieldSep) - log.Infof("Deleting prefix: %q", key) // Perform deletion in a bulk write transaction err := dr.Pb.BulkWrite(func(tx *pebblebulk.PebbleBulk) error { - return tx.DeletePrefix(key) + if err := tx.DeletePrefix(FieldPrefix); err != nil { + return fmt.Errorf("delete field prefix failed: %w", err) + } + if err := tx.DeletePrefix(RFieldKeyPrefix); err != nil { + return fmt.Errorf("delete row index prefix failed: %w", err) + } + return nil }) if err != nil { - return fmt.Errorf("delete range failed: %w", err) + return err } return nil } @@ -148,6 +187,78 @@ func (dr *JSONDriver) ListFields() []FieldInfo { return out } +func (dr *JSONDriver) DeleteRowField(label, field, rowID string) error { + /* Deletes a singular row index field */ + dr.Lock.Lock() + defer dr.Lock.Unlock() + + // Check if the table exists + _, ok := dr.Tables[label] + if !ok { + log.Errorf("Table '%s' does not exist", label) + return fmt.Errorf("table '%s' does not exist", label) + } + + // Check if the field exists + innerMap, existsLabel := dr.Fields[label] + if !existsLabel || innerMap == nil { + log.Errorf("No fields defined for table '%s'", label) + return fmt.Errorf("no fields defined for table '%s'", label) + } + if _, existsField := innerMap[field]; !existsField { + log.Errorf("Field '%s' does not exist in table '%s'", field, label) + return fmt.Errorf("field '%s' does not exist in table '%s'", field, label) + } + + // Get the field value from the reverse index + rowIndexKey := benchtop.RFieldKey(label, field, rowID) + var fieldValueBytes []byte + err := dr.Pb.View(func(it *pebblebulk.PebbleIterator) error { + var err error + if it.Seek(rowIndexKey); it.Valid() && bytes.Equal(it.Key(), rowIndexKey) { + fieldValueBytes, err = it.Value() + if err != nil { + return err + } + } + return nil + }) + if err != nil { + log.Errorf("Error finding reverse index for row '%s' in table '%s' for field '%s': %v", rowID, label, field, err) + return err + } + + // If no reverse index entry exists, no index to delete + if fieldValueBytes == nil { + log.Debugf("No index entry for row '%s' in table '%s' for field '%s'", rowID, label, field) + return nil + } + + var fieldValue any + if err := sonic.ConfigFastest.Unmarshal(fieldValueBytes, &fieldValue); err != nil { + log.Errorf("Error deserializing field value for row '%s' in table '%s' for field '%s': %v", rowID, label, field, err) + return err + } + fmt.Println("FIELD VALUE ANY: ", fieldValue) + + // Delete both the forward and reverse index entries + err = dr.Pb.BulkWrite(func(tx *pebblebulk.PebbleBulk) error { + if err := tx.Delete(benchtop.FieldKey(field, label, fieldValue, []byte(rowID)), nil); err != nil { + return err + } + if err := tx.Delete(rowIndexKey, nil); err != nil { + return err + } + return nil + }) + if err != nil { + log.Errorf("Error deleting index for field '%s' in table '%s' for row '%s': %v", field, label, rowID, err) + return err + } + log.Debugf("Successfully deleted index for field '%s' in table '%s' for row '%s'", field, label, rowID) + return nil +} + func (dr *JSONDriver) RowIdsByHas(fltField string, fltValue any, fltOp gripql.Condition) chan string { dr.Lock.RLock() defer dr.Lock.RUnlock() diff --git a/jsontable/table.go b/jsontable/table.go index bab88b8..ddc8ed0 100644 --- a/jsontable/table.go +++ b/jsontable/table.go @@ -42,9 +42,8 @@ type JSONTable struct { func (b *JSONTable) Init(poolSize int) error { b.FilePool = make(chan *os.File, poolSize) for i := range poolSize { - file, err := os.Open(b.Path) + file, err := os.OpenFile(b.Path, os.O_RDWR, 0666) if err != nil { - // Close already opened files for range i { if file, ok := <-b.FilePool; ok { file.Close() @@ -94,7 +93,7 @@ func (b *JSONTable) AddRow(elem benchtop.Row) (*benchtop.RowLoc, error) { return nil, err } - log.Debugln("WRITE ENTRY: ", offset, len(bData)) + //log.Debugln("WRITE ENTRY: ", offset, len(bData)) writesize, err := b.writeJsonEntry(offset, bData) if err != nil { log.Errorf("write handler err in Load: bulkSet: %s", err) @@ -137,17 +136,30 @@ func (b *JSONTable) GetRow(loc benchtop.RowLoc) (map[string]any, error) { return out.(map[string]any), nil } -func (b *JSONTable) DeleteRow(name []byte) error { - offset, _, err := b.GetBlockPos(name) - if err != nil { - return err +func (b *JSONTable) MarkDeleteTable(loc benchtop.RowLoc) error { + // Since we're not explicitly 'adding' to a part of the file, should be able + // to get away with no lock here since the space is just 'marked' as empty + file := <-b.FilePool + defer func() { + b.FilePool <- file + }() + if _, err := file.WriteAt([]byte{0x00, 0x00, 0x00, 0x00}, int64(loc.Offset+ROW_OFFSET_HSIZE)); err != nil { + return fmt.Errorf("writeAt failed: %w", err) } + return nil +} + +func (b *JSONTable) DeleteRow(loc benchtop.RowLoc, id []byte) error { b.handleLock.Lock() - if _, err := b.handle.WriteAt([]byte{0x00, 0x00, 0x00, 0x00}, int64(offset+12)); err != nil { + defer b.handleLock.Unlock() + + if _, err := b.handle.WriteAt([]byte{0x00, 0x00, 0x00, 0x00}, int64(loc.Offset+ROW_OFFSET_HSIZE)); err != nil { return fmt.Errorf("writeAt failed: %w", err) } - b.handleLock.Unlock() - b.db.Delete(benchtop.NewPosKey(b.TableId, name), nil) + err := b.db.Delete(benchtop.NewPosKey(b.TableId, id), nil) + if err != nil { + return err + } return nil } @@ -213,7 +225,6 @@ func (b *JSONTable) Scan(loadData bool, filter benchtop.RowFilter) chan any { } rowData := m[jsonStart:jsonEnd] - err = b.processJSONRowData(rowData, loadData, filter, outChan) if err != nil { log.Debugf("Skipping malformed row at offset %d: %v", offset, err) diff --git a/keys.go b/keys.go index 2bb2bc1..cdb2961 100644 --- a/keys.go +++ b/keys.go @@ -20,12 +20,26 @@ var PosPrefix = byte('P') // Field // key: F -// used for indexing specific field values in kvgraph +// used for indexing specific field values var FieldPrefix = []byte{'F'} +// ReverseField Index +// key: R +// used for reverse indexing specific field keys in order to be able to efficiently delete indices +var RFieldPrefix = []byte{'R'} + // The '0x1F' invisible character unit seperator not supposed to appear in ASCII text var FieldSep = []byte{0x1F} +func RFieldKey(label, field, rowID string) []byte { + return bytes.Join([][]byte{ + RFieldPrefix, + []byte(label), + []byte(field), + []byte(rowID), + }, FieldSep) +} + func FieldKey(field string, label string, value any, rowID []byte) []byte { /* creates a full field key for optimizing the beginning of a query */ valueBytes, err := json.Marshal(value) diff --git a/test/benchmark/compact_test.go b/test/benchmark/compact_test.go index 304f260..e1c085c 100644 --- a/test/benchmark/compact_test.go +++ b/test/benchmark/compact_test.go @@ -64,11 +64,17 @@ func BenchmarkCompactJson(b *testing.B) { b.Fatal(err) } + bT, _ := compactjsonTable.(*jsontable.JSONTable) + count := 0 deleted := 0 for key := range keys { if _, exists := randomIndexSet[count]; exists { - if err := compactjsonTable.DeleteRow(key.Key); err != nil { + offset, size, err := bT.GetBlockPos(key.Key) + if err != nil { + b.Error(err) + } + if err := compactjsonTable.DeleteRow(benchtop.RowLoc{Offset: offset, Size: size, Label: bT.TableId}, key.Key); err != nil { b.Fatal(err) } deleted++ diff --git a/test/integration/compact_test.go b/test/integration/compact_test.go index d5e8cac..258a4e8 100644 --- a/test/integration/compact_test.go +++ b/test/integration/compact_test.go @@ -37,7 +37,11 @@ func TestCompact(t *testing.T) { } - err = ts.DeleteRow([]byte("key4")) + offset, size, err := bT.GetBlockPos([]byte("key4")) + if err != nil { + t.Error(err) + } + err = ts.DeleteRow(benchtop.RowLoc{Offset: offset, Size: size, Label: bT.TableId}, []byte("key4")) if err != nil { t.Fatal(err) } diff --git a/test/integration/delete_test.go b/test/integration/delete_test.go index ecf7bd2..96bfb16 100644 --- a/test/integration/delete_test.go +++ b/test/integration/delete_test.go @@ -73,7 +73,11 @@ func TestDelete(t *testing.T) { i := 0 for k := range keys { if i%3 == 0 { - err := bT.DeleteRow(k.Key) + offset, size, err := bT.GetBlockPos(k.Key) + if err != nil { + t.Error(err) + } + err = bT.DeleteRow(benchtop.RowLoc{Offset: offset, Size: size, Label: bT.TableId}, k.Key) if err != nil { t.Errorf("delete %s error: %s", string(k.Key), err) } diff --git a/test/integration/scan_test.go b/test/integration/scan_test.go index 134bb0c..de7b845 100644 --- a/test/integration/scan_test.go +++ b/test/integration/scan_test.go @@ -174,7 +174,11 @@ func TestScan(t *testing.T) { t.Errorf("Expecting 6 items returned but got %d", scanChanLen3) } - err = bT.DeleteRow([]byte("key4")) + offset, size, err := bT.GetBlockPos([]byte("key4")) + if err != nil { + t.Error(err) + } + err = bT.DeleteRow(benchtop.RowLoc{Offset: offset, Size: size, Label: bT.TableId}, []byte("key4")) if err != nil { t.Error(err) }