Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

mapping := seq.Mapping{
"service": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"message": seq.NewSingleType(seq.TokenizerTypeText, "", 100),
"level": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
"trace_id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 20),
Expand Down Expand Up @@ -171,7 +172,7 @@
case 1:
query = "service:gateway"
filter = func(doc *testDoc) bool {
return doc.service == "gateway"

Check failure on line 175 in frac/fraction_concurrency_test.go

View workflow job for this annotation

GitHub Actions / lint

string `gateway` has 4 occurrences, make it a constant (goconst)
}
case 2:
query = "level:2"
Expand Down Expand Up @@ -254,6 +255,7 @@
json string
message string
service string
pod string
level int
traceId string
timestamp time.Time
Expand All @@ -277,18 +279,20 @@
level := rand.IntN(6)
timestamp := fromTime.Add(time.Duration(i) * time.Millisecond)
traceId := fmt.Sprintf("trace-%d", i%5000)
pod := fmt.Sprintf("pod-%d", i%250)
if i == numMessages-1 {
toTime = timestamp
}

json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), service, message, traceId, level)
json := fmt.Sprintf(`{"timestamp":%q,"service":%q,"pod":%q,"message":%q,"trace_id": %q,"level":"%d"}`,
timestamp.Format(time.RFC3339Nano), service, pod, message, traceId, level)

docs = append(docs, testDoc{
json: json,
timestamp: timestamp,
message: message,
service: service,
pod: pod,
level: level,
traceId: traceId,
})
Expand Down
105 changes: 105 additions & 0 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
"level": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"client_ip": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"service": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"pod": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"status": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"source": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"trace_id": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
Expand Down Expand Up @@ -1109,8 +1110,76 @@
query: "trace_id:trace-2025",
filter: func(doc *testDoc) bool { return doc.traceId == "trace-2025" },
fromTime: fromTime,
toTime: midTime,

Check failure on line 1113 in frac/fraction_test.go

View workflow job for this annotation

GitHub Actions / lint

string `scheduler` has 3 occurrences, make it a constant (goconst)
},
// AND operator queries
{
name: "message:request AND message:failed",
query: "message:request AND message:failed",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.message, "request") && strings.Contains(doc.message, "failed")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND message:processing AND message:retry AND level:5",
query: "service:gateway AND message:processing AND message:retry AND level:5",
filter: func(doc *testDoc) bool {
return doc.service == "gateway" && strings.Contains(doc.message, "processing") &&
strings.Contains(doc.message, "retry") && doc.level == 5
},
fromTime: fromTime,
toTime: toTime,
},

// OR operator queries
{
name: "trace_id OR",
query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000",
filter: func(doc *testDoc) bool {
return doc.traceId == "trace-1000" ||
doc.traceId == "trace-1500" ||
doc.traceId == "trace-2000" ||
doc.traceId == "trace-2500" ||
doc.traceId == "trace-3000"
},
fromTime: fromTime,
toTime: toTime,
},

// mixed AND/OR
{
name: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*",
query: "message:request AND (level:1 OR level:3 OR level:5) AND trace_id:trace-2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.message, "request") && (doc.level == 1 || doc.level == 3 || doc.level == 5) &&
strings.Contains(doc.traceId, "trace-2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "complex AND+OR",
query: "(service:gateway OR service:proxy OR service:scheduler) AND " +
"(message:request OR message:failed) AND level:[1 to 3]",
filter: func(doc *testDoc) bool {
return (doc.service == "gateway" || doc.service == "proxy" || doc.service == "scheduler") &&
(strings.Contains(doc.message, "request") || strings.Contains(doc.message, "failed")) &&
(doc.level >= 1 && doc.level <= 3)
},
fromTime: fromTime,
toTime: toTime,
},

// other queries
{
name: "trace_id:trace-4*",
query: "trace_id:trace-4*",
filter: func(doc *testDoc) bool { return strings.Contains(doc.traceId, "trace-4") },
fromTime: fromTime,
toTime: toTime,
},
}

