From 4f2daf245159d2e6134a90b9edd0755347044e15 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 11:26:57 -0300 Subject: [PATCH 01/25] progress --- core/chainio/avs_writer.go | 16 ++++++- core/retry.go | 76 ++++++++++++++++++++++++---------- core/retry_test.go | 21 +++++++++- core/utils/eth_client_utils.go | 8 ++-- 4 files changed, 92 insertions(+), 29 deletions(-) diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 012e069bb9..3ca56394d2 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -95,6 +95,16 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe txOpts.NoSend = false i := 0 + // Set Retry condfig for RespondToTaskV2 + respondToTaskV2Config := retry.DefaultRetryConfig() + respondToTaskV2Config.NumRetries = 0 + respondToTaskV2Config.MaxElapsedTime = 0 + + // Set Retry config for WaitForTxRetryable + waitForTxConfig := retry.DefaultRetryConfig() + waitForTxConfig.MaxInterval = 2 * time.Second + waitForTxConfig.NumRetries = 0 + respondToTaskV2Func := func() (*types.Receipt, error) { gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback) if err != nil { @@ -126,7 +136,8 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe return nil, err } - receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), timeToWaitBeforeBump) + waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump + receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), waitForTxConfig) if receipt != nil { return receipt, nil } @@ -142,7 +153,8 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe return nil, fmt.Errorf("transaction failed") } - return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0) + //return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0) + return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config) } func (w *AvsWriter) checkRespondToTaskFeeLimit(tx *types.Transaction, txOpts bind.TransactOpts, batchIdentifierHash [32]byte, senderAddress [20]byte) error { diff --git a/core/retry.go b/core/retry.go index 551be59c77..33beb5b860 100644 --- a/core/retry.go +++ b/core/retry.go @@ -25,15 +25,47 @@ func (e PermanentError) Is(err error) bool { } const ( - MinDelay = 1 * time.Second // Initial delay for retry interval. - MaxInterval = 60 * time.Second // Maximum interval an individual retry may have. - MaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. - RetryFactor float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. - NumRetries uint64 = 3 // Total number of retries attempted. - MinDelayChain = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. - MaxIntervalChain = 2 * time.Minute // Maximum interval for an individual retry. + DefaultInitialInterval = 1 * time.Second // Initial delay for retry interval. + DefaultMaxInterval = 60 * time.Second // Maximum interval an individual retry may have. + DefaultMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + DefaultRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. + DefaultMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. + DefaultNumRetries uint64 = 3 // Total number of retries attempted. + ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. + ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. ) +type RetryConfig struct { + InitialInterval time.Duration // Initial delay for retry interval. + MaxInterval time.Duration // Maximum interval an individual retry may have. + MaxElapsedTime time.Duration // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + RandomizationFactor float64 + Multiplier float64 + NumRetries uint64 +} + +func DefaultRetryConfig() *RetryConfig { + return &RetryConfig{ + InitialInterval: DefaultInitialInterval, + MaxInterval: DefaultMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + RandomizationFactor: DefaultRandomizationFactor, + Multiplier: DefaultMultiplier, + NumRetries: DefaultNumRetries, + } +} + +func ChainRetryConfig() *RetryConfig { + return &RetryConfig{ + InitialInterval: ChainInitialInterval, + MaxInterval: ChainMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + RandomizationFactor: DefaultRandomizationFactor, + Multiplier: DefaultMultiplier, + NumRetries: DefaultNumRetries, + } +} + /* Retry and RetryWithData are custom retry functions used in Aligned's aggregator and operator to facilitate consistent retry logic across the system. They are interfaces for around Cenk Alti (https://github.com/cenkalti) backoff library (https://github.com/cenkalti/backoff). We would like to thank him for his great work. @@ -91,8 +123,10 @@ request retry_interval (12 sec) randomized_interval (0.5) randomized_int Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9 */ +// TODO: Make config optional by using default but passing nil. // Same as Retry only that the functionToRetry can return a value upon correct execution -func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) { +func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig) (T, error) { + //func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) { f := func() (T, error) { var ( val T @@ -120,15 +154,15 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat randomOption := backoff.WithRandomizationFactor(0) - initialRetryOption := backoff.WithInitialInterval(minDelay) - multiplierOption := backoff.WithMultiplier(factor) - maxIntervalOption := backoff.WithMaxInterval(maxInterval) - maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime) + initialRetryOption := backoff.WithInitialInterval(config.InitialInterval) + multiplierOption := backoff.WithMultiplier(config.Multiplier) + maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval) + maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime) expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if maxTries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries) + if config.NumRetries > 0 { + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries) } else { maxRetriesBackoff = expBackoff } @@ -142,7 +176,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat // from the configuration are reached, or until a `PermanentError` is returned. // The function to be retried should return `PermanentError` when the condition for stop retrying // is met. -func Retry(functionToRetry func() error, minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) error { +func Retry(functionToRetry func() error, config *RetryConfig) error { f := func() error { var err error func() { @@ -167,15 +201,15 @@ func Retry(functionToRetry func() error, minDelay time.Duration, factor float64, randomOption := backoff.WithRandomizationFactor(0) - initialRetryOption := backoff.WithInitialInterval(minDelay) - multiplierOption := backoff.WithMultiplier(factor) - maxIntervalOption := backoff.WithMaxInterval(maxInterval) - maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime) + initialRetryOption := backoff.WithInitialInterval(config.InitialInterval) + multiplierOption := backoff.WithMultiplier(config.Multiplier) + maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval) + maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime) expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if maxTries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries) + if config.NumRetries > 0 { + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries) } else { maxRetriesBackoff = expBackoff } diff --git a/core/retry_test.go b/core/retry_test.go index c8e7fbfb2a..31c4af554a 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -42,7 +42,16 @@ func TestRetryWithData(t *testing.T) { x, err := DummyFunction(43) return &x, err } - _, err := retry.RetryWithData(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime) + + config := &retry.RetryConfig{ + InitialInterval: 1000, + MaxInterval: 2, + MaxElapsedTime: 3, + RandomizationFactor: 0, + Multiplier: retry.DefaultMultiplier, + NumRetries: retry.DefaultNumRetries, + } + _, err := retry.RetryWithData(function, config) if err != nil { t.Errorf("Retry error!: %s", err) } @@ -53,7 +62,15 @@ func TestRetry(t *testing.T) { _, err := DummyFunction(43) return err } - err := retry.Retry(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime) + config := &retry.RetryConfig{ + InitialInterval: 1000, + MaxInterval: 2, + MaxElapsedTime: 3, + RandomizationFactor: 0, + Multiplier: retry.DefaultMultiplier, + NumRetries: retry.DefaultNumRetries, + } + err := retry.Retry(function, config) if err != nil { t.Errorf("Retry error!: %s", err) } diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index 689f7cd17e..065f25d9bb 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -3,7 +3,6 @@ package utils import ( "context" "math/big" - "time" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" eigentypes "github.com/Layr-Labs/eigensdk-go/types" @@ -20,7 +19,7 @@ import ( // Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt // All errors are considered Transient Errors // - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout -func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, waitTimeout time.Duration) (*types.Receipt, error) { +func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) { receipt_func := func() (*types.Receipt, error) { receipt, err := client.TransactionReceipt(context.Background(), txHash) if err != nil { @@ -32,7 +31,7 @@ func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackC } return receipt, nil } - return retry.RetryWithData(receipt_func, retry.MinDelay, retry.RetryFactor, 0, time.Second*2, waitTimeout) + return retry.RetryWithData(receipt_func, config) } func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums { @@ -71,6 +70,7 @@ func CalculateGasPriceBumpBasedOnRetry(currentGasPrice *big.Int, baseBumpPercent return bumpedGasPrice } +//TODO: move to retryable function file /* GetGasPriceRetryable Get the gas price from the client with retry logic. @@ -89,5 +89,5 @@ func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.Inst return gasPrice, nil } - return retry.RetryWithData(respondToTaskV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(respondToTaskV2_func, retry.DefaultRetryConfig()) } From 3ae2fc08c990a86962f3bea256cbdd424642551e Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 12:46:25 -0300 Subject: [PATCH 02/25] pass in config file --- aggregator/pkg/aggregator.go | 3 ++- aggregator/pkg/server.go | 8 +++++--- core/chainio/avs_subscriber.go | 6 ++++-- core/chainio/avs_writer.go | 1 - core/chainio/retryable.go | 23 ++++++++++++----------- core/retry_test.go | 6 +++--- 6 files changed, 26 insertions(+), 21 deletions(-) diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index 620bdd27f8..97665c7fa2 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -392,6 +392,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by // |---RETRYABLE---| +// TODO: Add Retryable Label /* InitializeNewTask Initialize a new task in the BLS Aggregation service @@ -413,7 +414,7 @@ func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uin } return err } - return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.Retry(initializeNewTask_func, retry.DefaultRetryConfig()) } // Long-lived goroutine that periodically checks and removes old Tasks from stored Maps diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 0c3ee5c8bd..f56d0e64a8 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -53,6 +53,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) taskIndex := uint32(0) + // TODO: Add Retryable taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) if err != nil { @@ -114,6 +115,7 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error { // |---RETRYABLE---| +// TODO: Add Retryable /* - Errors: Permanent: @@ -137,9 +139,10 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32 return err } - return retry.Retry(processNewSignature_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) + return retry.Retry(processNewSignature_func, retry.ChainRetryConfig()) } +// TODO: Add Retryable + Comment func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) { getTaskIndex_func := func() (uint32, error) { agg.taskMutex.Lock() @@ -153,6 +156,5 @@ func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error return taskIndex, nil } } - - return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(getTaskIndex_func, retry.DefaultRetryConfig()) } diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index ea810da628..bffb9eea96 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,13 +68,15 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err) + //TODO: Confirm these are accurate + s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err) + //TODO: Confirm these are accurate + s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) return nil, err } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 3ca56394d2..19b788aecf 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -153,7 +153,6 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe return nil, fmt.Errorf("transaction failed") } - //return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0) return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config) } diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index 0ac44a59ab..1a54e6ceb8 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -32,7 +32,8 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl } return tx, err } - return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) + //return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) + return retry.RetryWithData(respondToTaskV2_func, retry.DefaultRetryConfig()) } /* @@ -60,7 +61,7 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s } return state, err } - return retry.RetryWithData(batchesState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(batchesState_func, retry.DefaultRetryConfig()) } /* @@ -79,7 +80,7 @@ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress } return batcherBalance, err } - return retry.RetryWithData(batcherBalances_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(batcherBalances_func, retry.DefaultRetryConfig()) } /* @@ -100,7 +101,7 @@ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress co } return aggregatorBalance, err } - return retry.RetryWithData(balanceAt_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(balanceAt_func, retry.DefaultRetryConfig()) } // |---AVS_SUBSCRIBER---| @@ -121,7 +122,7 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error } return latestBlock, err } - return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(latestBlock_func, retry.DefaultRetryConfig()) } /* @@ -134,7 +135,7 @@ func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkl filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) } - return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(filterNewBatchV2_func, retry.DefaultRetryConfig()) } /* @@ -147,7 +148,7 @@ func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkl filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) } - return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(filterNewBatchV2_func, retry.DefaultRetryConfig()) } /* @@ -169,7 +170,7 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte return s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) } - return retry.RetryWithData(batchState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(batchState_func, retry.DefaultRetryConfig()) } /* @@ -188,7 +189,7 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- } return sub, err } - return retry.RetryWithData(subscribeNewHead_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(subscribeNewHead_func, retry.DefaultRetryConfig()) } /* @@ -206,7 +207,7 @@ func SubscribeToNewTasksV2Retryable( subscribe_func := func() (event.Subscription, error) { return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) } - return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(subscribe_func, retry.DefaultRetryConfig()) } /* @@ -224,5 +225,5 @@ func SubscribeToNewTasksV3Retryable( subscribe_func := func() (event.Subscription, error) { return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) } - return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(subscribe_func, retry.DefaultRetryConfig()) } diff --git a/core/retry_test.go b/core/retry_test.go index 31c4af554a..7f68c37cea 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -170,7 +170,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { } // Assert Call succeeds when Anvil running - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45) + _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.DefaultRetryConfig()) assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err) if !strings.Contains(err.Error(), "not found") { t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err) @@ -182,7 +182,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { return } - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45) + _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.DefaultRetryConfig()) assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("WaitForTransactionReceipt Emitted non Transient error: %s\n", err) @@ -198,7 +198,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45) + _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.DefaultRetryConfig()) assert.NotNil(t, err) if !strings.Contains(err.Error(), "not found") { t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err) From 54781acbfe73aa73862f4d6be026422050e9ca63 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 12:50:11 -0300 Subject: [PATCH 03/25] Add missing comments + retryable markers --- aggregator/pkg/server.go | 14 +++++++++----- core/chainio/retryable.go | 3 +-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index f56d0e64a8..d524e9e000 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -54,7 +54,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t taskIndex := uint32(0) // TODO: Add Retryable - taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) + taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash) if err != nil { agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") @@ -73,7 +73,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t agg.logger.Info("Starting bls signature process") go func() { - err := agg.ProcessNewSignature( + err := agg.ProcessNewSignatureRetryable( context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash, &signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId, ) @@ -125,7 +125,7 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error { - Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) - NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors as block reorg's may lead to these errors being thrown. */ -func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error { +func (agg *Aggregator) ProcessNewSignatureRetryable(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error { processNewSignature_func := func() error { err := agg.blsAggregationService.ProcessNewSignature( ctx, taskIndex, taskResponse, @@ -142,8 +142,12 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32 return retry.Retry(processNewSignature_func, retry.ChainRetryConfig()) } -// TODO: Add Retryable + Comment -func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) { +// Checks Internal mapping for Signed Task Response, returns its TaskIndex. +/* +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec +*/ +func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) { getTaskIndex_func := func() (uint32, error) { agg.taskMutex.Lock() agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index 1a54e6ceb8..425a70eb10 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -32,8 +32,7 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl } return tx, err } - //return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) - return retry.RetryWithData(respondToTaskV2_func, retry.DefaultRetryConfig()) + return retry.RetryWithData(respondToTaskV2_func, retry.ChainRetryConfig()) } /* From 85921cde68235f34a454b054517aea034097732d Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 14:11:57 -0300 Subject: [PATCH 04/25] separate retryable logic from functions --- core/chainio/retryable.go | 177 +++++++++++++++++++++++++------------- 1 file changed, 119 insertions(+), 58 deletions(-) diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index 425a70eb10..75fb942445 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -15,14 +15,7 @@ import ( // |---AVS_WRITER---| -/* -RespondToTaskV2Retryable -Send a transaction to the AVS contract to respond to a task. -- All errors are considered Transient Errors -- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) -- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry. -*/ -func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) { +func RespondToTaskV2(w *AvsWriter, opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) func() (*types.Transaction, error) { respondToTaskV2_func := func() (*types.Transaction, error) { // Try with main connection tx, err := w.AvsContractBindings.ServiceManager.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature) @@ -32,21 +25,25 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl } return tx, err } - return retry.RetryWithData(respondToTaskV2_func, retry.ChainRetryConfig()) + return respondToTaskV2_func } /* -BatchesStateRetryable -Get the state of a batch from the AVS contract. +RespondToTaskV2Retryable +Send a transaction to the AVS contract to respond to a task. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec +- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) +- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry. */ -func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { +func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) { + return retry.RetryWithData(RespondToTaskV2(w, opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature), retry.ChainRetryConfig()) +} + +func BatchesState(w *AvsWriter, opts *bind.CallOpts, arg0 [32]byte) func() (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int }, error) { - batchesState_func := func() (struct { TaskCreatedBlock uint32 Responded bool @@ -60,16 +57,24 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s } return state, err } - return retry.RetryWithData(batchesState_func, retry.DefaultRetryConfig()) + return batchesState_func } /* -BatcherBalancesRetryable -Get the balance of a batcher from the AVS contract. +BatchesStateRetryable +Get the state of a batch from the AVS contract. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { +func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + return retry.RetryWithData(BatchesState(w, opts, arg0), retry.DefaultRetryConfig()) +} + +func BatcherBalances(w *AvsWriter, opts *bind.CallOpts, senderAddress common.Address) func() (*big.Int, error) { batcherBalances_func := func() (*big.Int, error) { // Try with main connection batcherBalance, err := w.AvsContractBindings.ServiceManager.BatchersBalances(opts, senderAddress) @@ -79,18 +84,20 @@ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress } return batcherBalance, err } - return retry.RetryWithData(batcherBalances_func, retry.DefaultRetryConfig()) + return batcherBalances_func } /* -BalanceAtRetryable -Get the balance of aggregatorAddress at blockNumber. -If blockNumber is nil, it gets the latest balance. -TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed. +BatcherBalancesRetryable +Get the balance of a batcher from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +- Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { +func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { + return retry.RetryWithData(BatcherBalances(w, opts, senderAddress), retry.DefaultRetryConfig()) +} + +func BalanceAt(w *AvsWriter, ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) func() (*big.Int, error) { balanceAt_func := func() (*big.Int, error) { // Try with main connection aggregatorBalance, err := w.Client.BalanceAt(ctx, aggregatorAddress, blockNumber) @@ -100,18 +107,24 @@ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress co } return aggregatorBalance, err } - return retry.RetryWithData(balanceAt_func, retry.DefaultRetryConfig()) + return balanceAt_func } -// |---AVS_SUBSCRIBER---| - /* -BlockNumberRetryable -Get the latest block number from Ethereum +BalanceAtRetryable +Get the balance of aggregatorAddress at blockNumber. +If blockNumber is nil, it gets the latest balance. +TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ -func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { +func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { + return retry.RetryWithData(BalanceAt(w, ctx, aggregatorAddress, blockNumber), retry.DefaultRetryConfig()) +} + +// |---AVS_SUBSCRIBER---| + +func BlockNumber(s *AvsSubscriber, ctx context.Context) func() (uint64, error) { latestBlock_func := func() (uint64, error) { // Try with main connection latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(ctx) @@ -121,42 +134,54 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error } return latestBlock, err } - return retry.RetryWithData(latestBlock_func, retry.DefaultRetryConfig()) + return latestBlock_func } /* -FilterBatchV2Retryable -Get NewBatchV2 logs from the AVS contract. +BlockNumberRetryable +Get the latest block number from Ethereum - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ -func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { +func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { + return retry.RetryWithData(BlockNumber(s, ctx), retry.DefaultRetryConfig()) +} + +func FilterBatchV2(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) } - return retry.RetryWithData(filterNewBatchV2_func, retry.DefaultRetryConfig()) + return filterNewBatchV2_func } /* -FilterBatchV3Retryable -Get NewBatchV3 logs from the AVS contract. +FilterBatchV2Retryable +Get NewBatchV2 logs from the AVS contract. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ -func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { +func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { + return retry.RetryWithData(FilterBatchV2(s, opts, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func FilterBatchV3(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) } - return retry.RetryWithData(filterNewBatchV2_func, retry.DefaultRetryConfig()) + return filterNewBatchV3_func } /* -BatchesStateRetryable -Get the state of a batch from the AVS contract. +FilterBatchV3Retryable +Get NewBatchV3 logs from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ -func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { +func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + return retry.RetryWithData(FilterBatchV3(s, opts, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func BatchState(s *AvsSubscriber, opts *bind.CallOpts, arg0 [32]byte) func() (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int @@ -168,17 +193,25 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte }, error) { return s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) } - - return retry.RetryWithData(batchState_func, retry.DefaultRetryConfig()) + return batchState_func } /* -SubscribeNewHeadRetryable -Subscribe to new heads from the Ethereum node. +BatchesStateRetryable +Get the state of a batch from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +- Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { +func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + + return retry.RetryWithData(BatchState(s, opts, arg0), retry.DefaultRetryConfig()) +} + +func SubscribeNewHead(s *AvsSubscriber, ctx context.Context, c chan<- *types.Header) func() (ethereum.Subscription, error) { subscribeNewHead_func := func() (ethereum.Subscription, error) { // Try with main connection sub, err := s.AvsContractBindings.ethClient.SubscribeNewHead(ctx, c) @@ -188,7 +221,29 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- } return sub, err } - return retry.RetryWithData(subscribeNewHead_func, retry.DefaultRetryConfig()) + return subscribeNewHead_func +} + +/* +SubscribeNewHeadRetryable +Subscribe to new heads from the Ethereum node. +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { + return retry.RetryWithData(SubscribeNewHead(s, ctx, c), retry.DefaultRetryConfig()) +} + +func SubscribeToNewTasksV2( + opts *bind.WatchOpts, + serviceManager *servicemanager.ContractAlignedLayerServiceManager, + newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, + batchMerkleRoot [][32]byte, +) func() (event.Subscription, error) { + subscribe_func := func() (event.Subscription, error) { + return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) + } + return subscribe_func } /* @@ -203,10 +258,19 @@ func SubscribeToNewTasksV2Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { + return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func SubscribeToNewTasksV3( + opts *bind.WatchOpts, + serviceManager *servicemanager.ContractAlignedLayerServiceManager, + newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, + batchMerkleRoot [][32]byte, +) func() (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { - return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) + return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) } - return retry.RetryWithData(subscribe_func, retry.DefaultRetryConfig()) + return subscribe_func } /* @@ -221,8 +285,5 @@ func SubscribeToNewTasksV3Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { - subscribe_func := func() (event.Subscription, error) { - return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) - } - return retry.RetryWithData(subscribe_func, retry.DefaultRetryConfig()) + return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig()) } From c3124c3dee87364319fd594deccd7da43bba4521 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 15:08:11 -0300 Subject: [PATCH 05/25] refactor tests --- core/retry_test.go | 119 +++++++++++++++++++++------------ core/utils/eth_client_utils.go | 24 ++++--- 2 files changed, 92 insertions(+), 51 deletions(-) diff --git a/core/retry_test.go b/core/retry_test.go index 7f68c37cea..666fd953fc 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -170,7 +170,8 @@ func TestWaitForTransactionReceipt(t *testing.T) { } // Assert Call succeeds when Anvil running - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.DefaultRetryConfig()) + receipt_function := utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + _, err = receipt_function() assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err) if !strings.Contains(err.Error(), "not found") { t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err) @@ -182,7 +183,8 @@ func TestWaitForTransactionReceipt(t *testing.T) { return } - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.DefaultRetryConfig()) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + _, err = receipt_function() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("WaitForTransactionReceipt Emitted non Transient error: %s\n", err) @@ -198,7 +200,8 @@ func TestWaitForTransactionReceipt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.DefaultRetryConfig()) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + _, err = receipt_function() assert.NotNil(t, err) if !strings.Contains(err.Error(), "not found") { t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err) @@ -356,7 +359,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) { t.Errorf("Error setting up Avs Service Bindings: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func := chainio.SubscribeToNewTasksV3(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -364,13 +368,14 @@ func TestSubscribeToNewTasksV3(t *testing.T) { return } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV3(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err) return } @@ -380,7 +385,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV3(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -406,7 +412,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) { t.Errorf("Error setting up Avs Service Bindings: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func := chainio.SubscribeToNewTasksV2(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -414,13 +421,15 @@ func TestSubscribeToNewTasksV2(t *testing.T) { return } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV2(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() + assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err) return } @@ -430,7 +439,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV2(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -451,7 +461,8 @@ func TestBlockNumber(t *testing.T) { if err != nil { return } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func := chainio.BlockNumber(sub, context.Background()) + _, err = block_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -459,7 +470,8 @@ func TestBlockNumber(t *testing.T) { return } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func = chainio.BlockNumber(sub, context.Background()) + _, err = block_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BlockNumber Emitted non Transient error: %s\n", err) @@ -475,7 +487,8 @@ func TestBlockNumber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func = chainio.BlockNumber(sub, context.Background()) + _, err = block_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -495,7 +508,8 @@ func TestFilterBatchV2(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func := chainio.FilterBatchV2(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -503,13 +517,14 @@ func TestFilterBatchV2(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV2(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } @@ -519,7 +534,8 @@ func TestFilterBatchV2(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV2(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -539,7 +555,8 @@ func TestFilterBatchV3(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func := chainio.FilterBatchV3(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -547,13 +564,14 @@ func TestFilterBatchV3(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV3(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err) return } @@ -563,7 +581,8 @@ func TestFilterBatchV3(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV3(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -585,7 +604,8 @@ func TestBatchesStateSubscriber(t *testing.T) { } zero_bytes := [32]byte{} - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func := chainio.BatchState(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -593,13 +613,14 @@ func TestBatchesStateSubscriber(t *testing.T) { return } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func = chainio.BatchState(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } @@ -609,7 +630,8 @@ func TestBatchesStateSubscriber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func = chainio.BatchState(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -631,7 +653,8 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func := chainio.SubscribeNewHead(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -639,7 +662,8 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func = chainio.SubscribeNewHead(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeNewHead Emitted non Transient error: %s\n", err) @@ -655,7 +679,8 @@ func TestSubscribeNewHead(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func = chainio.SubscribeNewHead(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -708,7 +733,8 @@ func TestRespondToTaskV2(t *testing.T) { zero_bytes := [32]byte{} // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func := chainio.RespondToTaskV2(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if !strings.Contains(err.Error(), "execution reverted") { t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:") @@ -718,7 +744,8 @@ func TestRespondToTaskV2(t *testing.T) { t.Errorf("Error killing process: %v\n", err) } - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func = chainio.RespondToTaskV2(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if _, ok := err.(*backoff.PermanentError); ok { t.Errorf("RespondToTaskV2 Emitted non-Transient error: %s\n", err) @@ -733,7 +760,8 @@ func TestRespondToTaskV2(t *testing.T) { } // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func = chainio.RespondToTaskV2(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if !strings.Contains(err.Error(), "execution reverted") { t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:") @@ -761,7 +789,8 @@ func TestBatchesStateWriter(t *testing.T) { var bytes [32]byte num.FillBytes(bytes[:]) - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + state_func := chainio.BatchesState(avsWriter, &bind.CallOpts{}, bytes) + _, err = state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -769,7 +798,8 @@ func TestBatchesStateWriter(t *testing.T) { return } - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + state_func = chainio.BatchesState(avsWriter, &bind.CallOpts{}, bytes) + _, err = state_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchesStateWriter Emitted non-Transient error: %s\n", err) @@ -785,7 +815,8 @@ func TestBatchesStateWriter(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + state_func = chainio.BatchesState(avsWriter, &bind.CallOpts{}, bytes) + _, err = state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -808,7 +839,8 @@ func TestBalanceAt(t *testing.T) { aggregator_address := common.HexToAddress("0x0") blockHeight := big.NewInt(13) - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + balance_func := chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockHeight) + _, err = balance_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -816,7 +848,8 @@ func TestBalanceAt(t *testing.T) { return } - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockHeight) + _, err = balance_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BalanceAt Emitted non-Transient error: %s\n", err) @@ -832,7 +865,8 @@ func TestBalanceAt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockHeight) + _, err = balance_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -854,7 +888,8 @@ func TestBatchersBalances(t *testing.T) { } senderAddress := common.HexToAddress("0x0") - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + batcher_func := chainio.BatcherBalances(avsWriter, &bind.CallOpts{}, senderAddress) + _, err = batcher_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -862,7 +897,8 @@ func TestBatchersBalances(t *testing.T) { return } - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + batcher_func = chainio.BatcherBalances(avsWriter, &bind.CallOpts{}, senderAddress) + _, err = batcher_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchersBalances Emitted non-Transient error: %s\n", err) @@ -878,7 +914,8 @@ func TestBatchersBalances(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + batcher_func = chainio.BatcherBalances(avsWriter, &bind.CallOpts{}, senderAddress) + _, err = batcher_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index 065f25d9bb..884ff8962e 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -11,15 +11,7 @@ import ( retry "github.com/yetanotherco/aligned_layer/core" ) -// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash. -// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached. -// If the receipt is still unavailable after `waitTimeout`, it will return an error. -// -// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block. -// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt -// All errors are considered Transient Errors -// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout -func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) { +func WaitForTransactionReceipt(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) func() (*types.Receipt, error) { receipt_func := func() (*types.Receipt, error) { receipt, err := client.TransactionReceipt(context.Background(), txHash) if err != nil { @@ -31,7 +23,19 @@ func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackC } return receipt, nil } - return retry.RetryWithData(receipt_func, config) + return receipt_func +} + +// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash. +// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached. +// If the receipt is still unavailable after `waitTimeout`, it will return an error. +// +// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block. +// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt +// All errors are considered Transient Errors +// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout +func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) { + return retry.RetryWithData(WaitForTransactionReceipt(client, fallbackClient, txHash, config), config) } func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums { From 62f1fffc86ae1a8da0311ef9a86a953bcf51145b Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 15:18:31 -0300 Subject: [PATCH 06/25] rm cmts --- aggregator/pkg/aggregator.go | 1 - aggregator/pkg/server.go | 1 - core/chainio/avs_subscriber.go | 2 -- core/retry.go | 1 - 4 files changed, 5 deletions(-) diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index 97665c7fa2..daa1baf0e1 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -392,7 +392,6 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by // |---RETRYABLE---| -// TODO: Add Retryable Label /* InitializeNewTask Initialize a new task in the BLS Aggregation service diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index d524e9e000..c2ec5752c3 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -53,7 +53,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) taskIndex := uint32(0) - // TODO: Add Retryable taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash) if err != nil { diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index bffb9eea96..436ae1a184 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,14 +68,12 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - //TODO: Confirm these are accurate s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - //TODO: Confirm these are accurate s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) return nil, err } diff --git a/core/retry.go b/core/retry.go index 33beb5b860..b5288e66a2 100644 --- a/core/retry.go +++ b/core/retry.go @@ -123,7 +123,6 @@ request retry_interval (12 sec) randomized_interval (0.5) randomized_int Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9 */ -// TODO: Make config optional by using default but passing nil. // Same as Retry only that the functionToRetry can return a value upon correct execution func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig) (T, error) { //func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) { From 4a3014bb937edeb1c72a7e727f64e4bdb62caa1e Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 13:12:06 -0300 Subject: [PATCH 07/25] rm left over cmt --- aggregator/pkg/server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index c2ec5752c3..e36d1a2945 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -114,7 +114,6 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error { // |---RETRYABLE---| -// TODO: Add Retryable /* - Errors: Permanent: From 4d724720e107b318165c0b410a381ea5810e36ea Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 13:20:51 -0300 Subject: [PATCH 08/25] mv waitForTxConfig --- core/chainio/avs_writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 19b788aecf..cab7171f87 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -104,6 +104,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe waitForTxConfig := retry.DefaultRetryConfig() waitForTxConfig.MaxInterval = 2 * time.Second waitForTxConfig.NumRetries = 0 + waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump respondToTaskV2Func := func() (*types.Receipt, error) { gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback) @@ -136,7 +137,6 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe return nil, err } - waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), waitForTxConfig) if receipt != nil { return receipt, nil From 44cb8445f9546cf2914794b319dbbbc89ef0af6d Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 13:21:17 -0300 Subject: [PATCH 09/25] fix comment --- core/chainio/avs_writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index cab7171f87..b605324072 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -95,7 +95,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe txOpts.NoSend = false i := 0 - // Set Retry condfig for RespondToTaskV2 + // Set Retry config for RespondToTaskV2 respondToTaskV2Config := retry.DefaultRetryConfig() respondToTaskV2Config.NumRetries = 0 respondToTaskV2Config.MaxElapsedTime = 0 From 459ee8ccf9506a4a5fe83d4f3ba051c44815ef1c Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 13:41:07 -0300 Subject: [PATCH 10/25] add fallback to FilterBatch --- core/chainio/retryable.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index 75fb942445..ca118109f6 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -149,7 +149,11 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error func FilterBatchV2(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { - return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) + logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) + if err != nil { + logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV2(opts, batchMerkleRoot) + } + return logs, err } return filterNewBatchV2_func } @@ -166,7 +170,11 @@ func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkl func FilterBatchV3(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) + logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) + if err != nil { + logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot) + } + return logs, err } return filterNewBatchV3_func } From b4f982d65f86f623c61ea6ea4e2d4e3bb3b3564d Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 13:44:05 -0300 Subject: [PATCH 11/25] add fallback to BatchState --- core/chainio/retryable.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index ca118109f6..10e5ea2a65 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -199,7 +199,11 @@ func BatchState(s *AvsSubscriber, opts *bind.CallOpts, arg0 [32]byte) func() (st Responded bool RespondToTaskFeeLimit *big.Int }, error) { - return s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + state, err := s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + if err != nil { + state, err = s.AvsContractBindings.ServiceManagerFallback.BatchesState(opts, arg0) + } + return state, err } return batchState_func } From 8564934550d4b58e610f4551565718083e1f66cf Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 14:20:14 -0300 Subject: [PATCH 12/25] fix test --- core/retry_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/retry_test.go b/core/retry_test.go index 666fd953fc..ff7f0ba956 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -524,7 +524,7 @@ func TestFilterBatchV2(t *testing.T) { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connection reset") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } @@ -571,7 +571,7 @@ func TestFilterBatchV3(t *testing.T) { t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connection reset") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err) return } @@ -620,7 +620,7 @@ func TestBatchesStateSubscriber(t *testing.T) { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connection reset") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } From f6bd5f5d4403ff1c8964e91c6d36a2aba7fcf27b Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 14:32:55 -0300 Subject: [PATCH 13/25] NumRetries -> MaxNumRetries --- core/chainio/avs_subscriber.go | 4 ++-- core/chainio/avs_writer.go | 4 ++-- core/retry.go | 12 ++++++------ core/retry_test.go | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index 436ae1a184..c2f5e918af 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) + s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) + s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err) return nil, err } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index b605324072..7cfdf9d77d 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -97,13 +97,13 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe // Set Retry config for RespondToTaskV2 respondToTaskV2Config := retry.DefaultRetryConfig() - respondToTaskV2Config.NumRetries = 0 + respondToTaskV2Config.MaxNumRetries = 0 respondToTaskV2Config.MaxElapsedTime = 0 // Set Retry config for WaitForTxRetryable waitForTxConfig := retry.DefaultRetryConfig() waitForTxConfig.MaxInterval = 2 * time.Second - waitForTxConfig.NumRetries = 0 + waitForTxConfig.MaxNumRetries = 0 waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump respondToTaskV2Func := func() (*types.Receipt, error) { diff --git a/core/retry.go b/core/retry.go index b5288e66a2..fdca53b2e3 100644 --- a/core/retry.go +++ b/core/retry.go @@ -30,7 +30,7 @@ const ( DefaultMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. DefaultRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. DefaultMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. - DefaultNumRetries uint64 = 3 // Total number of retries attempted. + DefaultMaxNumRetries uint64 = 3 // Total number of retries attempted. ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. ) @@ -41,7 +41,7 @@ type RetryConfig struct { MaxElapsedTime time.Duration // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. RandomizationFactor float64 Multiplier float64 - NumRetries uint64 + MaxNumRetries uint64 } func DefaultRetryConfig() *RetryConfig { @@ -51,7 +51,7 @@ func DefaultRetryConfig() *RetryConfig { MaxElapsedTime: DefaultMaxElapsedTime, RandomizationFactor: DefaultRandomizationFactor, Multiplier: DefaultMultiplier, - NumRetries: DefaultNumRetries, + MaxNumRetries: DefaultMaxNumRetries, } } @@ -62,7 +62,7 @@ func ChainRetryConfig() *RetryConfig { MaxElapsedTime: DefaultMaxElapsedTime, RandomizationFactor: DefaultRandomizationFactor, Multiplier: DefaultMultiplier, - NumRetries: DefaultNumRetries, + MaxNumRetries: DefaultMaxNumRetries, } } @@ -161,7 +161,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig var maxRetriesBackoff backoff.BackOff if config.NumRetries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries) + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) } else { maxRetriesBackoff = expBackoff } @@ -208,7 +208,7 @@ func Retry(functionToRetry func() error, config *RetryConfig) error { var maxRetriesBackoff backoff.BackOff if config.NumRetries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries) + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) } else { maxRetriesBackoff = expBackoff } diff --git a/core/retry_test.go b/core/retry_test.go index ff7f0ba956..f5762b3f70 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -49,7 +49,7 @@ func TestRetryWithData(t *testing.T) { MaxElapsedTime: 3, RandomizationFactor: 0, Multiplier: retry.DefaultMultiplier, - NumRetries: retry.DefaultNumRetries, + MaxNumRetries: retry.DefaultMaxNumRetries, } _, err := retry.RetryWithData(function, config) if err != nil { @@ -68,7 +68,7 @@ func TestRetry(t *testing.T) { MaxElapsedTime: 3, RandomizationFactor: 0, Multiplier: retry.DefaultMultiplier, - NumRetries: retry.DefaultNumRetries, + MaxNumRetries: retry.DefaultMaxNumRetries, } err := retry.Retry(function, config) if err != nil { From 4612e2ad1f13d84f2998612151fa341ee5bb229b Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 14:34:15 -0300 Subject: [PATCH 14/25] nit --- core/retry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/retry.go b/core/retry.go index fdca53b2e3..5d70882a8b 100644 --- a/core/retry.go +++ b/core/retry.go @@ -160,7 +160,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if config.NumRetries > 0 { + if config.MaxNumRetries > 0 { maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) } else { maxRetriesBackoff = expBackoff @@ -207,7 +207,7 @@ func Retry(functionToRetry func() error, config *RetryConfig) error { expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if config.NumRetries > 0 { + if config.MaxNumRetries > 0 { maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) } else { maxRetriesBackoff = expBackoff From ff78ccc88fc53aa9ef87f91183b1589b64eafb45 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 14:51:30 -0300 Subject: [PATCH 15/25] grab block height from rpc client --- core/retry_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/core/retry_test.go b/core/retry_test.go index f5762b3f70..ae346e7c5a 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -826,7 +826,7 @@ func TestBatchesStateWriter(t *testing.T) { } func TestBalanceAt(t *testing.T) { - cmd, _, err := SetupAnvil(8545) + cmd, client, err := SetupAnvil(8545) if err != nil { t.Errorf("Error setting up Anvil: %s\n", err) } @@ -837,9 +837,14 @@ func TestBalanceAt(t *testing.T) { return } aggregator_address := common.HexToAddress("0x0") - blockHeight := big.NewInt(13) + // Fetch the latest block number + blockNumberUint64, err := client.BlockNumber(context.Background()) + blockNumber := new(big.Int).SetUint64(blockNumberUint64) + if err != nil { + log.Fatal(err) + } - balance_func := chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockHeight) + balance_func := chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockNumber) _, err = balance_func() assert.Nil(t, err) @@ -848,7 +853,7 @@ func TestBalanceAt(t *testing.T) { return } - balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockHeight) + balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockNumber) _, err = balance_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { @@ -865,7 +870,7 @@ func TestBalanceAt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockHeight) + balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockNumber) _, err = balance_func() assert.Nil(t, err) From 9b51c3cb9cb41bf4384c46a9ab57b66bb486a066 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 14:58:57 -0300 Subject: [PATCH 16/25] fix log --- core/retry_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/retry_test.go b/core/retry_test.go index ae346e7c5a..8e7ac849fd 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -841,7 +841,7 @@ func TestBalanceAt(t *testing.T) { blockNumberUint64, err := client.BlockNumber(context.Background()) blockNumber := new(big.Int).SetUint64(blockNumberUint64) if err != nil { - log.Fatal(err) + t.Errorf("Error retrieving Anvil Block Number: %v\n", err) } balance_func := chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockNumber) From 55a197e9f52754a5c669b78b071b2a7077033ae7 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 16:34:55 -0300 Subject: [PATCH 17/25] remove setting num retries --- core/chainio/avs_writer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 3864022948..4ca310547f 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -95,7 +95,6 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe // Set Retry config for RespondToTaskV2 respondToTaskV2Config := retry.DefaultRetryConfig() - respondToTaskV2Config.MaxNumRetries = 0 respondToTaskV2Config.MaxElapsedTime = 0 // Set Retry config for WaitForTxRetryable From ea9affb78ee3d308d701f4d865cfb59e758b11d1 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 16:48:41 -0300 Subject: [PATCH 18/25] revert MaxNumRetries -> NumRetries, change default name to rpc --- aggregator/pkg/aggregator.go | 2 +- aggregator/pkg/server.go | 2 +- core/chainio/avs_subscriber.go | 4 ++-- core/chainio/avs_writer.go | 6 +++--- core/chainio/retryable.go | 20 ++++++++++---------- core/retry.go | 18 +++++++++--------- core/retry_test.go | 10 +++++----- core/utils/eth_client_utils.go | 2 +- 8 files changed, 32 insertions(+), 32 deletions(-) diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index fdb7c8b45a..c1eaa0d3af 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -419,7 +419,7 @@ func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uin } return err } - return retry.Retry(initializeNewTask_func, retry.DefaultRetryConfig()) + return retry.Retry(initializeNewTask_func, retry.RpcRetryConfig()) } // Long-lived goroutine that periodically checks and removes old Tasks from stored Maps diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index e36d1a2945..7b53420377 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -158,5 +158,5 @@ func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint return taskIndex, nil } } - return retry.RetryWithData(getTaskIndex_func, retry.DefaultRetryConfig()) + return retry.RetryWithData(getTaskIndex_func, retry.RpcRetryConfig()) } diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index c2f5e918af..436ae1a184 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err) + s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err) + s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) return nil, err } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 4ca310547f..1eb73e955b 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -94,13 +94,13 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe i := 0 // Set Retry config for RespondToTaskV2 - respondToTaskV2Config := retry.DefaultRetryConfig() + respondToTaskV2Config := retry.RpcRetryConfig() respondToTaskV2Config.MaxElapsedTime = 0 // Set Retry config for WaitForTxRetryable - waitForTxConfig := retry.DefaultRetryConfig() + waitForTxConfig := retry.RpcRetryConfig() waitForTxConfig.MaxInterval = 2 * time.Second - waitForTxConfig.MaxNumRetries = 0 + waitForTxConfig.NumRetries = 0 waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump respondToTaskV2Func := func() (*types.Receipt, error) { diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index 10e5ea2a65..c3bf1c997e 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -71,7 +71,7 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s Responded bool RespondToTaskFeeLimit *big.Int }, error) { - return retry.RetryWithData(BatchesState(w, opts, arg0), retry.DefaultRetryConfig()) + return retry.RetryWithData(BatchesState(w, opts, arg0), retry.RpcRetryConfig()) } func BatcherBalances(w *AvsWriter, opts *bind.CallOpts, senderAddress common.Address) func() (*big.Int, error) { @@ -94,7 +94,7 @@ Get the balance of a batcher from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { - return retry.RetryWithData(BatcherBalances(w, opts, senderAddress), retry.DefaultRetryConfig()) + return retry.RetryWithData(BatcherBalances(w, opts, senderAddress), retry.RpcRetryConfig()) } func BalanceAt(w *AvsWriter, ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) func() (*big.Int, error) { @@ -119,7 +119,7 @@ TODO: it gets the balance from an Address, not necessarily an aggregator. The na - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { - return retry.RetryWithData(BalanceAt(w, ctx, aggregatorAddress, blockNumber), retry.DefaultRetryConfig()) + return retry.RetryWithData(BalanceAt(w, ctx, aggregatorAddress, blockNumber), retry.RpcRetryConfig()) } // |---AVS_SUBSCRIBER---| @@ -144,7 +144,7 @@ Get the latest block number from Ethereum - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { - return retry.RetryWithData(BlockNumber(s, ctx), retry.DefaultRetryConfig()) + return retry.RetryWithData(BlockNumber(s, ctx), retry.RpcRetryConfig()) } func FilterBatchV2(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { @@ -165,7 +165,7 @@ Get NewBatchV2 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { - return retry.RetryWithData(FilterBatchV2(s, opts, batchMerkleRoot), retry.DefaultRetryConfig()) + return retry.RetryWithData(FilterBatchV2(s, opts, batchMerkleRoot), retry.RpcRetryConfig()) } func FilterBatchV3(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { @@ -186,7 +186,7 @@ Get NewBatchV3 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - return retry.RetryWithData(FilterBatchV3(s, opts, batchMerkleRoot), retry.DefaultRetryConfig()) + return retry.RetryWithData(FilterBatchV3(s, opts, batchMerkleRoot), retry.RpcRetryConfig()) } func BatchState(s *AvsSubscriber, opts *bind.CallOpts, arg0 [32]byte) func() (struct { @@ -220,7 +220,7 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte RespondToTaskFeeLimit *big.Int }, error) { - return retry.RetryWithData(BatchState(s, opts, arg0), retry.DefaultRetryConfig()) + return retry.RetryWithData(BatchState(s, opts, arg0), retry.RpcRetryConfig()) } func SubscribeNewHead(s *AvsSubscriber, ctx context.Context, c chan<- *types.Header) func() (ethereum.Subscription, error) { @@ -243,7 +243,7 @@ Subscribe to new heads from the Ethereum node. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { - return retry.RetryWithData(SubscribeNewHead(s, ctx, c), retry.DefaultRetryConfig()) + return retry.RetryWithData(SubscribeNewHead(s, ctx, c), retry.RpcRetryConfig()) } func SubscribeToNewTasksV2( @@ -270,7 +270,7 @@ func SubscribeToNewTasksV2Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { - return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig()) + return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.RpcRetryConfig()) } func SubscribeToNewTasksV3( @@ -297,5 +297,5 @@ func SubscribeToNewTasksV3Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { - return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig()) + return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.RpcRetryConfig()) } diff --git a/core/retry.go b/core/retry.go index 5d70882a8b..0cdd1a14a9 100644 --- a/core/retry.go +++ b/core/retry.go @@ -30,7 +30,7 @@ const ( DefaultMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. DefaultRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. DefaultMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. - DefaultMaxNumRetries uint64 = 3 // Total number of retries attempted. + DefaultNumRetries uint64 = 3 // Total number of retries attempted. ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. ) @@ -41,17 +41,17 @@ type RetryConfig struct { MaxElapsedTime time.Duration // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. RandomizationFactor float64 Multiplier float64 - MaxNumRetries uint64 + NumRetries uint64 } -func DefaultRetryConfig() *RetryConfig { +func RpcRetryConfig() *RetryConfig { return &RetryConfig{ InitialInterval: DefaultInitialInterval, MaxInterval: DefaultMaxInterval, MaxElapsedTime: DefaultMaxElapsedTime, RandomizationFactor: DefaultRandomizationFactor, Multiplier: DefaultMultiplier, - MaxNumRetries: DefaultMaxNumRetries, + NumRetries: DefaultNumRetries, } } @@ -62,7 +62,7 @@ func ChainRetryConfig() *RetryConfig { MaxElapsedTime: DefaultMaxElapsedTime, RandomizationFactor: DefaultRandomizationFactor, Multiplier: DefaultMultiplier, - MaxNumRetries: DefaultMaxNumRetries, + NumRetries: DefaultNumRetries, } } @@ -160,8 +160,8 @@ func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if config.MaxNumRetries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) + if config.NumRetries > 0 { + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries) } else { maxRetriesBackoff = expBackoff } @@ -207,8 +207,8 @@ func Retry(functionToRetry func() error, config *RetryConfig) error { expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if config.MaxNumRetries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) + if config.NumRetries > 0 { + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries) } else { maxRetriesBackoff = expBackoff } diff --git a/core/retry_test.go b/core/retry_test.go index 2ce926543d..0cde8fbbc2 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -49,7 +49,7 @@ func TestRetryWithData(t *testing.T) { MaxElapsedTime: 3, RandomizationFactor: 0, Multiplier: retry.DefaultMultiplier, - MaxNumRetries: retry.DefaultMaxNumRetries, + NumRetries: retry.DefaultNumRetries, } _, err := retry.RetryWithData(function, config) if err != nil { @@ -68,7 +68,7 @@ func TestRetry(t *testing.T) { MaxElapsedTime: 3, RandomizationFactor: 0, Multiplier: retry.DefaultMultiplier, - MaxNumRetries: retry.DefaultMaxNumRetries, + NumRetries: retry.DefaultNumRetries, } err := retry.Retry(function, config) if err != nil { @@ -170,7 +170,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { } // Assert Call succeeds when Anvil running - receipt_function := utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + receipt_function := utils.WaitForTransactionReceipt(*client, *client, hash, retry.RpcRetryConfig()) _, err = receipt_function() assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err) if !strings.Contains(err.Error(), "not found") { @@ -183,7 +183,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { return } - receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.RpcRetryConfig()) _, err = receipt_function() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { @@ -200,7 +200,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.RpcRetryConfig()) _, err = receipt_function() assert.NotNil(t, err) if !strings.Contains(err.Error(), "not found") { diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index f48094e62c..1232b3fea3 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -103,5 +103,5 @@ func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.Inst return gasPrice, nil } - return retry.RetryWithData(respondToTaskV2_func, retry.DefaultRetryConfig()) + return retry.RetryWithData(respondToTaskV2_func, retry.RpcRetryConfig()) } From bb593d1de4b6261809fc2b7d62f5de0332558c6d Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 16:57:26 -0300 Subject: [PATCH 19/25] nit --- core/chainio/avs_subscriber.go | 4 ++-- core/retry.go | 36 +++++++++++++++++----------------- core/retry_test.go | 8 ++++---- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index 436ae1a184..4625e5e697 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) + s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.RpcNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultNumRetries, "err", err) + s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.RpcNumRetries, "err", err) return nil, err } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") diff --git a/core/retry.go b/core/retry.go index 0cdd1a14a9..320c56e5aa 100644 --- a/core/retry.go +++ b/core/retry.go @@ -25,14 +25,14 @@ func (e PermanentError) Is(err error) bool { } const ( - DefaultInitialInterval = 1 * time.Second // Initial delay for retry interval. - DefaultMaxInterval = 60 * time.Second // Maximum interval an individual retry may have. - DefaultMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. - DefaultRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. - DefaultMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. - DefaultNumRetries uint64 = 3 // Total number of retries attempted. - ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. - ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. + RpcInitialInterval = 1 * time.Second // Initial delay for retry interval. + RpcMaxInterval = 60 * time.Second // Maximum interval an individual retry may have. + RpcMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + RpcRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. + RpcMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. + RpcNumRetries uint64 = 3 // Total number of retries attempted. + ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. + ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. ) type RetryConfig struct { @@ -46,12 +46,12 @@ type RetryConfig struct { func RpcRetryConfig() *RetryConfig { return &RetryConfig{ - InitialInterval: DefaultInitialInterval, - MaxInterval: DefaultMaxInterval, - MaxElapsedTime: DefaultMaxElapsedTime, - RandomizationFactor: DefaultRandomizationFactor, - Multiplier: DefaultMultiplier, - NumRetries: DefaultNumRetries, + InitialInterval: RpcInitialInterval, + MaxInterval: RpcMaxInterval, + MaxElapsedTime: RpcMaxElapsedTime, + RandomizationFactor: RpcRandomizationFactor, + Multiplier: RpcMultiplier, + NumRetries: RpcNumRetries, } } @@ -59,10 +59,10 @@ func ChainRetryConfig() *RetryConfig { return &RetryConfig{ InitialInterval: ChainInitialInterval, MaxInterval: ChainMaxInterval, - MaxElapsedTime: DefaultMaxElapsedTime, - RandomizationFactor: DefaultRandomizationFactor, - Multiplier: DefaultMultiplier, - NumRetries: DefaultNumRetries, + MaxElapsedTime: RpcMaxElapsedTime, + RandomizationFactor: RpcRandomizationFactor, + Multiplier: RpcMultiplier, + NumRetries: RpcNumRetries, } } diff --git a/core/retry_test.go b/core/retry_test.go index 0cde8fbbc2..19d78bea56 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -48,8 +48,8 @@ func TestRetryWithData(t *testing.T) { MaxInterval: 2, MaxElapsedTime: 3, RandomizationFactor: 0, - Multiplier: retry.DefaultMultiplier, - NumRetries: retry.DefaultNumRetries, + Multiplier: retry.RpcMultiplier, + NumRetries: retry.RpcNumRetries, } _, err := retry.RetryWithData(function, config) if err != nil { @@ -67,8 +67,8 @@ func TestRetry(t *testing.T) { MaxInterval: 2, MaxElapsedTime: 3, RandomizationFactor: 0, - Multiplier: retry.DefaultMultiplier, - NumRetries: retry.DefaultNumRetries, + Multiplier: retry.RpcMultiplier, + NumRetries: retry.RpcNumRetries, } err := retry.Retry(function, config) if err != nil { From 635448f816b248a25ebfe6204325b3ccc1df82cb Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 17:49:40 -0300 Subject: [PATCH 20/25] make WaitForTxRetry retry values constants --- core/chainio/avs_writer.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 1eb73e955b..6bd3b024f6 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -21,6 +21,11 @@ import ( "github.com/yetanotherco/aligned_layer/metrics" ) +const ( + waitForTxConfigMaxInterval = 2 * time.Second + waitForTxConfigNumRetries = 0 +) + type AvsWriter struct { *avsregistry.ChainWriter AvsContractBindings *AvsServiceBindings @@ -94,13 +99,13 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe i := 0 // Set Retry config for RespondToTaskV2 - respondToTaskV2Config := retry.RpcRetryConfig() + respondToTaskV2Config := retry.EthCallRetryConfig() respondToTaskV2Config.MaxElapsedTime = 0 // Set Retry config for WaitForTxRetryable - waitForTxConfig := retry.RpcRetryConfig() - waitForTxConfig.MaxInterval = 2 * time.Second - waitForTxConfig.NumRetries = 0 + waitForTxConfig := retry.EthCallRetryConfig() + waitForTxConfig.MaxInterval = waitForTxConfigMaxInterval + waitForTxConfig.NumRetries = waitForTxConfigNumRetries waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump respondToTaskV2Func := func() (*types.Receipt, error) { @@ -193,8 +198,8 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t respondToTaskFeeLimit := batchState.RespondToTaskFeeLimit w.logger.Info("Checking balance against Batch RespondToTaskFeeLimit", "RespondToTaskFeeLimit", respondToTaskFeeLimit) // Note: we compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit - // Batcher will pay up to respondToTaskFeeLimit, for this he needs that amount of funds in Aligned - // Aggregator will pay any extra cost, for this he needs at least respondToTaskFeeLimit in his balance + // Batcher will pay up to respondToTaskFeeLimit, for this he needs that amount of funds in Aligned + // Aggregator will pay any extra cost, for this he needs at least respondToTaskFeeLimit in his balance return w.compareBalances(respondToTaskFeeLimit, aggregatorAddress, senderAddress) } From 1a702fe284fc65ee8442b2f397d823f5e46e8fdd Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 17:52:01 -0300 Subject: [PATCH 21/25] change var name --- core/chainio/avs_subscriber.go | 4 ++-- core/chainio/retryable.go | 20 +++++++++--------- core/retry.go | 38 +++++++++++++++++----------------- core/retry_test.go | 14 ++++++------- core/utils/eth_client_utils.go | 2 +- 5 files changed, 39 insertions(+), 39 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index 4625e5e697..26eefe0463 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.RpcNumRetries, "err", err) + s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.RpcNumRetries, "err", err) + s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err) return nil, err } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index c3bf1c997e..acaa5bb8ae 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -71,7 +71,7 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s Responded bool RespondToTaskFeeLimit *big.Int }, error) { - return retry.RetryWithData(BatchesState(w, opts, arg0), retry.RpcRetryConfig()) + return retry.RetryWithData(BatchesState(w, opts, arg0), retry.EthCallRetryConfig()) } func BatcherBalances(w *AvsWriter, opts *bind.CallOpts, senderAddress common.Address) func() (*big.Int, error) { @@ -94,7 +94,7 @@ Get the balance of a batcher from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { - return retry.RetryWithData(BatcherBalances(w, opts, senderAddress), retry.RpcRetryConfig()) + return retry.RetryWithData(BatcherBalances(w, opts, senderAddress), retry.EthCallRetryConfig()) } func BalanceAt(w *AvsWriter, ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) func() (*big.Int, error) { @@ -119,7 +119,7 @@ TODO: it gets the balance from an Address, not necessarily an aggregator. The na - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { - return retry.RetryWithData(BalanceAt(w, ctx, aggregatorAddress, blockNumber), retry.RpcRetryConfig()) + return retry.RetryWithData(BalanceAt(w, ctx, aggregatorAddress, blockNumber), retry.EthCallRetryConfig()) } // |---AVS_SUBSCRIBER---| @@ -144,7 +144,7 @@ Get the latest block number from Ethereum - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { - return retry.RetryWithData(BlockNumber(s, ctx), retry.RpcRetryConfig()) + return retry.RetryWithData(BlockNumber(s, ctx), retry.EthCallRetryConfig()) } func FilterBatchV2(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { @@ -165,7 +165,7 @@ Get NewBatchV2 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { - return retry.RetryWithData(FilterBatchV2(s, opts, batchMerkleRoot), retry.RpcRetryConfig()) + return retry.RetryWithData(FilterBatchV2(s, opts, batchMerkleRoot), retry.EthCallRetryConfig()) } func FilterBatchV3(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { @@ -186,7 +186,7 @@ Get NewBatchV3 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - return retry.RetryWithData(FilterBatchV3(s, opts, batchMerkleRoot), retry.RpcRetryConfig()) + return retry.RetryWithData(FilterBatchV3(s, opts, batchMerkleRoot), retry.EthCallRetryConfig()) } func BatchState(s *AvsSubscriber, opts *bind.CallOpts, arg0 [32]byte) func() (struct { @@ -220,7 +220,7 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte RespondToTaskFeeLimit *big.Int }, error) { - return retry.RetryWithData(BatchState(s, opts, arg0), retry.RpcRetryConfig()) + return retry.RetryWithData(BatchState(s, opts, arg0), retry.EthCallRetryConfig()) } func SubscribeNewHead(s *AvsSubscriber, ctx context.Context, c chan<- *types.Header) func() (ethereum.Subscription, error) { @@ -243,7 +243,7 @@ Subscribe to new heads from the Ethereum node. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { - return retry.RetryWithData(SubscribeNewHead(s, ctx, c), retry.RpcRetryConfig()) + return retry.RetryWithData(SubscribeNewHead(s, ctx, c), retry.EthCallRetryConfig()) } func SubscribeToNewTasksV2( @@ -270,7 +270,7 @@ func SubscribeToNewTasksV2Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { - return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.RpcRetryConfig()) + return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.EthCallRetryConfig()) } func SubscribeToNewTasksV3( @@ -297,5 +297,5 @@ func SubscribeToNewTasksV3Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { - return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.RpcRetryConfig()) + return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.EthCallRetryConfig()) } diff --git a/core/retry.go b/core/retry.go index 320c56e5aa..a4dfda151b 100644 --- a/core/retry.go +++ b/core/retry.go @@ -25,14 +25,14 @@ func (e PermanentError) Is(err error) bool { } const ( - RpcInitialInterval = 1 * time.Second // Initial delay for retry interval. - RpcMaxInterval = 60 * time.Second // Maximum interval an individual retry may have. - RpcMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. - RpcRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. - RpcMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. - RpcNumRetries uint64 = 3 // Total number of retries attempted. - ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. - ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. + EthCallInitialInterval = 1 * time.Second // Initial delay for retry interval. + EthCallMaxInterval = 60 * time.Second // Maximum interval an individual retry may have. + EthCallMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + EthCallRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. + EthCallMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. + EthCallNumRetries uint64 = 3 // Total number of retries attempted. + ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. + ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. ) type RetryConfig struct { @@ -44,14 +44,14 @@ type RetryConfig struct { NumRetries uint64 } -func RpcRetryConfig() *RetryConfig { +func EthCallRetryConfig() *RetryConfig { return &RetryConfig{ - InitialInterval: RpcInitialInterval, - MaxInterval: RpcMaxInterval, - MaxElapsedTime: RpcMaxElapsedTime, - RandomizationFactor: RpcRandomizationFactor, - Multiplier: RpcMultiplier, - NumRetries: RpcNumRetries, + InitialInterval: EthCallInitialInterval, + MaxInterval: EthCallMaxInterval, + MaxElapsedTime: EthCallMaxElapsedTime, + RandomizationFactor: EthCallRandomizationFactor, + Multiplier: EthCallMultiplier, + NumRetries: EthCallNumRetries, } } @@ -59,10 +59,10 @@ func ChainRetryConfig() *RetryConfig { return &RetryConfig{ InitialInterval: ChainInitialInterval, MaxInterval: ChainMaxInterval, - MaxElapsedTime: RpcMaxElapsedTime, - RandomizationFactor: RpcRandomizationFactor, - Multiplier: RpcMultiplier, - NumRetries: RpcNumRetries, + MaxElapsedTime: EthCallMaxElapsedTime, + RandomizationFactor: EthCallRandomizationFactor, + Multiplier: EthCallMultiplier, + NumRetries: EthCallNumRetries, } } diff --git a/core/retry_test.go b/core/retry_test.go index 19d78bea56..b48cd3d29b 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -48,8 +48,8 @@ func TestRetryWithData(t *testing.T) { MaxInterval: 2, MaxElapsedTime: 3, RandomizationFactor: 0, - Multiplier: retry.RpcMultiplier, - NumRetries: retry.RpcNumRetries, + Multiplier: retry.EthCallMultiplier, + NumRetries: retry.EthCallNumRetries, } _, err := retry.RetryWithData(function, config) if err != nil { @@ -67,8 +67,8 @@ func TestRetry(t *testing.T) { MaxInterval: 2, MaxElapsedTime: 3, RandomizationFactor: 0, - Multiplier: retry.RpcMultiplier, - NumRetries: retry.RpcNumRetries, + Multiplier: retry.EthCallMultiplier, + NumRetries: retry.EthCallNumRetries, } err := retry.Retry(function, config) if err != nil { @@ -170,7 +170,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { } // Assert Call succeeds when Anvil running - receipt_function := utils.WaitForTransactionReceipt(*client, *client, hash, retry.RpcRetryConfig()) + receipt_function := utils.WaitForTransactionReceipt(*client, *client, hash, retry.EthCallRetryConfig()) _, err = receipt_function() assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err) if !strings.Contains(err.Error(), "not found") { @@ -183,7 +183,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { return } - receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.RpcRetryConfig()) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.EthCallRetryConfig()) _, err = receipt_function() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { @@ -200,7 +200,7 @@ func TestWaitForTransactionReceipt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.RpcRetryConfig()) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.EthCallRetryConfig()) _, err = receipt_function() assert.NotNil(t, err) if !strings.Contains(err.Error(), "not found") { diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index 1232b3fea3..c8a86b2dd6 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -103,5 +103,5 @@ func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.Inst return gasPrice, nil } - return retry.RetryWithData(respondToTaskV2_func, retry.RpcRetryConfig()) + return retry.RetryWithData(respondToTaskV2_func, retry.EthCallRetryConfig()) } From 0845e66722516fef4ff181743046db83a5d0051e Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 18:07:49 -0300 Subject: [PATCH 22/25] naming --- aggregator/pkg/aggregator.go | 2 +- aggregator/pkg/server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index c1eaa0d3af..3b771027a0 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -419,7 +419,7 @@ func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uin } return err } - return retry.Retry(initializeNewTask_func, retry.RpcRetryConfig()) + return retry.Retry(initializeNewTask_func, retry.EthCallRetryConfig()) } // Long-lived goroutine that periodically checks and removes old Tasks from stored Maps diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 7b53420377..7df8645bed 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -158,5 +158,5 @@ func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint return taskIndex, nil } } - return retry.RetryWithData(getTaskIndex_func, retry.RpcRetryConfig()) + return retry.RetryWithData(getTaskIndex_func, retry.EthCallRetryConfig()) } From 261aa1c70a0cd11c3c4e3941c9694b5d9bf30b41 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Tue, 19 Nov 2024 18:22:49 -0300 Subject: [PATCH 23/25] rm extra comment --- core/utils/eth_client_utils.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index c8a86b2dd6..dceee4c360 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -84,7 +84,6 @@ func CalculateGasPriceBumpBasedOnRetry(currentGasPrice *big.Int, baseBumpPercent return bumpedGasPrice } -//TODO: move to retryable function file /* GetGasPriceRetryable Get the gas price from the client with retry logic. From c8aad5e65e0ed8e74e4d7699201db3738afa75be Mon Sep 17 00:00:00 2001 From: PatStiles Date: Wed, 20 Nov 2024 11:10:51 -0300 Subject: [PATCH 24/25] Add methods for get old Task Hash --- core/chainio/avs_reader.go | 9 +++----- core/chainio/retryable.go | 44 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index 69b8f281b1..2e65af2b56 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -111,12 +111,9 @@ func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager // This function is a helper to get a task hash of aproximately nBlocksOld blocks ago func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byte, error) { - latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(context.Background()) + latestBlock, err := r.BlockNumberRetryable(context.Background()) if err != nil { - latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(context.Background()) - if err != nil { - return nil, fmt.Errorf("failed to get latest block number: %w", err) - } + return nil, fmt.Errorf("failed to get latest block number: %w", err) } if latestBlock < nBlocksOld { @@ -129,7 +126,7 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt toBlock := latestBlock - nBlocksOld fromBlock = toBlock - interval - logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: &toBlock, Context: context.Background()}, nil) + logs, err := r.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: &toBlock, Context: context.Background()}, nil) if err != nil { return nil, err } diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index acaa5bb8ae..b3c5175ac4 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -299,3 +299,47 @@ func SubscribeToNewTasksV3Retryable( ) (event.Subscription, error) { return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.EthCallRetryConfig()) } + +func FilterBatchV3Reader(r *AvsReader, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) + if err != nil { + logs, err = r.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot) + } + return logs, err + } + return filterNewBatchV3_func +} + +/* +FilterBatchV3Retryable +Get NewBatchV3 logs from the AVS contract. +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (r *AvsReader) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + return retry.RetryWithData(FilterBatchV3Reader(r, opts, batchMerkleRoot), retry.EthCallRetryConfig()) +} + +func BlockNumberReader(r *AvsReader, ctx context.Context) func() (uint64, error) { + latestBlock_func := func() (uint64, error) { + // Try with main connection + latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(ctx) + if err != nil { + // If error try with fallback connection + latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(ctx) + } + return latestBlock, err + } + return latestBlock_func +} + +/* +BlockNumberRetryable +Get the latest block number from Ethereum +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (r *AvsReader) BlockNumberRetryable(ctx context.Context) (uint64, error) { + return retry.RetryWithData(BlockNumberReader(r, ctx), retry.EthCallRetryConfig()) +} From 1d9fe8a858961965b52b3a601bd9aef8c4312c11 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Wed, 20 Nov 2024 11:14:00 -0300 Subject: [PATCH 25/25] add unit tests --- core/retry_test.go | 95 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/core/retry_test.go b/core/retry_test.go index b48cd3d29b..dc336f4227 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -928,3 +928,98 @@ func TestBatchersBalances(t *testing.T) { return } } + +func TestFilterBatchV3Reader(t *testing.T) { + cmd, _, err := SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml") + r, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig) + if err != nil { + return + } + batch_func := chainio.FilterBatchV3Reader(r, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } + + batch_func = chainio.FilterBatchV3Reader(r, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() + assert.NotNil(t, err) + if _, ok := err.(retry.PermanentError); ok { + t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err) + return + } + if !strings.Contains(err.Error(), "connection refused") { + t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err) + return + } + + cmd, _, err = SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + batch_func = chainio.FilterBatchV3Reader(r, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } +} + +func TestBlockNumberReader(t *testing.T) { + // Start anvil + cmd, _, err := SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml") + r, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig) + if err != nil { + return + } + block_func := chainio.BlockNumberReader(r, context.Background()) + _, err = block_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } + + block_func = chainio.BlockNumberReader(r, context.Background()) + _, err = block_func() + assert.NotNil(t, err) + if _, ok := err.(retry.PermanentError); ok { + t.Errorf("BlockNumber Emitted non Transient error: %s\n", err) + return + } + if !strings.Contains(err.Error(), "connect: connection refused") { + t.Errorf("BlockNumber Emitted non Transient error: %s\n", err) + return + } + + cmd, _, err = SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + block_func = chainio.BlockNumberReader(r, context.Background()) + _, err = block_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } +}