diff --git a/network/circuitbreaker/circuitbreaker.go b/network/circuitbreaker/circuitbreaker.go index e2559e75..44eef3aa 100644 --- a/network/circuitbreaker/circuitbreaker.go +++ b/network/circuitbreaker/circuitbreaker.go @@ -51,6 +51,9 @@ type Config struct { // SleepWindow how long to deny requests before allowing attempts // again to determine if the chain should be closed again. SleepWindow time.Duration + + // CustomSuffix exists for adding custom suffix in circuit breaker name in order not to catch panic via existing name. + CustomSuffix string } const ( @@ -63,6 +66,10 @@ const ( ) func New(name string, config Config) *CircuitBreaker { + if config.CustomSuffix != "" { + name += fmt.Sprintf("-%s", config.CustomSuffix) + } + breaker := manager.GetCircuit(name) if breaker != nil { return &CircuitBreaker{ diff --git a/proxy/bulk/ingestor_test.go b/proxy/bulk/ingestor_test.go index 8800af6a..490fa421 100644 --- a/proxy/bulk/ingestor_test.go +++ b/proxy/bulk/ingestor_test.go @@ -15,6 +15,7 @@ import ( "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/mappingprovider" + "github.com/ozontech/seq-db/network/circuitbreaker" "github.com/ozontech/seq-db/packer" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" @@ -83,6 +84,9 @@ func TestProcessDocuments(t *testing.T) { DocsZSTDCompressLevel: -1, MetasZSTDCompressLevel: -1, MaxDocumentSize: int(units.KiB), + BulkCircuit: circuitbreaker.Config{ + CustomSuffix: "TestProcessDocuments", + }, } now := time.Now().UTC() @@ -486,6 +490,9 @@ func BenchmarkProcessDocuments(b *testing.B) { MappingProvider: mappingProvider, CaseSensitive: false, MaxTokenSize: int(units.KiB), + BulkCircuit: circuitbreaker.Config{ + CustomSuffix: "BenchProcessDocuments", + }, } ingestor := NewIngestor(cfg, &FakeClient{}) @@ -563,7 +570,13 @@ func TestProcessDocumentType(t *testing.T) { client := &FakeClient{} mp, err := mappingprovider.New("", mappingprovider.WithMapping(map[string]seq.MappingTypes{})) r.NoError(err) - ingestor := NewIngestor(IngestorConfig{MaxInflightBulks: 1, MappingProvider: mp}, client) + ingestor := NewIngestor(IngestorConfig{ + MaxInflightBulks: 1, + MappingProvider: mp, + BulkCircuit: circuitbreaker.Config{ + CustomSuffix: "TestProcessDocumentType", + }, + }, client) defer ingestor.Stop() stop := false diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index 131f5e2d..a5f30594 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -60,6 +60,7 @@ func getAutoTimeGenerator(start time.Time, step time.Duration) func() time.Time } func (s *IntegrationTestSuite) TestSearchOne() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) origDocs := []string{ `{"service":"a", "xxxx":"yyyy"}`, `{"k8s_pod":"sq-toloka-loader-1788964-dryrun-58hmw", "yyyy":"xxxx"}`, @@ -118,6 +119,7 @@ func (s *IntegrationTestSuite) TestSearchOne() { func (s *IntegrationTestSuite) TestPipeFields() { config := *s.Config + config.Name = fmt.Sprintf("%s-%s", config.Name, s.T().Name()) config.Mapping = map[string]seq.MappingTypes{ "event": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "message": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), @@ -199,6 +201,7 @@ func (s *IntegrationTestSuite) TestPipeFields() { } func (s *IntegrationTestSuite) TestSearchOneHTTP() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) origDocs := []string{ `{"service":"a", "xxxx":"yyyy"}`, `{"service":"b", "k8s_pod":"sq-toloka-loader-1788964-dryrun-58hmw", "yyyy":"xxxx"}`, @@ -240,6 +243,7 @@ func (s *IntegrationTestSuite) TestSearchOneHTTP() { } func (s *IntegrationTestSuite) TestSearchNothing() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) origDocs := []string{ `{"service":"a", "xxxx":"yyyy"}`, `{"k8s_pod":"sq-toloka-loader-1788964-dryrun-58hmw", "yyyy":"xxxx"}`, @@ -256,6 +260,7 @@ func (s *IntegrationTestSuite) TestSearchNothing() { } func (s *IntegrationTestSuite) TestSearchBackwards() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) now := time.Now() before := now.Add(-5 * time.Hour) origDocs := []string{ @@ -287,6 +292,7 @@ func (s *IntegrationTestSuite) TestSearchBackwards() { } func (s *IntegrationTestSuite) TestSearchSequence() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) docTemplate := `{"service":"a","time":"%s"}` bulks := 16 bulkSize := 1024 @@ -336,6 +342,7 @@ func (s *IntegrationTestSuite) TestSearchSequence() { } func (s *IntegrationTestSuite) TestSearchMany() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) const NetN = 256 * 1024 n := int(math.Floor(NetN * 1.2)) @@ -377,6 +384,7 @@ func getBulkIterationsNum(e *setup.TestingEnv) int { } func (s *IntegrationTestSuite) envWithDummyDocs(n int) (*setup.TestingEnv, []string) { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) str := largeString(20) @@ -439,6 +447,7 @@ func (s *IntegrationTestSuite) TestFetch() { } func (s *IntegrationTestSuite) TestFetchNotFound() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) defer env.StopAll() @@ -461,6 +470,7 @@ func (s *IntegrationTestSuite) TestFetchNotFound() { } func (s *IntegrationTestSuite) TestMulti() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) // ingest getNextTs := getAutoTsGenerator(time.Now(), -time.Second) origDocs := []string{ @@ -498,6 +508,7 @@ func collectIDs(qpr *seq.QPR) []string { } func (s *IntegrationTestSuite) TestSearchNot() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) defer env.StopAll() @@ -573,6 +584,7 @@ func (s *IntegrationTestSuite) TestSearchNot() { } func (s *IntegrationTestSuite) TestSearchPattern() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) defer env.StopAll() @@ -607,6 +619,7 @@ func (s *IntegrationTestSuite) TestSearchPattern() { } func (s *IntegrationTestSuite) TestSearchSimple() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) defer env.StopAll() @@ -645,6 +658,7 @@ func (s *IntegrationTestSuite) TestSearchSimple() { } func (s *IntegrationTestSuite) TestManySearchRequests() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) defer env.StopAll() @@ -665,6 +679,7 @@ func (s *IntegrationTestSuite) TestManySearchRequests() { } func (s *IntegrationTestSuite) TestAgg() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) t := s.T() env := setup.NewTestingEnv(s.Config) @@ -750,6 +765,7 @@ func (s *IntegrationTestSuite) TestAgg() { } func (s *IntegrationTestSuite) TestTimeseries() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) t := s.T() env := setup.NewTestingEnv(s.Config) @@ -1208,6 +1224,7 @@ func (s *IntegrationTestSuite) TestAggStat() { } func (s *IntegrationTestSuite) TestAggNoTotal() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) defer env.StopAll() @@ -1315,6 +1332,7 @@ func (s *IntegrationTestSuite) TestAggNoTotal() { } func (s *IntegrationTestSuite) TestSeal() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) bulksNum := getBulkIterationsNum(env) @@ -1384,6 +1402,7 @@ func (s *IntegrationTestSuite) TestSeal() { } func (s *IntegrationTestSuite) TestSearchRange() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) doc := `{"service": "test-service", "level": "%d"}` env := setup.NewTestingEnv(s.Config) @@ -1424,6 +1443,7 @@ func (s *IntegrationTestSuite) TestSearchRange() { } func (s *IntegrationTestSuite) TestQueryErr() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) origDocs := []string{ `{"service":"a", "xxxx":"yyyy"}`, `{"service":"a", "yyyy":"xxxx"}`, @@ -1443,6 +1463,7 @@ func (s *IntegrationTestSuite) TestQueryErr() { func (s *IntegrationTestSuite) TestConnectionRefused() { s.T().Skip() // temporary skip this test until we fix it in CORELOG-299 + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) env.StopStore() defer env.StopAll() @@ -1469,6 +1490,7 @@ func (s *IntegrationTestSuite) TestConnectionRefused() { } func (s *IntegrationTestSuite) TestSearchProxyTimeout() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) if s.Config.Name != configBasic { s.T().Skip("no need to run in", s.Config.Name, "env") } @@ -1507,6 +1529,7 @@ func (s *IntegrationTestSuite) TestSearchProxyTimeout() { } func (s *IntegrationTestSuite) TestSearchStoreTimeout() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) if s.Config.Name != configBasic { s.T().Skip("no need to run in", s.Config.Name, "env") } @@ -1539,6 +1562,7 @@ func (s *IntegrationTestSuite) TestSearchStoreTimeout() { } func (s *IntegrationTestSuite) TestBulkBadTimestamp() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) type Doc struct { Service string `json:"service"` Level string `json:"level"` @@ -1586,7 +1610,7 @@ const configBasic = "Basic" func TestBasicIntegration(t *testing.T) { cfg := setup.TestingEnvConfig{ - Name: configBasic, + Name: fmt.Sprintf("%s-%s", configBasic, t.Name()), IngestorCount: 1, HotShards: 1, HotFactor: 1, @@ -1598,7 +1622,7 @@ func TestBasicIntegration(t *testing.T) { func TestColdStoreIntegration(t *testing.T) { cfg := setup.TestingEnvConfig{ - Name: "WithColdStore", + Name: fmt.Sprintf("%s-%s", "WithColdStore", t.Name()), IngestorCount: 1, ColdShards: 1, ColdFactor: 1, @@ -1613,7 +1637,7 @@ func TestColdStoreIntegration(t *testing.T) { func TestColdHotStoreIntegration(t *testing.T) { cfg := setup.TestingEnvConfig{ - Name: "WithColdAndHotStoreEnabled", + Name: fmt.Sprintf("%s-%s", "WithColdAndHotStoreEnabled", t.Name()), IngestorCount: 2, ColdShards: 1, ColdFactor: 1, @@ -1628,7 +1652,7 @@ func TestColdHotStoreIntegration(t *testing.T) { func TestBigWithReplicasIntegration(t *testing.T) { cfg := setup.TestingEnvConfig{ - Name: "BigWithReplicas", + Name: fmt.Sprintf("%s-%s", "BigWithReplicas", t.Name()), IngestorCount: 2, ColdShards: 4, ColdFactor: 1, @@ -1684,6 +1708,7 @@ func copySlice[V any](src []V) []V { } func (s *IntegrationTestSuite) TestPathSearch() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) env := setup.NewTestingEnv(s.Config) defer env.StopAll() @@ -1742,6 +1767,7 @@ func (s *IntegrationTestSuite) TestPathSearch() { } func (s *IntegrationTestSuite) TestSearchFieldsWithMultipleTypes() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) t := s.T() env := setup.NewTestingEnv(s.Config) @@ -1794,6 +1820,7 @@ func (s *IntegrationTestSuite) TestSearchFieldsWithMultipleTypes() { } func (s *IntegrationTestSuite) TestAggregateFieldsWithMultipleTypes() { + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name()) t := s.T() env := setup.NewTestingEnv(s.Config) @@ -1843,6 +1870,7 @@ func (s *IntegrationTestSuite) TestAggregateFieldsWithMultipleTypes() { // time field is replaced with time.Now() func (s *IntegrationTestSuite) TestTimeField() { config := *s.Config + config.Name = fmt.Sprintf("%s-%s", config.Name, s.T().Name()) config.Mapping = map[string]seq.MappingTypes{ "event": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "message": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), @@ -1883,6 +1911,7 @@ func (s *IntegrationTestSuite) TestAsyncSearch() { r := require.New(t) cfg := *s.Config + cfg.Name = fmt.Sprintf("%s-%s", cfg.Name, s.T().Name()) cfg.Mapping = map[string]seq.MappingTypes{ "ip": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), "method": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0), diff --git a/tests/integration_tests/replicas_test.go b/tests/integration_tests/replicas_test.go index 8ebcb024..cf4750cb 100644 --- a/tests/integration_tests/replicas_test.go +++ b/tests/integration_tests/replicas_test.go @@ -37,6 +37,7 @@ func NewReplicaEnv(t *testing.T, config setup.TestingEnvConfig) ReplicasEnv { } config.DataDir = dir config.IngestorCount = 1 + config.Name = fmt.Sprintf("%s-%s", config.Name, t.Name()) env := ReplicasEnv{setup.NewTestingEnv(&config)} diff --git a/tests/integration_tests/single_test.go b/tests/integration_tests/single_test.go index 76c63e29..b1ee90fb 100644 --- a/tests/integration_tests/single_test.go +++ b/tests/integration_tests/single_test.go @@ -571,9 +571,10 @@ func (s *SingleTestSuite) TestSealedMultiFetch() { } func TestSingleSuite(t *testing.T) { - for _, cfg := range suites.SingleEnvs() { + for i, cfg := range suites.SingleEnvs() { t.Run(cfg.Name, func(t *testing.T) { t.Parallel() + cfg.Name = fmt.Sprintf("%s%d", cfg.Name, i) suite.Run(t, NewSingleTestSuite(cfg)) }) } diff --git a/tests/setup/env.go b/tests/setup/env.go index e95543ce..d36681f9 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -350,6 +350,7 @@ func MakeIngestors(cfg *TestingEnvConfig, hot, cold [][]string) []*Ingestor { BulkCircuit: circuitbreaker.Config{ RequestVolumeThreshold: 101, // disable circuit breaker Timeout: time.Hour, + CustomSuffix: cfg.Name, }, MaxInflightBulks: 0, AllowedTimeDrift: 24 * time.Hour, diff --git a/tests/suites/single.go b/tests/suites/single.go index 46978219..11739491 100644 --- a/tests/suites/single.go +++ b/tests/suites/single.go @@ -1,6 +1,7 @@ package suites import ( + "fmt" "math" "slices" "strings" @@ -147,6 +148,7 @@ func NewSingle(cfg *setup.TestingEnvConfig) *Single { func (s *Single) BeforeTest(suiteName, testName string) { s.Base.BeforeTest(suiteName, testName) + s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, testName) s.Env = setup.NewTestingEnv(s.Config) }