for _, tc := range searchTestCases {
Expand Down Expand Up @@ -1144,6 +1213,42 @@
})
}

s.Run("service:scheduler | group by pod avg(level)", func() {
levelsByPod := make(map[string][]int)
for _, doc := range testDocs {
if doc.service != "scheduler" {
continue
}

levelsByPod[doc.pod] = append(levelsByPod[doc.pod], doc.level)
}

var expectedBuckets []seq.AggregationBucket
for pod, levels := range levelsByPod {
sum := 0
for _, level := range levels {
sum += level
}
avg := float64(sum) / float64(len(levels))
expectedBuckets = append(expectedBuckets, seq.AggregationBucket{
Name: pod,
Value: avg,
NotExists: 0,
})
}

searchParams := s.query(
"service:scheduler",
withTo(toTime.Format(time.RFC3339Nano)),
withAggQuery(processor.AggQuery{
Field: aggField("level"),
GroupBy: aggField("pod"),
Func: seq.AggFuncAvg,
}))

s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncAvg}, expectedBuckets)
})

s.Run("NOT message:retry | group by service avg(level)", func() {
levelsByService := make(map[string][]int)
for _, doc := range testDocs {
Expand Down
2 changes: 1 addition & 1 deletion frac/processor/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32,

func (s *SourcedNodeIterator) ConsumeTokenSource(lid uint32) (uint32, bool, error) {
for s.has && s.less(s.lastID, lid) {
s.lastID, s.lastSource, s.has = s.sourcedNode.NextSourced()
s.lastID, s.lastSource, s.has = s.sourcedNode.NextSourcedGeq(lid)
}

exists := s.has && s.lastID == lid
Expand Down
7 changes: 7 additions & 0 deletions frac/processor/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ func (m *MockNode) String() string {
}

// NextSourced returns the next (LID, source) pair from the mock's queue.
// Delegating with minLID == 0 keeps a single implementation: no LID is
// below zero, so NextSourcedGeq(0) never skips anything.
func (m *MockNode) NextSourced() (uint32, uint32, bool) {
	return m.NextSourcedGeq(0)
}

func (m *MockNode) NextSourcedGeq(minLID uint32) (uint32, uint32, bool) {
for len(m.Pairs) > 0 && m.Pairs[0].LID < minLID {
m.Pairs = m.Pairs[1:]
}
if len(m.Pairs) == 0 {
return 0, 0, false
}
Expand Down
5 changes: 5 additions & 0 deletions frac/sealed/lids/iterator_asc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ func (it *IteratorAsc) Next() (uint32, bool) {
it.lids = it.lids[:i]
return lid, true
}

// NextGeq returns the next LID together with a flag reporting whether one
// was available.
//
// TODO support true skipping for the ascending iterator. Until then this
// falls back to Next and may return a LID below minLID; per the Node
// contract the caller must check the result and call NextGeq again while
// the returned LID is still below minLID.
func (it *IteratorAsc) NextGeq(minLID uint32) (uint32, bool) {
	return it.Next()
}
49 changes: 49 additions & 0 deletions frac/sealed/lids/iterator_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,52 @@ func (it *IteratorDesc) Next() (uint32, bool) {
it.lids = it.lids[1:]
return lid, true
}

// NextGeq returns the first remaining LID that is greater than or equal to
// minLID, consuming everything before it. It skips whole blocks whose
// largest LID is still below minLID, and binary-searches inside a block
// otherwise. Returns has == false when the iterator is exhausted.
func (it *IteratorDesc) NextGeq(minLID uint32) (uint32, bool) {
	for {
		for len(it.lids) == 0 {
			if !it.tryNextBlock {
				return 0, false
			}

			it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block
			it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
			it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count
		}

		// Fast path: if even the largest LID in this block is below minLID,
		// the entire block can be dropped without searching it.
		if minLID > it.lids[len(it.lids)-1] {
			it.lids = it.lids[:0]
			continue
		}

		// If the match is already within the first 32 entries, restrict the
		// binary search to that prefix so short skips stay cheap.
		window := it.lids
		if len(window) >= 32 && window[31] >= minLID {
			window = window[:32]
		}

		idx := sort.Search(len(window), func(i int) bool { return window[i] >= minLID })
		if idx < len(it.lids) {
			lid := it.lids[idx]
			it.lids = it.lids[idx+1:]
			return lid, true
		}

		// Defensive: unreachable while the fast-path guard above holds,
		// since the last element is known to be >= minLID.
		it.lids = it.lids[:0]
	}
}
4 changes: 4 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
// Node iterates over LIDs in the order defined by the implementation.
type Node interface {
	fmt.Stringer // for testing
	// Next returns the next LID; has == false means the iterator is exhausted.
	Next() (id uint32, has bool)
	// NextGeq returns the next LID that is greater than or equal to (GEQ) minLID.
	// Currently some nodes do not support skipping, so the caller must check the
	// returned id and be ready to call NextGeq again if it is still below minLID.
	NextGeq(minLID uint32) (id uint32, has bool)
}

// Sourced iterates over LIDs together with the source each LID came from;
// aggregation needs the source to attribute values.
type Sourced interface {
	fmt.Stringer // for testing
	// NextSourced returns the next (id, source) pair; has == false means exhausted.
	NextSourced() (id uint32, source uint32, has bool)
	// NextSourcedGeq returns the next pair whose id is greater than or equal to
	// minLID. Implementations may undershoot, so the caller must re-check the
	// returned id and call again if needed.
	NextSourcedGeq(minLID uint32) (id uint32, source uint32, has bool)
}
34 changes: 31 additions & 3 deletions node/node_and.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package node

import "fmt"
import (
"fmt"
)

type nodeAnd struct {
less LessFn
Expand Down Expand Up @@ -38,13 +40,39 @@ func (n *nodeAnd) readRight() {
n.rightID, n.hasRight = n.right.Next()
}

// readLeftGeq advances the left child to its next LID >= minLID,
// caching the result in leftID/hasLeft.
func (n *nodeAnd) readLeftGeq(minLID uint32) {
	n.leftID, n.hasLeft = n.left.NextGeq(minLID)
}

// readRightGeq advances the right child to its next LID >= minLID,
// caching the result in rightID/hasRight.
func (n *nodeAnd) readRightGeq(minLID uint32) {
	n.rightID, n.hasRight = n.right.NextGeq(minLID)
}

// Next advances both children until they agree on a LID and returns it.
// The lagging child is advanced with NextGeq so gaps can be skipped; since
// NextGeq may undershoot (see the Node contract), the loop re-checks the
// ordering after every step.
func (n *nodeAnd) Next() (uint32, bool) {
	for n.hasLeft && n.hasRight && n.leftID != n.rightID {
		if n.less(n.leftID, n.rightID) {
			n.readLeftGeq(n.rightID)
		} else {
			n.readRightGeq(n.leftID)
		}
	}
	if !n.hasLeft || !n.hasRight {
		return 0, false
	}
	matched := n.leftID
	n.readLeft()
	n.readRight()
	return matched, true
}

func (n *nodeAnd) NextGeq(minLID uint32) (uint32, bool) {
for n.hasLeft && n.hasRight && n.leftID != n.rightID {
for n.hasLeft && n.hasRight && n.less(n.leftID, n.rightID) {
n.readLeftGeq(max(minLID, n.rightID))
}
for n.hasLeft && n.hasRight && n.less(n.rightID, n.leftID) {
n.readRight()
n.readRightGeq(max(minLID, n.leftID))
}
}
if !n.hasLeft || !n.hasRight {
Expand Down
4 changes: 4 additions & 0 deletions node/node_nand.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ func (n *nodeNAnd) Next() (uint32, bool) {
}
return 0, false
}

// NextGeq falls back to Next: nodeNAnd does not support skipping ahead.
// Per the Node contract, the caller must check the returned id and call
// NextGeq again if it is still below minLID.
func (n *nodeNAnd) NextGeq(minLID uint32) (uint32, bool) {
	return n.Next()
}
Loading
Loading