Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type TableStore interface {
GetColumnDefs() []ColumnDef
AddRow(elem Row) (*RowLoc, error)
GetRow(loc RowLoc) (map[string]any, error)
DeleteRow(key []byte) error
DeleteRow(loc RowLoc, id []byte) error

Fetch(inputs chan Index, workers int) <-chan BulkResponse
Remove(inputs chan Index, workers int) <-chan BulkResponse
Expand Down
71 changes: 51 additions & 20 deletions jsontable/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const ROW_OFFSET_HSIZE = 8
type JSONDriver struct {
base string
Lock sync.RWMutex
PebbleLock sync.Mutex
PebbleLock sync.RWMutex
db *pebble.DB
Pb *pebblebulk.PebbleKV

Expand Down Expand Up @@ -65,7 +65,7 @@ func NewJSONDriver(path string) (benchtop.TableDriver, error) {
}),
Fields: map[string]map[string]struct{}{},
Lock: sync.RWMutex{},
PebbleLock: sync.Mutex{},
PebbleLock: sync.RWMutex{},
LabelLookup: map[uint16]string{},
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func LoadJSONDriver(path string) (benchtop.TableDriver, error) {
},
Fields: map[string]map[string]struct{}{},
Lock: sync.RWMutex{},
PebbleLock: sync.Mutex{},
PebbleLock: sync.RWMutex{},
PageCache: otter.Must(&otter.Options[string, benchtop.RowLoc]{
MaximumSize: 10000000,
}),
Expand Down Expand Up @@ -396,6 +396,8 @@ func (dr *JSONDriver) Delete(name string) error {
return nil
}

// BulkLoad
// tx: set nil to initialize pebble bulk write context
func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleBulk) error {
Expand All @@ -405,11 +407,20 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB
}
var wg sync.WaitGroup
tableChannels := make(map[string]chan *benchtop.Row)

// New struct to hold the individual elements of a field key
type fieldKeyElements struct {
field string
tableName string
val any
rowId string
}

metadataChan := make(chan struct {
table *JSONTable
fieldIndexKeys [][]byte
metadata map[string]benchtop.RowLoc
err error
table *JSONTable
fieldIndexKeyElements []fieldKeyElements // Changed to the new struct
metadata map[string]benchtop.RowLoc
err error
}, 100)

startTableGoroutine := func(tableName string) {
Expand All @@ -423,7 +434,7 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB
snapshot.Close()
wg.Done()
}()
var fieldIndexKeys [][]byte
var fieldIndexKeyElements []fieldKeyElements // Changed variable name
metadata := make(map[string]benchtop.RowLoc)
var localErr *multierror.Error

Expand All @@ -435,10 +446,10 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB
if err != nil {
localErr = multierror.Append(localErr, fmt.Errorf("failed to create table %s: %v", tableName, err))
metadataChan <- struct {
table *JSONTable
fieldIndexKeys [][]byte
metadata map[string]benchtop.RowLoc
err error
table *JSONTable
fieldIndexKeyElements []fieldKeyElements
metadata map[string]benchtop.RowLoc
err error
}{nil, nil, nil, localErr.ErrorOrNil()}
return
}
Expand Down Expand Up @@ -467,7 +478,13 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB
if fieldsExist {
for field := range dr.Fields[tableName] {
if val := PathLookup(row.Data, field); val != nil {
fieldIndexKeys = append(fieldIndexKeys, benchtop.FieldKey(field, tableName, val, row.Id))
// Append the individual key elements to the new slice
fieldIndexKeyElements = append(fieldIndexKeyElements, fieldKeyElements{
field: field,
tableName: tableName,
val: val,
rowId: string(row.Id),
})
}
}
}
Expand Down Expand Up @@ -535,11 +552,11 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB
}

metadataChan <- struct {
table *JSONTable
fieldIndexKeys [][]byte
metadata map[string]benchtop.RowLoc
err error
}{table, fieldIndexKeys, metadata, localErr.ErrorOrNil()}
table *JSONTable
fieldIndexKeyElements []fieldKeyElements
metadata map[string]benchtop.RowLoc
err error
}{table, fieldIndexKeyElements, metadata, localErr.ErrorOrNil()}
}()
}

Expand Down Expand Up @@ -569,8 +586,22 @@ func (dr *JSONDriver) BulkLoad(inputs chan *benchtop.Row, tx *pebblebulk.PebbleB
if meta.table == nil {
continue
}
for _, key := range meta.fieldIndexKeys {
err := tx.Set(key, []byte{}, nil)

for _, keyElements := range meta.fieldIndexKeyElements {
forwardKey := benchtop.FieldKey(keyElements.field, keyElements.tableName, keyElements.val, []byte(keyElements.rowId))
err := tx.Set(forwardKey, []byte{}, nil)
if err != nil {
errs = multierror.Append(errs, err)
}

BVal, err := sonic.ConfigFastest.Marshal(keyElements.val)
if err != nil {
errs = multierror.Append(errs, err)
}
err = tx.Set(benchtop.RFieldKey(
keyElements.tableName, keyElements.field, keyElements.rowId,
),
BVal, nil)
if err != nil {
errs = multierror.Append(errs, err)
}
Expand Down
127 changes: 119 additions & 8 deletions jsontable/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/bmeg/benchtop"
"github.com/bmeg/grip/log"
"github.com/bytedance/sonic"

"github.com/bmeg/benchtop/filters"
"github.com/bmeg/benchtop/pebblebulk"
Expand All @@ -29,25 +30,53 @@ func (dr *JSONDriver) AddField(label, field string) error {
log.Errorf("Err attempting to add field %v", err)
return err
}
err = dr.db.Set(
bytes.Join([][]byte{
benchtop.RFieldPrefix,
[]byte(label),
[]byte(field),
}, benchtop.FieldSep),
[]byte{},
nil,
)
if err != nil {
log.Errorf("Err attempting to add field %v", err)
return err
}

} else {
log.Debugf("Found table %s writing indices for field %s", label, field)
err := dr.Pb.BulkWrite(func(tx *pebblebulk.PebbleBulk) error {
var filter benchtop.RowFilter = nil
for r := range foundTable.Scan(true, filter) {
fieldValue := PathLookup(r.(map[string]any), field)
rowId, ok := r.(map[string]any)["_id"].(string)
if !ok {
return fmt.Errorf("_id field not found or is not string in map %s", r)
}
err := tx.Set(
benchtop.FieldKey(
field,
label,
PathLookup(
r.(map[string]any), field),
[]byte(r.(map[string]any)["_id"].(string)),
fieldValue,
[]byte(rowId),
),
[]byte{},
nil,
)
if err != nil {
return err
}
if fieldValue != nil {
byteFV, err := sonic.ConfigFastest.Marshal(fieldValue)
if err != nil {
return err
}
err = tx.Set(benchtop.RFieldKey(label, field, rowId), byteFV, nil)
if err != nil {
return err
}
}
}
return nil
})
Expand All @@ -65,7 +94,7 @@ func (dr *JSONDriver) AddField(label, field string) error {
return fmt.Errorf("index label '%s' field '%s' already exists", label, field)
}
innerMap[field] = struct{}{}
log.Debugln("Fields: ", dr.Fields)
log.Debugln("List Fields: ", dr.Fields)

return nil
}
Expand All @@ -81,15 +110,25 @@ func (dr *JSONDriver) RemoveField(label string, field string) error {
}
}

key := benchtop.FieldLabelKey(field, label)
FieldPrefix := benchtop.FieldLabelKey(field, label)
RFieldKeyPrefix := bytes.Join([][]byte{
benchtop.RFieldPrefix,
[]byte(label),
[]byte(field),
}, benchtop.FieldSep)

log.Infof("Deleting prefix: %q", key)
// Perform deletion in a bulk write transaction
err := dr.Pb.BulkWrite(func(tx *pebblebulk.PebbleBulk) error {
return tx.DeletePrefix(key)
if err := tx.DeletePrefix(FieldPrefix); err != nil {
return fmt.Errorf("delete field prefix failed: %w", err)
}
if err := tx.DeletePrefix(RFieldKeyPrefix); err != nil {
return fmt.Errorf("delete row index prefix failed: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("delete range failed: %w", err)
return err
}
return nil
}
Expand Down Expand Up @@ -148,6 +187,78 @@ func (dr *JSONDriver) ListFields() []FieldInfo {
return out
}

// DeleteRowField deletes the index entries for a single row's field: both
// the forward index entry (field value -> row id) and the reverse index
// entry (row id -> serialized field value). It is a no-op if the row has
// no reverse index entry for the field.
//
// Returns an error if the table or field is not registered with the
// driver, or if any of the underlying pebble reads/writes fail.
func (dr *JSONDriver) DeleteRowField(label, field, rowID string) error {
	dr.Lock.Lock()
	defer dr.Lock.Unlock()

	// Check if the table exists
	_, ok := dr.Tables[label]
	if !ok {
		log.Errorf("Table '%s' does not exist", label)
		return fmt.Errorf("table '%s' does not exist", label)
	}

	// Check if the field exists
	innerMap, existsLabel := dr.Fields[label]
	if !existsLabel || innerMap == nil {
		log.Errorf("No fields defined for table '%s'", label)
		return fmt.Errorf("no fields defined for table '%s'", label)
	}
	if _, existsField := innerMap[field]; !existsField {
		log.Errorf("Field '%s' does not exist in table '%s'", field, label)
		return fmt.Errorf("field '%s' does not exist in table '%s'", field, label)
	}

	// Look up the serialized field value in the reverse index; the forward
	// index key cannot be reconstructed without it.
	rowIndexKey := benchtop.RFieldKey(label, field, rowID)
	var fieldValueBytes []byte
	err := dr.Pb.View(func(it *pebblebulk.PebbleIterator) error {
		var err error
		if it.Seek(rowIndexKey); it.Valid() && bytes.Equal(it.Key(), rowIndexKey) {
			fieldValueBytes, err = it.Value()
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.Errorf("Error finding reverse index for row '%s' in table '%s' for field '%s': %v", rowID, label, field, err)
		return err
	}

	// If no reverse index entry exists, there is no index to delete.
	if fieldValueBytes == nil {
		log.Debugf("No index entry for row '%s' in table '%s' for field '%s'", rowID, label, field)
		return nil
	}

	// Deserialize the stored value so the forward index key can be rebuilt.
	var fieldValue any
	if err := sonic.ConfigFastest.Unmarshal(fieldValueBytes, &fieldValue); err != nil {
		log.Errorf("Error deserializing field value for row '%s' in table '%s' for field '%s': %v", rowID, label, field, err)
		return err
	}

	// Delete both the forward and reverse index entries atomically within
	// one bulk write.
	err = dr.Pb.BulkWrite(func(tx *pebblebulk.PebbleBulk) error {
		if err := tx.Delete(benchtop.FieldKey(field, label, fieldValue, []byte(rowID)), nil); err != nil {
			return err
		}
		if err := tx.Delete(rowIndexKey, nil); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		log.Errorf("Error deleting index for field '%s' in table '%s' for row '%s': %v", field, label, rowID, err)
		return err
	}
	log.Debugf("Successfully deleted index for field '%s' in table '%s' for row '%s'", field, label, rowID)
	return nil
}

func (dr *JSONDriver) RowIdsByHas(fltField string, fltValue any, fltOp gripql.Condition) chan string {
dr.Lock.RLock()
defer dr.Lock.RUnlock()
Expand Down
33 changes: 22 additions & 11 deletions jsontable/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ type JSONTable struct {
func (b *JSONTable) Init(poolSize int) error {
b.FilePool = make(chan *os.File, poolSize)
for i := range poolSize {
file, err := os.Open(b.Path)
file, err := os.OpenFile(b.Path, os.O_RDWR, 0666)
if err != nil {
// Close already opened files
for range i {
if file, ok := <-b.FilePool; ok {
file.Close()
Expand Down Expand Up @@ -94,7 +93,7 @@ func (b *JSONTable) AddRow(elem benchtop.Row) (*benchtop.RowLoc, error) {
return nil, err
}

log.Debugln("WRITE ENTRY: ", offset, len(bData))
//log.Debugln("WRITE ENTRY: ", offset, len(bData))
writesize, err := b.writeJsonEntry(offset, bData)
if err != nil {
log.Errorf("write handler err in Load: bulkSet: %s", err)
Expand Down Expand Up @@ -137,17 +136,30 @@ func (b *JSONTable) GetRow(loc benchtop.RowLoc) (map[string]any, error) {
return out.(map[string]any), nil
}

func (b *JSONTable) DeleteRow(name []byte) error {
offset, _, err := b.GetBlockPos(name)
if err != nil {
return err
// MarkDeleteTable stamps the row at loc as deleted by zeroing its 4-byte
// size header in the table file.
func (b *JSONTable) MarkDeleteTable(loc benchtop.RowLoc) error {
	// No lock is taken here: the write only marks an existing region of the
	// file as empty and never extends or relocates data, so concurrent
	// appenders are unaffected.
	f := <-b.FilePool
	defer func() { b.FilePool <- f }()

	deletedMark := []byte{0x00, 0x00, 0x00, 0x00}
	if _, err := f.WriteAt(deletedMark, int64(loc.Offset+ROW_OFFSET_HSIZE)); err != nil {
		return fmt.Errorf("writeAt failed: %w", err)
	}
	return nil
}

func (b *JSONTable) DeleteRow(loc benchtop.RowLoc, id []byte) error {
b.handleLock.Lock()
if _, err := b.handle.WriteAt([]byte{0x00, 0x00, 0x00, 0x00}, int64(offset+12)); err != nil {
defer b.handleLock.Unlock()

if _, err := b.handle.WriteAt([]byte{0x00, 0x00, 0x00, 0x00}, int64(loc.Offset+ROW_OFFSET_HSIZE)); err != nil {
return fmt.Errorf("writeAt failed: %w", err)
}
b.handleLock.Unlock()
b.db.Delete(benchtop.NewPosKey(b.TableId, name), nil)
err := b.db.Delete(benchtop.NewPosKey(b.TableId, id), nil)
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -213,7 +225,6 @@ func (b *JSONTable) Scan(loadData bool, filter benchtop.RowFilter) chan any {
}

rowData := m[jsonStart:jsonEnd]

err = b.processJSONRowData(rowData, loadData, filter, outChan)
if err != nil {
log.Debugf("Skipping malformed row at offset %d: %v", offset, err)
Expand Down
Loading
Loading