2 changes: 2 additions & 0 deletions api/seqproxyapi/v1/seq_proxy_api.proto
@@ -214,6 +214,7 @@ message SearchRequest {
int64 offset = 3; // Search offset.
bool with_total = 4; // Should total number of documents be returned in response.
Order order = 5; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 6; // ID offset for pagination.
}

message ComplexSearchRequest {
@@ -224,6 +225,7 @@ message ComplexSearchRequest {
int64 offset = 5; // Search offset.
bool with_total = 6; // Should total number of documents be returned in response.
Order order = 7; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 8; // ID offset for pagination.
}

message SearchResponse {
1 change: 1 addition & 0 deletions api/storeapi/store_api.proto
@@ -75,6 +75,7 @@ message SearchRequest {
string aggregation_filter = 11 [deprecated = true];
repeated AggQuery aggs = 12;
Order order = 13;
string offset_id = 14;
}

message SearchResponse {
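The new field turns search into cursor-style pagination: instead of a growing numeric offset, the client passes the ID of the last document it has already seen. Below is a minimal Go sketch of the client-side loop, assuming the JSON /complex-search endpoint and field names used by the k6 benchmarks that follow; the request/response structs, the port, and the page cap are illustrative assumptions, not part of this change.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// Request/response shapes mirror the JSON used by the k6 benchmarks below;
// they are illustrative assumptions, not generated API types.
type searchRequest struct {
	Query struct {
		Query string `json:"query"`
		From  string `json:"from"`
		To    string `json:"to"`
	} `json:"query"`
	Order    string `json:"order"`
	Size     int    `json:"size"`
	OffsetID string `json:"offset_id,omitempty"` // empty on the first page
}

type searchResponse struct {
	Docs []struct {
		ID string `json:"id"`
	} `json:"docs"`
}

func main() {
	const baseURL = "http://localhost:9002" // hypothetical seq-db proxy address
	var req searchRequest
	req.Query.From = "2000-01-01T00:00:00Z"
	req.Query.To = "2050-01-01T00:00:00Z"
	req.Order = "ORDER_ASC"
	req.Size = 100

	for page := 0; page < 50; page++ {
		body, err := json.Marshal(req)
		if err != nil {
			log.Fatal(err)
		}
		resp, err := http.Post(baseURL+"/complex-search", "application/json", bytes.NewReader(body))
		if err != nil {
			log.Fatal(err)
		}
		var out searchResponse
		err = json.NewDecoder(resp.Body).Decode(&out)
		resp.Body.Close()
		if err != nil {
			log.Fatal(err)
		}
		if len(out.Docs) == 0 {
			break // exhausted all matching documents
		}
		fmt.Printf("page %d: %d docs\n", page, len(out.Docs))
		// Cursor-style pagination: the last doc ID of this page becomes
		// the offset_id of the next request.
		req.OffsetID = out.Docs[len(out.Docs)-1].ID
	}
}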
125 changes: 125 additions & 0 deletions benchmarks/k6/es-paging-offset-id.js
@@ -0,0 +1,125 @@
import http from 'k6/http';
import { check, sleep } from 'k6';

/*
* Elasticsearch has a search_after field that can be used the same way as seq-db's offset_id field.
* To use it, one must create a PIT (point in time) and then sort on _shard_doc while querying.
* Elasticsearch returns a sort value with every hit; the sort value of the last hit can be passed
* as search_after for efficient scrolling on the next query.
*/

const BASE_URL = __ENV.BASE_URL;
const PAGE_SIZE = 100;
const TOTAL_PAGES = 50;

export let options = {
vus: 20,
duration: '10s',
};

export function setup() {
const pitRes = http.post(
`${BASE_URL}/logs-index/_pit?keep_alive=1m`,
null,
{ headers: { 'Content-Type': 'application/json' } }
);

if (pitRes.status === 200) {
try {
const pitBody = JSON.parse(pitRes.body);
console.log(`PIT created successfully: ${pitBody.id}`);
return { pitId: pitBody.id };
} catch (e) {
console.error('Failed to parse PIT response:', e);
return { pitId: null };
}
} else {
console.error(`Failed to create PIT: ${pitRes.status} - ${pitRes.body}`);
return { pitId: null };
}
}

// Per-VU pagination state: the search_after cursor and a page counter.
const vuState = {};

export default function (data) {
if (!vuState[__VU]) {
vuState[__VU] = {
searchAfter: null,
pageCount: 0
};
}

const state = vuState[__VU];
const pitId = data?.pitId; // in k6, setup() return data is the only way the PIT id reaches VU code

if (!pitId) {
console.error('PIT ID not available, skipping iteration');
return;
}

if (state.pageCount >= TOTAL_PAGES) {
state.searchAfter = null;
state.pageCount = 0;
}

const queryObj = {
track_total_hits: false,
query: { match_all: {} },
pit: {
id: pitId,
keep_alive: '2m'
},
size: PAGE_SIZE,
sort: [
{ _shard_doc: 'asc' }
]
};

if (state.searchAfter !== null) {
queryObj.search_after = state.searchAfter;
}

const res = http.post(
`${BASE_URL}/_search?request_cache=false`,
JSON.stringify(queryObj),
{ headers: { 'Content-Type': 'application/json' } }
);

check(res, {
"200-ok": (res) => res.status == 200,
"has-hits": (res) => {
if (res.status === 200) {
try {
const body = JSON.parse(res.body);
return body.hits && body.hits.hits && body.hits.hits.length > 0;
} catch (e) {
return false;
}
}
return false;
}
});

if (res.status === 200) {
try {
const body = JSON.parse(res.body);
const hits = body.hits?.hits || [];
if (hits.length > 0) {
const lastHit = hits[hits.length - 1];
state.searchAfter = lastHit.sort || null;
state.pageCount++;
} else {
state.searchAfter = null;
state.pageCount = 0;
}
} catch (e) {
console.error('Failed to parse response:', e);
return;
}
}

sleep(0.2);
}
97 changes: 97 additions & 0 deletions benchmarks/k6/seq-db-paging-offset-id.js
@@ -0,0 +1,97 @@
import http from 'k6/http';
import { check, sleep } from 'k6';

/*
* seq-db has an offset_id field that is similar to Elasticsearch's search_after field.
* The first page is queried without offset_id; the ID of the last document in the response
* is then passed as offset_id in subsequent queries for efficient pagination.
*/

const BASE_URL = __ENV.BASE_URL;
const PAGE_SIZE = 100;
const TOTAL_PAGES = 50;

export let options = {
vus: 20,
duration: '10s',
};

// Per-VU pagination state: the offset_id cursor and a page counter.
const vuState = {};

export default function () {
if (!vuState[__VU]) {
vuState[__VU] = {
offsetId: null,
page: 0
};
}

const state = vuState[__VU];

if (state.page >= TOTAL_PAGES) {
state.offsetId = null;
state.page = 0;
}

const queryObj = {
query: {
query: "",
from: "2000-01-01T00:00:00Z",
to: "2050-01-01T00:00:00Z",
explain: false,
},
order: "ORDER_ASC",
size: PAGE_SIZE
};

if (state.offsetId !== null) {
queryObj.offset_id = state.offsetId;
}

const query = JSON.stringify(queryObj);

const res = http.post(
`${BASE_URL}/complex-search`,
query,
{ headers: { 'Content-Type': 'application/json' } }
);

check(res, {
"200-ok": (res) => res.status == 200,
"has-docs": (res) => {
if (res.status === 200) {
try {
const body = JSON.parse(res.body);
return body.docs && body.docs.length > 0;
} catch (e) {
return false;
}
}
return false;
}
});

if (res.status === 200) {
try {
const body = JSON.parse(res.body);
const docs = body.docs || [];
if (docs.length > 0) {
// Advance the cursor: the last doc ID becomes the next page's offset_id
const lastDoc = docs[docs.length - 1];
state.offsetId = lastDoc.id || null;
state.page++;
} else {
// No more docs, reset page
state.offsetId = null;
state.page = 0;
}
} catch (e) {
console.error('Failed to parse response:', e);
return;
}
}

sleep(0.2);
}
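Both scripts read the target host from the BASE_URL environment variable, so they can be pointed at a local or remote cluster, e.g. k6 run -e BASE_URL=http://localhost:9200 es-paging-offset-id.js (with whatever port your Elasticsearch or seq-db proxy listens on).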

116 changes: 116 additions & 0 deletions frac/fraction_test.go
@@ -515,6 +515,58 @@ func (s *FractionTestSuite) TestSearchWithLimit() {
[]int{5, 3})
}

func (s *FractionTestSuite) TestSearchWithOffsetId() {
docs := []string{
`{"timestamp":"2000-01-01T12:59:59.999Z","message":"outsider1"}`,
`{"timestamp":"2000-01-01T12:59:59.999Z","message":"outsider2"}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","message":"bad"}`,
`{"timestamp":"2000-01-01T13:00:00.000Z","message":"good"}`,
`{"timestamp":"2000-01-01T13:00:00.001Z","message":"bad"}`,
`{"timestamp":"2000-01-01T13:00:00.001Z","message":"good"}`,
`{"timestamp":"2000-01-01T13:00:00.001Z","message":"bad"}`,
`{"timestamp":"2000-01-01T13:00:00.001Z","message":"good"}`,
`{"timestamp":"2000-01-01T13:00:00.002Z","message":"bad"}`,
`{"timestamp":"2000-01-01T13:00:00.002Z","message":"good"}`,
`{"timestamp":"2000-01-01T13:00:00.002Z","message":"bad"}`,
`{"timestamp":"2000-01-01T13:00:00.003Z","message":"good"}`,
`{"timestamp":"2000-01-01T13:00:00.003Z","message":"bad"}`,
`{"timestamp":"2000-01-01T13:00:00.004Z","message":"ugly"}`,
`{"timestamp":"2000-01-01T13:00:00.004Z","message":"ugly"}`,
}

s.insertDocuments(docs)

// Validate that we can page through the fraction using offset_id in both orders.
// Every message must appear exactly once, even though some docs share the same MID.

for _, order := range []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc} {
searchParams := s.query("message:*",
withFrom("2000-01-01T13:00:00.000Z"),
withTo("2000-01-01T13:00:00.003Z"),
withLimit(2))
searchParams.Order = order

ids := make(map[seq.ID]bool)

for {
qpr, err := s.fraction.Search(context.Background(), *searchParams)
s.Require().NoError(err, "search failed")
if len(qpr.IDs) == 0 {
break
}

qprIDs := qpr.IDs.IDs()
for _, id := range qprIDs {
ids[id] = true
}
// switch to the next page
searchParams.OffsetId = qprIDs[len(qprIDs)-1]
}

s.Require().Equal(11, len(ids), "expected 11 unique IDs across all pages")
}
}

func (s *FractionTestSuite) TestSearchWithTotal() {
docs := []string{
`{"timestamp":"2000-01-01T13:00:01.549Z","message": "apple banana smoothie"}`,
@@ -1195,6 +1247,70 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
s.query("service:database AND level:3", withTo(toTime.Format(time.RFC3339Nano)), withHist(1000)),
histBuckets)
})

s.Run("scroll with offset id", func() {
query := "message:request AND level:4"
scrollFrom := fromTime
scrollTo := midTime
pageSize := 98

var expectedIndexesAsc []int
for i := range testDocs {
doc := &testDocs[i]
if !doc.timestamp.Before(scrollFrom) &&
!doc.timestamp.After(scrollTo) &&
strings.Contains(doc.message, "request") &&
doc.level == 4 {
expectedIndexesAsc = append(expectedIndexesAsc, i)
}
}

var expectedIndexes []int
for _, order := range []seq.DocsOrder{seq.DocsOrderDesc, seq.DocsOrderAsc} {
if order == seq.DocsOrderAsc {
expectedIndexes = expectedIndexesAsc
} else {
expectedIndexes = append([]int{}, expectedIndexesAsc...)
slices.Reverse(expectedIndexes)
}

searchParams := s.query(query,
withFrom(scrollFrom.Format(time.RFC3339Nano)),
withTo(scrollTo.Format(time.RFC3339Nano)),
withLimit(pageSize))
searchParams.Order = order

expectedOffset := 0
totalIDsScrolled := 0

for {
qpr, err := s.fraction.Search(context.Background(), *searchParams)

s.Require().NoError(err, "search failed")

if len(qpr.IDs) == 0 {
break
}

qprIDs := qpr.IDs.IDs()
totalIDsScrolled += len(qprIDs)

docs, err := s.fraction.Fetch(context.Background(), qprIDs)
s.Require().NoError(err, "fetch failed for order=%v", order)

for j, doc := range docs {
idx := expectedOffset + j
s.Require().Equalf(docJsons[expectedIndexes[idx]], string(doc),
"doc at scroll position %d (order=%v) doesn't match", idx, order)
}
expectedOffset += len(docs)

searchParams.OffsetId = qprIDs[len(qprIDs)-1]
}

s.Require().Equal(len(expectedIndexesAsc), totalIDsScrolled, "total number of docs scrolled mismatch")
}
})
}

func (s *FractionTestSuite) TestIntersectingNanoseconds() {