diff --git a/pkg/logpoller/metrics.go b/pkg/logpoller/metrics.go index e13aae85c..c7ab7c71e 100644 --- a/pkg/logpoller/metrics.go +++ b/pkg/logpoller/metrics.go @@ -37,16 +37,6 @@ var ( Help: "Last processed masterchain block sequence number", }, []string{"chainID"}) - promTonLpBlocksProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "ton_logpoller_blocks_processed_total", - Help: "Total number of blocks processed", - }, []string{"chainID"}) - - promTonLpLogsInserted = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "ton_logpoller_logs_inserted_total", - Help: "Total number of logs inserted to database", - }, []string{"chainID"}) - promTonLpLoaderErrors = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "ton_logpoller_loader_errors_total", Help: "Total number of transaction loading errors", @@ -58,21 +48,11 @@ var ( }, []string{"chainID"}) // Query metrics for observed stores - promTonLpQueryDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "ton_logpoller_query_duration_seconds", - Help: "Duration of last database query by operation", - }, []string{"chainID", "query", "type"}) - promTonLpAddressesMonitored = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "ton_logpoller_addresses_monitored", Help: "Number of addresses being monitored", }, []string{"chainID"}) - promTonLpQueryResultSize = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "ton_logpoller_query_result_size", - Help: "Number of rows returned by query", - }, []string{"chainID", "query"}) - // Parser pipeline metrics promTonLpTxsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "ton_logpoller_txs_processed_total", @@ -116,15 +96,14 @@ type logPollerMetrics struct { pollErrors metric.Int64Counter blocksBehind metric.Int64Gauge lastProcessedBlockSeqNo metric.Int64Gauge - blocksProcessed metric.Int64Counter - logsInserted metric.Int64Counter loaderErrors metric.Int64Counter parseErrors metric.Int64Counter // query metrics for observed stores (OTel) - queryDuration metric.Float64Gauge addressesMonitored metric.Int64Gauge - queryResultSize metric.Int64Gauge + + // framework metrics (shared cross-chain observability) + frameworkMetrics frameworkmetrics.GenericLogPollerMetrics // parser pipeline metrics (OTel) txsProcessed metric.Int64Counter @@ -161,16 +140,6 @@ func newMetrics(chainID string) (*logPollerMetrics, error) { return nil, fmt.Errorf("failed to register last processed block: %w", err) } - blocksProcessed, err := m.Int64Counter("ton_logpoller_blocks_processed_total") - if err != nil { - return nil, fmt.Errorf("failed to register blocks processed: %w", err) - } - - logsInserted, err := m.Int64Counter("ton_logpoller_logs_inserted_total") - if err != nil { - return nil, fmt.Errorf("failed to register logs inserted: %w", err) - } - loaderErrors, err := m.Int64Counter("ton_logpoller_loader_errors_total") if err != nil { return nil, fmt.Errorf("failed to register loader errors: %w", err) @@ -181,21 +150,11 @@ func newMetrics(chainID string) (*logPollerMetrics, error) { return nil, fmt.Errorf("failed to register parse errors: %w", err) } - queryDuration, err := m.Float64Gauge("ton_logpoller_query_duration_seconds") - if err != nil { - return nil, fmt.Errorf("failed to register query duration: %w", err) - } - addressesMonitored, err := m.Int64Gauge("ton_logpoller_addresses_monitored") if err != nil { return nil, fmt.Errorf("failed to register addresses monitored: %w", err) } - queryResultSize, err := m.Int64Gauge("ton_logpoller_query_result_size") - if err != nil { - return nil, fmt.Errorf("failed to register query result size: %w", err) - } - txsProcessed, err := m.Int64Counter("ton_logpoller_txs_processed_total") if err != nil { return nil, fmt.Errorf("failed to register txs processed: %w", err) @@ -226,6 +185,11 @@ func newMetrics(chainID string) (*logPollerMetrics, error) { return nil, fmt.Errorf("failed to register pruning errors: %w", err) } + fwMetrics, err := frameworkmetrics.NewGenericLogPollerMetrics(chainID, "ton") + if err != nil { + return nil, fmt.Errorf("failed to initialize framework logpoller metrics: %w", err) + } + return &logPollerMetrics{ chainID: chainID, Labeler: metrics.NewLabeler().With("chainID", chainID), @@ -233,13 +197,9 @@ func newMetrics(chainID string) (*logPollerMetrics, error) { pollDuration: pollDuration, pollErrors: pollErrors, blocksBehind: blocksBehind, - blocksProcessed: blocksProcessed, - logsInserted: logsInserted, loaderErrors: loaderErrors, parseErrors: parseErrors, - queryDuration: queryDuration, addressesMonitored: addressesMonitored, - queryResultSize: queryResultSize, txsProcessed: txsProcessed, msgsProcessed: msgsProcessed, logsMatched: logsMatched, @@ -247,6 +207,7 @@ func newMetrics(chainID string) (*logPollerMetrics, error) { logsDeleted: logsDeleted, pruningDuration: pruningDuration, pruningErrors: pruningErrors, + frameworkMetrics: fwMetrics, }, nil } @@ -281,18 +242,6 @@ func (m *logPollerMetrics) SetLastProcessedBlock(ctx context.Context, seqNo uint m.lastProcessedBlockSeqNo.Record(ctx, int64(seqNo), metric.WithAttributes(m.getOtelAttributes()...)) } -// AddBlocksProcessed increments the blocks processed counter -func (m *logPollerMetrics) AddBlocksProcessed(ctx context.Context, count int64) { - promTonLpBlocksProcessed.WithLabelValues(m.chainID).Add(float64(count)) - m.blocksProcessed.Add(ctx, count, metric.WithAttributes(m.getOtelAttributes()...)) -} - -// AddLogsInserted increments the logs inserted counter -func (m *logPollerMetrics) AddLogsInserted(ctx context.Context, count int64) { - promTonLpLogsInserted.WithLabelValues(m.chainID).Add(float64(count)) - m.logsInserted.Add(ctx, count, metric.WithAttributes(m.getOtelAttributes()...)) -} - // IncrementLoaderErrors increments the loader error counter func (m *logPollerMetrics) IncrementLoaderErrors(ctx context.Context) { promTonLpLoaderErrors.WithLabelValues(m.chainID).Inc() @@ -305,27 +254,12 @@ func (m *logPollerMetrics) IncrementParseErrors(ctx context.Context) { m.parseErrors.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) } -// RecordQueryDuration records the duration of a database query -func (m *logPollerMetrics) RecordQueryDuration(ctx context.Context, queryName string, queryType frameworkmetrics.QueryType, duration time.Duration) { - seconds := duration.Seconds() - promTonLpQueryDuration.WithLabelValues(m.chainID, queryName, string(queryType)).Set(seconds) - attrs := append(m.getOtelAttributes(), attribute.String("query", queryName), attribute.String("type", string(queryType))) - m.queryDuration.Record(ctx, seconds, metric.WithAttributes(attrs...)) -} - // SetAddressesMonitored sets the number of addresses being monitored func (m *logPollerMetrics) SetAddressesMonitored(ctx context.Context, count int) { promTonLpAddressesMonitored.WithLabelValues(m.chainID).Set(float64(count)) m.addressesMonitored.Record(ctx, int64(count), metric.WithAttributes(m.getOtelAttributes()...)) } -// SetQueryResultSize sets the result size of a query -func (m *logPollerMetrics) SetQueryResultSize(ctx context.Context, queryName string, count int) { - promTonLpQueryResultSize.WithLabelValues(m.chainID, queryName).Set(float64(count)) - attrs := append(m.getOtelAttributes(), attribute.String("query", queryName)) - m.queryResultSize.Record(ctx, int64(count), metric.WithAttributes(attrs...)) -} - // IncrementTxsProcessed increments the transactions processed counter func (m *logPollerMetrics) IncrementTxsProcessed(ctx context.Context) { promTonLpTxsProcessed.WithLabelValues(m.chainID).Inc() diff --git a/pkg/logpoller/observed_filter_store.go b/pkg/logpoller/observed_filter_store.go index 7c041cb2f..17b058a73 100644 --- a/pkg/logpoller/observed_filter_store.go +++ b/pkg/logpoller/observed_filter_store.go @@ -34,9 +34,7 @@ func NewObservedFilterStore(store FilterStore, metrics *logPollerMetrics, lggr l func (o *ObservedFilterStore) RegisterFilter(ctx context.Context, flt models.Filter) (int64, error) { start := time.Now() id, err := o.FilterStore.RegisterFilter(ctx, flt) - - o.metrics.RecordQueryDuration(ctx, "RegisterFilter", frameworkmetrics.Create, time.Since(start)) - + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "RegisterFilter", frameworkmetrics.Create, time.Since(start).Seconds()) return id, err } @@ -44,9 +42,7 @@ func (o *ObservedFilterStore) RegisterFilter(ctx context.Context, flt models.Fil func (o *ObservedFilterStore) UnregisterFilter(ctx context.Context, name string) error { start := time.Now() err := o.FilterStore.UnregisterFilter(ctx, name) - - o.metrics.RecordQueryDuration(ctx, "UnregisterFilter", frameworkmetrics.Del, time.Since(start)) - + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "UnregisterFilter", frameworkmetrics.Del, time.Since(start).Seconds()) return err } @@ -54,9 +50,7 @@ func (o *ObservedFilterStore) UnregisterFilter(ctx context.Context, name string) func (o *ObservedFilterStore) HasFilter(ctx context.Context, name string) (bool, error) { start := time.Now() exists, err := o.FilterStore.HasFilter(ctx, name) - - o.metrics.RecordQueryDuration(ctx, "HasFilter", frameworkmetrics.Read, time.Since(start)) - + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "HasFilter", frameworkmetrics.Read, time.Since(start).Seconds()) return exists, err } @@ -64,12 +58,10 @@ func (o *ObservedFilterStore) HasFilter(ctx context.Context, name string) (bool, func (o *ObservedFilterStore) GetDistinctAddresses(ctx context.Context) ([]*address.Address, error) { start := time.Now() addresses, err := o.FilterStore.GetDistinctAddresses(ctx) - - o.metrics.RecordQueryDuration(ctx, "GetDistinctAddresses", frameworkmetrics.Read, time.Since(start)) + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetDistinctAddresses", frameworkmetrics.Read, time.Since(start).Seconds()) if err == nil { o.metrics.SetAddressesMonitored(ctx, len(addresses)) } - return addresses, err } @@ -77,12 +69,10 @@ func (o *ObservedFilterStore) GetDistinctAddresses(ctx context.Context) ([]*addr func (o *ObservedFilterStore) GetFiltersByAddress(ctx context.Context, addr *address.Address) ([]models.Filter, error) { start := time.Now() filters, err := o.FilterStore.GetFiltersByAddress(ctx, addr) - - o.metrics.RecordQueryDuration(ctx, "GetFiltersByAddress", frameworkmetrics.Read, time.Since(start)) + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetFiltersByAddress", frameworkmetrics.Read, time.Since(start).Seconds()) if err == nil { - o.metrics.SetQueryResultSize(ctx, "GetFiltersByAddress", len(filters)) + o.metrics.frameworkMetrics.RecordQueryDatasetSize(ctx, "GetFiltersByAddress", frameworkmetrics.Read, int64(len(filters))) } - return filters, err } @@ -90,11 +80,9 @@ func (o *ObservedFilterStore) GetFiltersByAddress(ctx context.Context, addr *add func (o *ObservedFilterStore) GetAllActiveFilters(ctx context.Context) ([]models.Filter, error) { start := time.Now() filters, err := o.FilterStore.GetAllActiveFilters(ctx) - - o.metrics.RecordQueryDuration(ctx, "GetAllActiveFilters", frameworkmetrics.Read, time.Since(start)) + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetAllActiveFilters", frameworkmetrics.Read, time.Since(start).Seconds()) if err == nil { - o.metrics.SetQueryResultSize(ctx, "GetAllActiveFilters", len(filters)) + o.metrics.frameworkMetrics.RecordQueryDatasetSize(ctx, "GetAllActiveFilters", frameworkmetrics.Read, int64(len(filters))) } - return filters, err } diff --git a/pkg/logpoller/observed_log_store.go b/pkg/logpoller/observed_log_store.go index ecd0c336c..3af9be370 100644 --- a/pkg/logpoller/observed_log_store.go +++ b/pkg/logpoller/observed_log_store.go @@ -34,9 +34,20 @@ func (o *ObservedLogStore) SaveLogs(ctx context.Context, logs []models.Log, batc start := time.Now() count, err := o.LogStore.SaveLogs(ctx, logs, batchInsertSize, minBatchSize) - o.metrics.RecordQueryDuration(ctx, "SaveLogs", frameworkmetrics.Create, time.Since(start)) + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "SaveLogs", frameworkmetrics.Create, time.Since(start).Seconds()) if err == nil && count > 0 { - o.metrics.AddLogsInserted(ctx, count) + o.metrics.frameworkMetrics.IncrementLogsInserted(ctx, count) + + // Record discovery latency using the newest log's TxTimestamp. + // TxTimestamp is set from tx.Transaction.Now (uint32 Unix seconds) during parsing. + // Latency = wall clock now - transaction timestamp, in seconds. + newestTimestamp := logs[0].TxTimestamp + for _, l := range logs[1:] { + if l.TxTimestamp.After(newestTimestamp) { + newestTimestamp = l.TxTimestamp + } + } + o.metrics.frameworkMetrics.RecordLogDiscoveryLatency(ctx, time.Since(newestTimestamp).Seconds()) } return count, err @@ -47,9 +58,9 @@ func (o *ObservedLogStore) QueryLogs(ctx context.Context, logQuery *query.LogQue start := time.Now() logs, hasMore, nextCursor, err := o.LogStore.QueryLogs(ctx, logQuery) - o.metrics.RecordQueryDuration(ctx, "QueryLogs", frameworkmetrics.Read, time.Since(start)) + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "QueryLogs", frameworkmetrics.Read, time.Since(start).Seconds()) if err == nil { - o.metrics.SetQueryResultSize(ctx, "QueryLogs", len(logs)) + o.metrics.frameworkMetrics.RecordQueryDatasetSize(ctx, "QueryLogs", frameworkmetrics.Read, int64(len(logs))) } return logs, hasMore, nextCursor, err @@ -60,7 +71,7 @@ func (o *ObservedLogStore) GetHighestMCBlockSeqno(ctx context.Context) (uint32, start := time.Now() seqno, exists, err := o.LogStore.GetHighestMCBlockSeqno(ctx) - o.metrics.RecordQueryDuration(ctx, "GetHighestMCBlockSeqno", frameworkmetrics.Read, time.Since(start)) + o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetHighestMCBlockSeqno", frameworkmetrics.Read, time.Since(start).Seconds()) return seqno, exists, err } diff --git a/pkg/logpoller/service.go b/pkg/logpoller/service.go index cfdfbe630..cf03484da 100644 --- a/pkg/logpoller/service.go +++ b/pkg/logpoller/service.go @@ -288,7 +288,6 @@ func (lp *service) run(ctx context.Context) (err error) { lp.lastProcessedBlockSeqNo = blockRange.ToSeqNo() lp.metrics.SetLastProcessedBlock(ctx, lp.lastProcessedBlockSeqNo) - lp.metrics.AddBlocksProcessed(ctx, int64(blockRange.ToSeqNo()-blockRange.FromSeqNo())) return nil }