From c751ac35c51c08fb73dde0a6f0e11ef588b5bfd3 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Sun, 1 Feb 2026 11:14:16 +0400 Subject: [PATCH 1/2] Next GEQ --- frac/fraction_concurrency_test.go | 8 ++- frac/fraction_test.go | 105 ++++++++++++++++++++++++++++++ frac/processor/aggregator.go | 2 +- frac/sealed/lids/iterator_asc.go | 5 ++ frac/sealed/lids/iterator_desc.go | 49 ++++++++++++++ node/node.go | 4 ++ node/node_and.go | 34 +++++++++- node/node_nand.go | 4 ++ node/node_or.go | 70 ++++++++++++++++++++ node/node_range.go | 4 ++ node/node_static.go | 10 ++- node/sourced_node_wrapper.go | 5 ++ 12 files changed, 293 insertions(+), 7 deletions(-) diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index a1d36957..82cb9af4 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -56,6 +56,7 @@ func TestConcurrentAppendAndQuery(t *testing.T) { mapping := seq.Mapping{ "service": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20), + "pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20), "message": seq.NewSingleType(seq.TokenizerTypeText, "", 100), "level": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20), "trace_id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20), @@ -254,6 +255,7 @@ type testDoc = struct { json string message string service string + pod string level int traceId string timestamp time.Time @@ -277,18 +279,20 @@ func generatesMessages(numMessages, bulkSize int) ([]testDoc, [][]string, time.T level := rand.IntN(6) timestamp := fromTime.Add(time.Duration(i) * time.Millisecond) traceId := fmt.Sprintf("trace-%d", i%5000) + pod := fmt.Sprintf("pod-%d", i%250) if i == numMessages-1 { toTime = timestamp } - json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"message":%q,"trace_id": %q,"level":"%d"}`, - timestamp.Format(time.RFC3339Nano), service, message, traceId, level) + json := 
fmt.Sprintf(`{"timestamp":%q,"service":%q,"pod":%q,"message":%q,"trace_id": %q,"level":"%d"}`, + timestamp.Format(time.RFC3339Nano), service, pod, message, traceId, level) docs = append(docs, testDoc{ json: json, timestamp: timestamp, message: message, service: service, + pod: pod, level: level, traceId: traceId, }) diff --git a/frac/fraction_test.go b/frac/fraction_test.go index ff4ac602..b3834847 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -72,6 +72,7 @@ func (s *FractionTestSuite) SetupTestCommon() { "level": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "client_ip": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "service": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), + "pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "status": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "source": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "trace_id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), @@ -1111,6 +1112,74 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: midTime, }, + // AND operator queries + { + name: "message:request AND message:failed", + query: "message:request AND message:failed", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.message, "request") && strings.Contains(doc.message, "failed") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND message:processing AND message:retry AND level:5", + query: "service:gateway AND message:processing AND message:retry AND level:5", + filter: func(doc *testDoc) bool { + return doc.service == "gateway" && strings.Contains(doc.message, "processing") && + strings.Contains(doc.message, "retry") && doc.level == 5 + }, + fromTime: fromTime, + toTime: toTime, + }, + + // OR operator queries + { + name: "trace_id OR", + query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000", + filter: func(doc *testDoc) bool { + 
return doc.traceId == "trace-1000" || + doc.traceId == "trace-1500" || + doc.traceId == "trace-2000" || + doc.traceId == "trace-2500" || + doc.traceId == "trace-3000" + }, + fromTime: fromTime, + toTime: toTime, + }, + + // mixed AND/OR + { + name: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*", + query: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*", + filter: func(doc *testDoc) bool { + return strings.Contains(doc.message, "request") && (doc.level == 1 || doc.level == 3 || doc.level == 5) && + strings.Contains(doc.traceId, "trace-2") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "complex AND+OR", + query: "(service:gateway OR service:proxy OR service:scheduler) AND " + + "(message:request OR message:failed) AND level:[1 to 3]", + filter: func(doc *testDoc) bool { + return (doc.service == "gateway" || doc.service == "proxy" || doc.service == "scheduler") && + (strings.Contains(doc.message, "request") || strings.Contains(doc.message, "failed")) && + (doc.level >= 1 && doc.level <= 3) + }, + fromTime: fromTime, + toTime: toTime, + }, + + // other queries + { + name: "trace_id:trace-4*", + query: "trace_id:trace-4*", + filter: func(doc *testDoc) bool { return strings.Contains(doc.traceId, "trace-4") }, + fromTime: fromTime, + toTime: toTime, + }, } for _, tc := range searchTestCases { @@ -1144,6 +1213,42 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { }) } + s.Run("service:scheduler | group by pod avg(level)", func() { + levelsByPod := make(map[string][]int) + for _, doc := range testDocs { + if doc.service != "scheduler" { + continue + } + + levelsByPod[doc.pod] = append(levelsByPod[doc.pod], doc.level) + } + + var expectedBuckets []seq.AggregationBucket + for pod, levels := range levelsByPod { + sum := 0 + for _, level := range levels { + sum += level + } + avg := float64(sum) / float64(len(levels)) + expectedBuckets = append(expectedBuckets, seq.AggregationBucket{ + Name: pod, + 
Value: avg, + NotExists: 0, + }) + } + + searchParams := s.query( + "service:scheduler", + withTo(toTime.Format(time.RFC3339Nano)), + withAggQuery(processor.AggQuery{ + Field: aggField("level"), + GroupBy: aggField("pod"), + Func: seq.AggFuncAvg, + })) + + s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncAvg}, expectedBuckets) + }) + s.Run("NOT message:retry | group by service avg(level)", func() { levelsByService := make(map[string][]int) for _, doc := range testDocs { diff --git a/frac/processor/aggregator.go b/frac/processor/aggregator.go index 59dd0d52..f36540ee 100644 --- a/frac/processor/aggregator.go +++ b/frac/processor/aggregator.go @@ -381,7 +381,7 @@ func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, func (s *SourcedNodeIterator) ConsumeTokenSource(lid uint32) (uint32, bool, error) { for s.has && s.less(s.lastID, lid) { - s.lastID, s.lastSource, s.has = s.sourcedNode.NextSourced() + s.lastID, s.lastSource, s.has = s.sourcedNode.NextSourcedGeq(lid) } exists := s.has && s.lastID == lid diff --git a/frac/sealed/lids/iterator_asc.go b/frac/sealed/lids/iterator_asc.go index 11bb48d1..a57fdcc5 100644 --- a/frac/sealed/lids/iterator_asc.go +++ b/frac/sealed/lids/iterator_asc.go @@ -71,3 +71,8 @@ func (it *IteratorAsc) Next() (uint32, bool) { it.lids = it.lids[:i] return lid, true } + +func (it *IteratorAsc) NextGeq(minLid uint32) (uint32, bool) { + // TODO support NextGeq for ascending iterator + return it.Next() +} diff --git a/frac/sealed/lids/iterator_desc.go b/frac/sealed/lids/iterator_desc.go index f3fa741b..4d5c98cd 100644 --- a/frac/sealed/lids/iterator_desc.go +++ b/frac/sealed/lids/iterator_desc.go @@ -70,3 +70,52 @@ func (it *IteratorDesc) Next() (uint32, bool) { it.lids = it.lids[1:] return lid, true } + +func (it *IteratorDesc) NextGeq(minLID uint32) (uint32, bool) { + for { + for len(it.lids) == 0 { + if !it.tryNextBlock { + return 0, false + } + + it.loadNextLIDsBlock() // last chunk in block but 
not last for tid; need to load next block + it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) + it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count + } + + last := it.lids[len(it.lids)-1] + // fast path check: the last LID is less than minLID. if true, then skip the entire block + if minLID > last { + it.lids = it.lids[:0] + continue + } + + l := len(it.lids) + + if l >= 32 && it.lids[31] > minLID { + idx := sort.Search(len(it.lids[0:32]), func(i int) bool { return it.lids[i] >= minLID }) + if idx < 32 { + // TODO single it.lids = it.lids + it.lids = it.lids[idx:] + lid := it.lids[0] + it.lids = it.lids[1:] + return lid, true + } + + if len(it.lids) == 0 { + continue + } + } + + // use binary search to find lower bound + idx := sort.Search(len(it.lids), func(i int) bool { return it.lids[i] >= minLID }) + if idx < len(it.lids) { + it.lids = it.lids[idx:] + lid := it.lids[0] + it.lids = it.lids[1:] + return lid, true + } + + it.lids = it.lids[:0] + } +} diff --git a/node/node.go b/node/node.go index e6525b45..8f312932 100644 --- a/node/node.go +++ b/node/node.go @@ -7,10 +7,14 @@ import ( type Node interface { fmt.Stringer // for testing Next() (id uint32, has bool) + // NextGeq returns next greater or equal (GEQ) lid. Currently, some nodes do not support it + // so the caller must check the output and be ready to call it again if needed. 
+ NextGeq(minLID uint32) (id uint32, has bool) } type Sourced interface { fmt.Stringer // for testing // aggregation need source NextSourced() (id uint32, source uint32, has bool) + NextSourcedGeq(nextLID uint32) (id uint32, source uint32, has bool) } diff --git a/node/node_and.go b/node/node_and.go index 58bc9391..a4f058a2 100644 --- a/node/node_and.go +++ b/node/node_and.go @@ -1,6 +1,8 @@ package node -import "fmt" +import ( + "fmt" +) type nodeAnd struct { less LessFn @@ -38,13 +40,39 @@ func (n *nodeAnd) readRight() { n.rightID, n.hasRight = n.right.Next() } +func (n *nodeAnd) readLeftGeq(minLID uint32) { + n.leftID, n.hasLeft = n.left.NextGeq(minLID) +} + +func (n *nodeAnd) readRightGeq(minLID uint32) { + n.rightID, n.hasRight = n.right.NextGeq(minLID) +} + func (n *nodeAnd) Next() (uint32, bool) { for n.hasLeft && n.hasRight && n.leftID != n.rightID { for n.hasLeft && n.hasRight && n.less(n.leftID, n.rightID) { - n.readLeft() + n.readLeftGeq(n.rightID) + } + for n.hasLeft && n.hasRight && n.less(n.rightID, n.leftID) { + n.readRightGeq(n.leftID) + } + } + if !n.hasLeft || !n.hasRight { + return 0, false + } + cur := n.leftID + n.readLeft() + n.readRight() + return cur, true +} + +func (n *nodeAnd) NextGeq(minLID uint32) (uint32, bool) { + for n.hasLeft && n.hasRight && n.leftID != n.rightID { + for n.hasLeft && n.hasRight && n.less(n.leftID, n.rightID) { + n.readLeftGeq(max(minLID, n.rightID)) } for n.hasLeft && n.hasRight && n.less(n.rightID, n.leftID) { - n.readRight() + n.readRightGeq(max(minLID, n.leftID)) } } if !n.hasLeft || !n.hasRight { diff --git a/node/node_nand.go b/node/node_nand.go index 7f7b7a89..02242adb 100644 --- a/node/node_nand.go +++ b/node/node_nand.go @@ -52,3 +52,7 @@ func (n *nodeNAnd) Next() (uint32, bool) { } return 0, false } + +func (n *nodeNAnd) NextGeq(minLID uint32) (uint32, bool) { + return n.Next() +} diff --git a/node/node_or.go b/node/node_or.go index 1cfec7ac..6d751f08 100644 --- a/node/node_or.go +++ b/node/node_or.go @@ 
-38,6 +38,14 @@ func (n *nodeOr) readRight() { n.rightID, n.hasRight = n.right.Next() } +func (n *nodeOr) readLeftGeq(minLID uint32) { + n.leftID, n.hasLeft = n.left.NextGeq(minLID) +} + +func (n *nodeOr) readRightGeq(minLID uint32) { + n.rightID, n.hasRight = n.right.NextGeq(minLID) +} + func (n *nodeOr) Next() (uint32, bool) { if !n.hasLeft && !n.hasRight { return 0, false @@ -62,6 +70,30 @@ func (n *nodeOr) Next() (uint32, bool) { return cur, true } +func (n *nodeOr) NextGeq(minLID uint32) (uint32, bool) { + if !n.hasLeft && !n.hasRight { + return 0, false + } + + if n.hasLeft && (!n.hasRight || n.less(n.leftID, n.rightID)) { + cur := n.leftID + n.readLeftGeq(minLID) + return cur, true + } + + if n.hasRight && (!n.hasLeft || n.less(n.rightID, n.leftID)) { + cur := n.rightID + n.readRightGeq(minLID) + return cur, true + } + + cur := n.leftID + n.readLeftGeq(minLID) + n.readRightGeq(minLID) + + return cur, true +} + type nodeOrAgg struct { left Sourced right Sourced @@ -100,6 +132,14 @@ func (n *nodeOrAgg) readRight() { n.rightID, n.rightSource, n.hasRight = n.right.NextSourced() } +func (n *nodeOrAgg) readLeftGeq(minLID uint32) { + n.leftID, n.leftSource, n.hasLeft = n.left.NextSourcedGeq(minLID) +} + +func (n *nodeOrAgg) readRightGeq(minLID uint32) { + n.rightID, n.rightSource, n.hasRight = n.right.NextSourcedGeq(minLID) +} + func (n *nodeOrAgg) NextSourced() (uint32, uint32, bool) { if !n.hasLeft && !n.hasRight { return 0, 0, false @@ -120,3 +160,33 @@ func (n *nodeOrAgg) NextSourced() (uint32, uint32, bool) { return cur, curSource, true } + +func (n *nodeOrAgg) NextSourcedGeq(minLID uint32) (uint32, uint32, bool) { + if !n.hasLeft && !n.hasRight { + return 0, 0, false + } + + var cur uint32 + var curSource uint32 + + for cur < minLID && (n.hasRight || n.hasLeft) { + if n.hasLeft && (!n.hasRight || n.less(n.leftID, n.rightID)) { + cur = n.leftID + curSource = n.leftSource + n.readLeftGeq(minLID) + + return cur, curSource, true + } + + // we don't need 
deduplication + cur = n.rightID + curSource = n.rightSource + n.readRightGeq(minLID) + } + + if cur >= minLID { + return cur, curSource, true + } else { + return 0, 0, false + } +} diff --git a/node/node_range.go b/node/node_range.go index ed5d0486..1c07fe78 100644 --- a/node/node_range.go +++ b/node/node_range.go @@ -35,3 +35,7 @@ func (n *nodeRange) Next() (uint32, bool) { n.cur += n.step return cur, true } + +func (n *nodeRange) NextGeq(minLID uint32) (uint32, bool) { + return n.Next() +} diff --git a/node/node_static.go b/node/node_static.go index d40135d2..4d5c8d22 100644 --- a/node/node_static.go +++ b/node/node_static.go @@ -49,7 +49,15 @@ func (n *staticDesc) Next() (uint32, bool) { return cur, true } -// MakeStaticNodes is currently used only for tests +func (n *staticAsc) NextGeq(minLID uint32) (uint32, bool) { + return n.Next() +} + +func (n *staticDesc) NextGeq(minLID uint32) (uint32, bool) { + return n.Next() +} + +// MakeStaticNodes is currently used only for tests func MakeStaticNodes(data [][]uint32) []Node { nodes := make([]Node, len(data)) for i, values := range data { diff --git a/node/sourced_node_wrapper.go b/node/sourced_node_wrapper.go index 369e2eed..0b6b8fd3 100644 --- a/node/sourced_node_wrapper.go +++ b/node/sourced_node_wrapper.go @@ -14,6 +14,11 @@ func (w *sourcedNodeWrapper) NextSourced() (uint32, uint32, bool) { return id, w.source, has } +func (w *sourcedNodeWrapper) NextSourcedGeq(minLID uint32) (uint32, uint32, bool) { + id, has := w.node.NextGeq(minLID) + return id, w.source, has +} + func NewSourcedNodeWrapper(d Node, source int) Sourced { return &sourcedNodeWrapper{node: d, source: uint32(source)} } From 09ef27db5cb790084202df9189f7d7746aed08ba Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Sat, 7 Feb 2026 16:57:34 +0400 Subject: [PATCH 2/2] fix aggregator_test --- frac/processor/aggregator_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git 
a/frac/processor/aggregator_test.go b/frac/processor/aggregator_test.go index 2bbd2186..ccbca679 100644 --- a/frac/processor/aggregator_test.go +++ b/frac/processor/aggregator_test.go @@ -150,6 +150,13 @@ func (m *MockNode) String() string { } func (m *MockNode) NextSourced() (uint32, uint32, bool) { + return m.NextSourcedGeq(0) +} + +func (m *MockNode) NextSourcedGeq(minLID uint32) (uint32, uint32, bool) { + for len(m.Pairs) > 0 && m.Pairs[0].LID < minLID { + m.Pairs = m.Pairs[1:] + } if len(m.Pairs) == 0 { return 0, 0, false }