15 changes: 8 additions & 7 deletions api/seqproxyapi/v1/seq_proxy_api.proto
@@ -171,13 +171,14 @@ message SearchQuery {

// Aggregation function used in request.
enum AggFunc {
AGG_FUNC_COUNT = 0; // Returns how many times `field` was equal to a particular value.
AGG_FUNC_SUM = 1; // Performs an addition operation on `field`, among documents with the same `group_by` field.
AGG_FUNC_MIN = 2; // Finds the minimum value of `field`, among documents with the same `group_by` field.
AGG_FUNC_MAX = 3; // Finds the maximum value of `field`, among documents with the same `group_by` field.
AGG_FUNC_AVG = 4; // Finds the average value of `field`, among documents with the same `group_by` field.
AGG_FUNC_QUANTILE = 5; // Finds quantiles of `field`, among documents with the same `group_by` field.
AGG_FUNC_UNIQUE = 6; // Finds unique values of the `group_by` field.
AGG_FUNC_UNIQUE_COUNT = 7; // Counts unique values of `field`, among documents with the same `group_by` field.
}

// Order of document sorting.
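To make the semantics of the new function concrete: `AGG_FUNC_UNIQUE` returns the distinct values of the `group_by` field themselves, while `AGG_FUNC_UNIQUE_COUNT` returns, for each `group_by` value, the number of distinct `field` values. A toy illustration (plain Go, not seq-db code; the `service`/`pod` names are made up):

```go
package main

import "fmt"

func main() {
	// Illustrative documents: group_by = "service", field = "pod".
	docs := []struct{ service, pod string }{
		{"svc1", "pod-a"}, {"svc1", "pod-b"}, {"svc1", "pod-b"},
		{"svc2", "pod-c"},
	}

	podsPerService := map[string]map[string]struct{}{} // service -> set of pods
	for _, d := range docs {
		if podsPerService[d.service] == nil {
			podsPerService[d.service] = map[string]struct{}{}
		}
		podsPerService[d.service][d.pod] = struct{}{}
	}

	// AGG_FUNC_UNIQUE over group_by="service" would yield: svc1, svc2.
	// AGG_FUNC_UNIQUE_COUNT yields one bucket per service:
	for svc, pods := range podsPerService {
		fmt.Printf("%s -> %d\n", svc, len(pods)) // svc1 -> 2, svc2 -> 1
	}
}
```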
3 changes: 3 additions & 0 deletions api/storeapi/store_api.proto
@@ -46,6 +46,7 @@ enum AggFunc {
AGG_FUNC_AVG = 4;
AGG_FUNC_QUANTILE = 5;
AGG_FUNC_UNIQUE = 6;
AGG_FUNC_UNIQUE_COUNT = 7;
}

enum Order {
@@ -95,6 +96,7 @@ message SearchResponse {
int64 total = 4;
int64 not_exists = 5;
repeated double samples = 6;
repeated uint32 values = 7;
}

message Bin {
@@ -114,6 +116,7 @@ message SearchResponse {
// { (foo, ts2) -> (val) }
// ]
repeated Bin timeseries = 4;
repeated string values_pool = 5;
}

bytes data = 1 [deprecated = true];
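The two new fields work together: judging by the aggregator changes further down, each histogram's `values` holds `uint32` indexes into the response-level `values_pool`, so a field value that repeats across buckets is shipped over the wire once. A minimal decoding sketch under that assumption (types simplified, not the generated protobuf API):

```go
package main

import "fmt"

// histogram is a simplified stand-in for the per-bucket message above.
type histogram struct {
	values []uint32 // indexes into the shared values_pool
}

// searchResponse is a simplified stand-in for the response message.
type searchResponse struct {
	histograms []histogram
	valuesPool []string // each unique field value appears exactly once
}

// uniqueCount resolves one histogram's value indexes against the pool
// and returns the number of distinct field values it references.
func uniqueCount(h histogram, pool []string) int {
	seen := make(map[string]struct{}, len(h.values))
	for _, idx := range h.values {
		seen[pool[idx]] = struct{}{}
	}
	return len(seen)
}

func main() {
	resp := searchResponse{
		histograms: []histogram{{values: []uint32{0, 1, 1}}, {values: []uint32{1, 2}}},
		valuesPool: []string{"pod-a", "pod-b", "pod-c"},
	}
	for i, h := range resp.histograms {
		fmt.Printf("bucket %d: %d unique values\n", i, uniqueCount(h, resp.valuesPool))
	}
}
```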
1 change: 1 addition & 0 deletions cmd/seq-db/seq-db.go
@@ -271,6 +271,7 @@ func startStore(
Search: frac.SearchConfig{
AggLimits: frac.AggLimits{
MaxFieldTokens: cfg.Limits.Aggregation.FieldTokens,
MaxFieldValues: cfg.Limits.Aggregation.FieldValues,
MaxGroupTokens: cfg.Limits.Aggregation.GroupTokens,
MaxTIDsPerFraction: cfg.Limits.Aggregation.FractionTokens,
},
4 changes: 4 additions & 0 deletions config/config.go
@@ -128,6 +128,10 @@ type Config struct {
// that can be processed in a single aggregation request.
// Setting this field to 0 disables the limit.
FieldTokens int `config:"field_tokens" default:"1000000"`
// FieldValues specifies the maximum number of unique field values
// that partial aggregation results (buckets) can contain in a single aggregation request.
// Setting this field to 0 disables the limit.
FieldValues int `config:"field_values" default:"1000000"`
// GroupTokens specifies the maximum number of unique group tokens
// that can be processed in a single aggregation request.
// Setting this field to 0 disables the limit.
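For operators, the new knob sits next to the existing aggregation limits. A hypothetical config fragment (key nesting inferred from `cfg.Limits.Aggregation.*` in `cmd/seq-db/seq-db.go` above; the exact file layout and the values shown are illustrative):

```yaml
limits:
  aggregation:
    field_tokens: 1000000   # existing limit: unique field tokens parsed per request
    field_values: 1000000   # new limit: unique field values buffered per request; 0 disables
    group_tokens: 1000000   # existing limit: unique group tokens per request
```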
1 change: 1 addition & 0 deletions consts/consts.go
@@ -84,6 +84,7 @@ var (
ErrInvalidAggQuery = errors.New("invalid agg query")
ErrInvalidArgument = errors.New("invalid argument")
ErrTooManyFieldTokens = errors.New("aggregation has too many field tokens")
ErrTooManyFieldValues = errors.New("aggregation has too many field values in memory")
ErrTooManyGroupTokens = errors.New("aggregation has too many group tokens")
ErrTooManyFractionTokens = errors.New("aggregation has too many fraction tokens")
ErrTooManyFractionsHit = errors.New("too many fractions hit")
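Callers that already special-case the aggregation limit errors will want to handle the new sentinel the same way. A sketch (`runAggregation` is a hypothetical stand-in for whatever issues the request):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/ozontech/seq-db/consts"
)

// runAggregation is a hypothetical stand-in that may hit aggregation limits.
func runAggregation() error { return consts.ErrTooManyFieldValues }

func handleAggError() error {
	err := runAggregation()
	switch {
	case errors.Is(err, consts.ErrTooManyFieldValues):
		// New in this PR: the unique-value pool exceeded the field_values limit.
		return fmt.Errorf("narrow the query or raise the field_values limit: %w", err)
	case errors.Is(err, consts.ErrTooManyFieldTokens), errors.Is(err, consts.ErrTooManyGroupTokens):
		return fmt.Errorf("aggregation over too-high-cardinality fields: %w", err)
	default:
		return err
	}
}

func main() { fmt.Println(handleAggError()) }
```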
41 changes: 41 additions & 0 deletions docs/en/10-public-api.md
@@ -196,6 +196,7 @@ Supported aggregation functions:
- `AGG_FUNC_MAX` — maximum value of the field
- `AGG_FUNC_QUANTILE` — quantile computation for the field
- `AGG_FUNC_UNIQUE` — computation of unique field values (not supported in timeseries)
- `AGG_FUNC_UNIQUE_COUNT` — count of unique field values per `group_by` value (both `field` and `group_by` are required)
- `AGG_FUNC_COUNT` — count of documents per group

#### Aggregation Examples
@@ -443,6 +444,46 @@ grpcurl -plaintext -d '
}
```

##### UNIQUE COUNT

> For `AGG_FUNC_UNIQUE_COUNT`, both `group_by` and `field` are required.

**Request:**

```sh
grpcurl -plaintext -d '
{
"query": {
"from": "2000-01-01T00:00:00Z",
"to": "2077-01-01T00:00:00Z",
"query": "*"
},
"aggs": [
{
"func": "AGG_FUNC_UNIQUE_COUNT",
"group_by": "service",
"field": "pod",
}
]
}' localhost:9004 seqproxyapi.v1.SeqProxyApi/GetAggregation
```

**Response:**

```json
{
"aggs": [
{
"buckets": [
{"key": "svc1", "value": 2},
{"key": "svc2", "value": 2},
{"key": "svc3", "value": 1}
]
}
]
}
```

##### COUNT (with interval)

**Request:**
1 change: 1 addition & 0 deletions frac/config.go
@@ -13,6 +13,7 @@ type SearchConfig struct {

type AggLimits struct {
MaxFieldTokens int // MaxFieldTokens max AggQuery.Field unique values to parse.
MaxFieldValues int // MaxFieldValues max AggQuery.Field unique values to hold per aggregation request.
MaxGroupTokens int // MaxGroupTokens max AggQuery.GroupBy unique values.
MaxTIDsPerFraction int // MaxTIDsPerFraction max number of tokens per fraction.
}
108 changes: 104 additions & 4 deletions frac/fraction_test.go
@@ -735,6 +735,42 @@ func (s *FractionTestSuite) TestAggSum() {
s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncSum}, expectedBuckets)
}

func (s *FractionTestSuite) TestAggSumTimeSeries() {
docs := []string{
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum1","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum1","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum1","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum2","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum2","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum3","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum4","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum4","v":1}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum4"}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"sum5","v":1}`,
}

s.insertDocuments(docs)

searchParams := s.query(
"service:sum*",
withAggQuery(processor.AggQuery{
Field: aggField("v"),
GroupBy: aggField("service"),
Func: seq.AggFuncSum,
Interval: 1000,
}))
expectedBuckets := []seq.AggregationBucket{
// all NotExists go to a dedicated bucket with MID=0 in time series mode
{Name: "sum4", MID: seq.MID(0), Value: math.NaN(), NotExists: 1},
{Name: "sum4", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000Z")), Value: 2, NotExists: 0},
{Name: "sum1", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000Z")), Value: 3, NotExists: 0},
{Name: "sum3", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000Z")), Value: 1, NotExists: 0},
{Name: "sum5", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000Z")), Value: 1, NotExists: 0},
{Name: "sum2", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:00.000Z")), Value: 2, NotExists: 0},
}
s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncSum}, expectedBuckets)
}

func (s *FractionTestSuite) TestAggMin() {
docs := []string{
`{"timestamp":"2000-01-01T13:00:00.000Z","service":"min1","v":1}`,
@@ -959,6 +995,70 @@ func (s *FractionTestSuite) TestAggAvgWithoutGroupBy() {
s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncAvg}, expectedBuckets)
}

func (s *FractionTestSuite) TestAggUniqueCountTimeSeries() {
docs := []string{
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service1","level":1}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service1","level":2}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service1","level":3}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service2","level":1}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service2","level":2}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service3","level":1}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service4","level":1}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service4","level":1}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service4","level":2}`,
`{"timestamp":"2000-01-01T13:00:01.000Z","service":"service4"}`,
}

s.insertDocuments(docs)
searchParams := s.query(
"service:service*",
withAggQuery(processor.AggQuery{
Field: aggField("level"),
GroupBy: aggField("service"),
Func: seq.AggFuncUniqueCount,
Interval: 1000,
}))
expectedBuckets := []seq.AggregationBucket{
{Name: "service1", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:01.000Z")), Value: 3, NotExists: 0},
{Name: "service2", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:01.000Z")), Value: 2, NotExists: 0},
{Name: "service3", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:01.000Z")), Value: 1, NotExists: 0},
{Name: "service4", MID: seq.TimeToMID(mustParseTime("2000-01-01T13:00:01.000Z")), Value: 2, NotExists: 0},
{Name: "service4", MID: seq.MID(0), Value: math.NaN(), NotExists: 1},
}
s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets)
}

func (s *FractionTestSuite) TestAggUniqueCount() {
docs := []string{
`{"timestamp":"2000-01-01T13:00:00.002Z","service":"service1","level":1}`,
`{"timestamp":"2000-01-01T13:00:00.003Z","service":"service1","level":2}`,
`{"timestamp":"2000-01-01T13:00:00.007Z","service":"service1","level":3}`,
`{"timestamp":"2000-01-01T13:00:00.009Z","service":"service2","level":1}`,
`{"timestamp":"2000-01-01T13:00:00.010Z","service":"service2","level":2}`,
`{"timestamp":"2000-01-01T13:00:00.011Z","service":"service3","level":1}`,
`{"timestamp":"2000-01-01T13:00:00.012Z","service":"service4","level":1}`,
`{"timestamp":"2000-01-01T13:00:00.013Z","service":"service4","level":1}`,
`{"timestamp":"2000-01-01T13:00:00.017Z","service":"service4","level":2}`,
`{"timestamp":"2000-01-01T13:00:00.017Z","service":"service4"}`,
}

s.insertDocuments(docs)
searchParams := s.query(
"service:service*",
withAggQuery(processor.AggQuery{
Field: aggField("level"),
GroupBy: aggField("service"),
Func: seq.AggFuncUniqueCount,
}))
expectedBuckets := []seq.AggregationBucket{
{Name: "service1", Value: 3, NotExists: 0},
{Name: "service2", Value: 2, NotExists: 0},
{Name: "service3", Value: 1, NotExists: 0},
{Name: "service4", Value: 2, NotExists: 1},
}
s.AssertAggregation(searchParams, seq.AggregateArgs{Func: seq.AggFuncUniqueCount}, expectedBuckets)
}

func (s *FractionTestSuite) TestSearchMultipleBulks() {
docs := []string{
/*0*/ `{"timestamp":"2000-01-01T13:00:01Z","service":"service_a","message":"request started","source":"prod01","level":"1"}`,
@@ -1522,20 +1622,20 @@ func (s *FractionTestSuite) AssertAggregation(
for _, expectedBucket := range expectedBuckets {
found := false
for _, gotBucket := range aggResults[0].Buckets {
if gotBucket.Name == expectedBucket.Name && gotBucket.MID == expectedBucket.MID {
if math.IsNaN(expectedBucket.Value) || math.IsNaN(gotBucket.Value) {
s.Require().Truef(math.IsNaN(expectedBucket.Value) && math.IsNaN(gotBucket.Value),
"wrong value for bucket %s: expected NaN=%v, got NaN=%v",
expectedBucket.Name, math.IsNaN(expectedBucket.Value), math.IsNaN(gotBucket.Value))
} else {
s.Require().Equal(expectedBucket.Value, gotBucket.Value, "wrong value for bucket %s-%v", expectedBucket.Name, expectedBucket.MID)
}
s.Require().Equal(expectedBucket.NotExists, gotBucket.NotExists, "wrong NotExists for bucket %s-%v", expectedBucket.Name, expectedBucket.MID)
found = true
break
}
}
s.Require().True(found, "bucket %s-%v not found in results", expectedBucket.Name, expectedBucket.MID)
}
}

45 changes: 35 additions & 10 deletions frac/processor/aggregator.go
@@ -41,24 +41,31 @@ type TwoSourceAggregator struct {
groupByNotExists map[uint32]int64
// collectSamples indicates whether samples should be collected; this is needed when calculating quantiles.
collectSamples bool
// collectValues indicates whether raw field values should be collected (used for unique-count aggregation).
collectValues bool
// countBySource counts occurrences by histogram source.
countBySource map[AggBin[twoSources]]int64
// extractMID will be used for building time series.
extractMID ExtractMIDFunc
// limits enforces upper bounds on how many unique values we parse and hold in memory.
limits AggLimits
}

func NewGroupAndFieldAggregator(
fieldIterator, groupByIterator *SourcedNodeIterator,
fn ExtractMIDFunc, collectSamples bool, collectValues bool,
limits AggLimits,
) *TwoSourceAggregator {
return &TwoSourceAggregator{
collectSamples: collectSamples,
collectValues: collectValues,
countBySource: make(map[AggBin[twoSources]]int64),
field: fieldIterator,
groupNotExists: 0,
groupBy: groupByIterator,
groupByNotExists: make(map[uint32]int64),
extractMID: fn,
limits: limits,
}
}

@@ -108,6 +115,9 @@ func (n *TwoSourceAggregator) Next(lid uint32) error {
func (n *TwoSourceAggregator) Aggregate() (seq.AggregatableSamples, error) {
aggMap := make(map[seq.AggBin]*seq.SamplesContainer, n.groupBy.UniqueSources())

var sourceValuePool []string
sourceValuePoolMap := make(map[string]uint32)

for groupBySource, cnt := range n.groupByNotExists {
groupByVal := seq.AggBin{Token: n.groupBy.ValueBySource(groupBySource)}
if aggMap[groupByVal] == nil {
@@ -128,22 +138,37 @@ func (n *TwoSourceAggregator) Aggregate() (seq.AggregatableSamples, error) {

// For example, for a value named "request_duration" it can be "42.13"
value := n.field.ValueBySource(bin.Source.FieldSource)
if n.collectValues {
poolIdx, exists := sourceValuePoolMap[value]
if !exists {
poolIdx = uint32(len(sourceValuePool))
sourceValuePool = append(sourceValuePool, value)
sourceValuePoolMap[value] = poolIdx
if n.limits.MaxFieldValues > 0 && len(sourceValuePool) > n.limits.MaxFieldValues {
return seq.AggregatableSamples{}, consts.ErrTooManyFieldValues
}
}
hist.InsertValueIndex(poolIdx, cnt)
} else {
num, err := parseNum(value)
if err != nil {
return seq.AggregatableSamples{}, err
}

// The same token can appear multiple times,
// so we need to insert the num cnt times.
hist.InsertNTimes(num, cnt)
if n.collectSamples {
hist.InsertSampleNTimes(num, cnt)
}
}
}

return seq.AggregatableSamples{
NotExists: n.groupNotExists,
SamplesByBin: aggMap,
ValuesPool: sourceValuePool,
}, nil
}

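The diff shows only the per-fraction side: `Aggregate` dedupes raw values into `sourceValuePool` and enforces `MaxFieldValues` as the pool grows. The proxy-side merge is not part of this hunk; presumably it resolves each partial result's indexes against that partial's own `ValuesPool` before counting distinct values. A sketch of that merge under simplified, assumed types:

```go
package main

import "fmt"

// partial mirrors one fraction's result: indexes reference its own pool.
type partial struct {
	pool    []string // the fraction's values_pool
	indexes []uint32 // value references for one bucket
}

// mergedUniqueCount counts distinct field values for a bucket across fractions.
func mergedUniqueCount(parts []partial) int {
	seen := make(map[string]struct{})
	for _, p := range parts {
		for _, idx := range p.indexes {
			seen[p.pool[idx]] = struct{}{}
		}
	}
	return len(seen)
}

func main() {
	a := partial{pool: []string{"1", "2"}, indexes: []uint32{0, 1}}
	b := partial{pool: []string{"2", "3"}, indexes: []uint32{0, 1}}
	// "2" is shared between fractions, so the merged count is 3, not 4.
	fmt.Println(mergedUniqueCount([]partial{a, b}))
}
```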
3 changes: 2 additions & 1 deletion frac/processor/aggregator_test.go
@@ -180,8 +180,9 @@ func TestTwoSourceAggregator(t *testing.T) {
groupByTIDs := []uint32{1, 2}
groupIterator := NewSourcedNodeIterator(groupBy, dp, groupByTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
fieldIterator := NewSourcedNodeIterator(field, dp, fieldTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
limits := AggLimits{}
aggregator := NewGroupAndFieldAggregator(
fieldIterator, groupIterator, provideExtractTimeFunc(nil, nil, 0), true, false, limits,
)

// Call Next for two data points.