Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 9 additions & 75 deletions pkg/logpoller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ var (
Help: "Last processed masterchain block sequence number",
}, []string{"chainID"})

promTonLpBlocksProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ton_logpoller_blocks_processed_total",
Help: "Total number of blocks processed",
}, []string{"chainID"})

promTonLpLogsInserted = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ton_logpoller_logs_inserted_total",
Help: "Total number of logs inserted to database",
}, []string{"chainID"})

promTonLpLoaderErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ton_logpoller_loader_errors_total",
Help: "Total number of transaction loading errors",
Expand All @@ -58,21 +48,11 @@ var (
}, []string{"chainID"})

// Query metrics for observed stores
promTonLpQueryDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ton_logpoller_query_duration_seconds",
Help: "Duration of last database query by operation",
}, []string{"chainID", "query", "type"})

promTonLpAddressesMonitored = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ton_logpoller_addresses_monitored",
Help: "Number of addresses being monitored",
}, []string{"chainID"})
Comment on lines 50 to 54
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change removes several Prometheus/OTel query and ingestion metrics (e.g. ton_logpoller_query_duration_seconds, ton_logpoller_query_result_size, and the logs/blocks processed counters) from this package. That is an operationally breaking change for any existing dashboards/alerts scraping the ton_logpoller_* series; consider keeping backward-compatible aliases during a deprecation window or documenting the metric renames/removals as part of the rollout.

Copilot uses AI. Check for mistakes.

promTonLpQueryResultSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ton_logpoller_query_result_size",
Help: "Number of rows returned by query",
}, []string{"chainID", "query"})

// Parser pipeline metrics
promTonLpTxsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ton_logpoller_txs_processed_total",
Expand Down Expand Up @@ -116,15 +96,14 @@ type logPollerMetrics struct {
pollErrors metric.Int64Counter
blocksBehind metric.Int64Gauge
lastProcessedBlockSeqNo metric.Int64Gauge
blocksProcessed metric.Int64Counter
logsInserted metric.Int64Counter
loaderErrors metric.Int64Counter
parseErrors metric.Int64Counter

// query metrics for observed stores (OTel)
queryDuration metric.Float64Gauge
addressesMonitored metric.Int64Gauge
queryResultSize metric.Int64Gauge

// framework metrics (shared cross-chain observability)
frameworkMetrics frameworkmetrics.GenericLogPollerMetrics

// parser pipeline metrics (OTel)
txsProcessed metric.Int64Counter
Expand Down Expand Up @@ -161,16 +140,6 @@ func newMetrics(chainID string) (*logPollerMetrics, error) {
return nil, fmt.Errorf("failed to register last processed block: %w", err)
}

blocksProcessed, err := m.Int64Counter("ton_logpoller_blocks_processed_total")
if err != nil {
return nil, fmt.Errorf("failed to register blocks processed: %w", err)
}

logsInserted, err := m.Int64Counter("ton_logpoller_logs_inserted_total")
if err != nil {
return nil, fmt.Errorf("failed to register logs inserted: %w", err)
}

loaderErrors, err := m.Int64Counter("ton_logpoller_loader_errors_total")
if err != nil {
return nil, fmt.Errorf("failed to register loader errors: %w", err)
Expand All @@ -181,21 +150,11 @@ func newMetrics(chainID string) (*logPollerMetrics, error) {
return nil, fmt.Errorf("failed to register parse errors: %w", err)
}

queryDuration, err := m.Float64Gauge("ton_logpoller_query_duration_seconds")
if err != nil {
return nil, fmt.Errorf("failed to register query duration: %w", err)
}

addressesMonitored, err := m.Int64Gauge("ton_logpoller_addresses_monitored")
if err != nil {
return nil, fmt.Errorf("failed to register addresses monitored: %w", err)
}

queryResultSize, err := m.Int64Gauge("ton_logpoller_query_result_size")
if err != nil {
return nil, fmt.Errorf("failed to register query result size: %w", err)
}

txsProcessed, err := m.Int64Counter("ton_logpoller_txs_processed_total")
if err != nil {
return nil, fmt.Errorf("failed to register txs processed: %w", err)
Expand Down Expand Up @@ -226,27 +185,29 @@ func newMetrics(chainID string) (*logPollerMetrics, error) {
return nil, fmt.Errorf("failed to register pruning errors: %w", err)
}

fwMetrics, err := frameworkmetrics.NewGenericLogPollerMetrics(chainID, "ton")
if err != nil {
return nil, fmt.Errorf("failed to initialize framework logpoller metrics: %w", err)
}

return &logPollerMetrics{
chainID: chainID,
Labeler: metrics.NewLabeler().With("chainID", chainID),

pollDuration: pollDuration,
pollErrors: pollErrors,
blocksBehind: blocksBehind,
blocksProcessed: blocksProcessed,
logsInserted: logsInserted,
loaderErrors: loaderErrors,
parseErrors: parseErrors,
queryDuration: queryDuration,
addressesMonitored: addressesMonitored,
queryResultSize: queryResultSize,
txsProcessed: txsProcessed,
msgsProcessed: msgsProcessed,
logsMatched: logsMatched,
lastProcessedBlockSeqNo: lastProcessedBlockSeqNo,
logsDeleted: logsDeleted,
pruningDuration: pruningDuration,
pruningErrors: pruningErrors,
frameworkMetrics: fwMetrics,
}, nil
}

Expand Down Expand Up @@ -281,18 +242,6 @@ func (m *logPollerMetrics) SetLastProcessedBlock(ctx context.Context, seqNo uint
m.lastProcessedBlockSeqNo.Record(ctx, int64(seqNo), metric.WithAttributes(m.getOtelAttributes()...))
}

// AddBlocksProcessed increments the blocks processed counter
func (m *logPollerMetrics) AddBlocksProcessed(ctx context.Context, count int64) {
promTonLpBlocksProcessed.WithLabelValues(m.chainID).Add(float64(count))
m.blocksProcessed.Add(ctx, count, metric.WithAttributes(m.getOtelAttributes()...))
}

// AddLogsInserted increments the logs inserted counter
func (m *logPollerMetrics) AddLogsInserted(ctx context.Context, count int64) {
promTonLpLogsInserted.WithLabelValues(m.chainID).Add(float64(count))
m.logsInserted.Add(ctx, count, metric.WithAttributes(m.getOtelAttributes()...))
}

// IncrementLoaderErrors increments the loader error counter
func (m *logPollerMetrics) IncrementLoaderErrors(ctx context.Context) {
promTonLpLoaderErrors.WithLabelValues(m.chainID).Inc()
Expand All @@ -305,27 +254,12 @@ func (m *logPollerMetrics) IncrementParseErrors(ctx context.Context) {
m.parseErrors.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...))
}

// RecordQueryDuration records the duration of a database query
func (m *logPollerMetrics) RecordQueryDuration(ctx context.Context, queryName string, queryType frameworkmetrics.QueryType, duration time.Duration) {
seconds := duration.Seconds()
promTonLpQueryDuration.WithLabelValues(m.chainID, queryName, string(queryType)).Set(seconds)
attrs := append(m.getOtelAttributes(), attribute.String("query", queryName), attribute.String("type", string(queryType)))
m.queryDuration.Record(ctx, seconds, metric.WithAttributes(attrs...))
}

// SetAddressesMonitored sets the number of addresses being monitored
func (m *logPollerMetrics) SetAddressesMonitored(ctx context.Context, count int) {
promTonLpAddressesMonitored.WithLabelValues(m.chainID).Set(float64(count))
m.addressesMonitored.Record(ctx, int64(count), metric.WithAttributes(m.getOtelAttributes()...))
}

// SetQueryResultSize sets the result size of a query
func (m *logPollerMetrics) SetQueryResultSize(ctx context.Context, queryName string, count int) {
promTonLpQueryResultSize.WithLabelValues(m.chainID, queryName).Set(float64(count))
attrs := append(m.getOtelAttributes(), attribute.String("query", queryName))
m.queryResultSize.Record(ctx, int64(count), metric.WithAttributes(attrs...))
}

// IncrementTxsProcessed increments the transactions processed counter
func (m *logPollerMetrics) IncrementTxsProcessed(ctx context.Context) {
promTonLpTxsProcessed.WithLabelValues(m.chainID).Inc()
Expand Down
28 changes: 8 additions & 20 deletions pkg/logpoller/observed_filter_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,67 +34,55 @@ func NewObservedFilterStore(store FilterStore, metrics *logPollerMetrics, lggr l
func (o *ObservedFilterStore) RegisterFilter(ctx context.Context, flt models.Filter) (int64, error) {
start := time.Now()
id, err := o.FilterStore.RegisterFilter(ctx, flt)

o.metrics.RecordQueryDuration(ctx, "RegisterFilter", frameworkmetrics.Create, time.Since(start))

o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "RegisterFilter", frameworkmetrics.Create, time.Since(start).Seconds())
return id, err
}

// UnregisterFilter wraps the underlying UnregisterFilter with metrics
func (o *ObservedFilterStore) UnregisterFilter(ctx context.Context, name string) error {
start := time.Now()
err := o.FilterStore.UnregisterFilter(ctx, name)

o.metrics.RecordQueryDuration(ctx, "UnregisterFilter", frameworkmetrics.Del, time.Since(start))

o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "UnregisterFilter", frameworkmetrics.Del, time.Since(start).Seconds())
return err
}

// HasFilter wraps the underlying HasFilter with metrics
func (o *ObservedFilterStore) HasFilter(ctx context.Context, name string) (bool, error) {
start := time.Now()
exists, err := o.FilterStore.HasFilter(ctx, name)

o.metrics.RecordQueryDuration(ctx, "HasFilter", frameworkmetrics.Read, time.Since(start))

o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "HasFilter", frameworkmetrics.Read, time.Since(start).Seconds())
return exists, err
}

// GetDistinctAddresses wraps the underlying GetDistinctAddresses with metrics
func (o *ObservedFilterStore) GetDistinctAddresses(ctx context.Context) ([]*address.Address, error) {
start := time.Now()
addresses, err := o.FilterStore.GetDistinctAddresses(ctx)

o.metrics.RecordQueryDuration(ctx, "GetDistinctAddresses", frameworkmetrics.Read, time.Since(start))
o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetDistinctAddresses", frameworkmetrics.Read, time.Since(start).Seconds())
if err == nil {
o.metrics.SetAddressesMonitored(ctx, len(addresses))
}

return addresses, err
}

// GetFiltersByAddress wraps the underlying GetFiltersByAddress with metrics
func (o *ObservedFilterStore) GetFiltersByAddress(ctx context.Context, addr *address.Address) ([]models.Filter, error) {
start := time.Now()
filters, err := o.FilterStore.GetFiltersByAddress(ctx, addr)

o.metrics.RecordQueryDuration(ctx, "GetFiltersByAddress", frameworkmetrics.Read, time.Since(start))
o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetFiltersByAddress", frameworkmetrics.Read, time.Since(start).Seconds())
if err == nil {
o.metrics.SetQueryResultSize(ctx, "GetFiltersByAddress", len(filters))
o.metrics.frameworkMetrics.RecordQueryDatasetSize(ctx, "GetFiltersByAddress", frameworkmetrics.Read, int64(len(filters)))
}

return filters, err
}

// GetAllActiveFilters wraps the underlying GetAllActiveFilters with metrics
func (o *ObservedFilterStore) GetAllActiveFilters(ctx context.Context) ([]models.Filter, error) {
start := time.Now()
filters, err := o.FilterStore.GetAllActiveFilters(ctx)

o.metrics.RecordQueryDuration(ctx, "GetAllActiveFilters", frameworkmetrics.Read, time.Since(start))
o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetAllActiveFilters", frameworkmetrics.Read, time.Since(start).Seconds())
if err == nil {
o.metrics.SetQueryResultSize(ctx, "GetAllActiveFilters", len(filters))
o.metrics.frameworkMetrics.RecordQueryDatasetSize(ctx, "GetAllActiveFilters", frameworkmetrics.Read, int64(len(filters)))
}

return filters, err
}
21 changes: 16 additions & 5 deletions pkg/logpoller/observed_log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,20 @@ func (o *ObservedLogStore) SaveLogs(ctx context.Context, logs []models.Log, batc
start := time.Now()
count, err := o.LogStore.SaveLogs(ctx, logs, batchInsertSize, minBatchSize)

o.metrics.RecordQueryDuration(ctx, "SaveLogs", frameworkmetrics.Create, time.Since(start))
o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "SaveLogs", frameworkmetrics.Create, time.Since(start).Seconds())
if err == nil && count > 0 {
o.metrics.AddLogsInserted(ctx, count)
o.metrics.frameworkMetrics.IncrementLogsInserted(ctx, count)

// Record discovery latency using the newest log's TxTimestamp.
// TxTimestamp is set from tx.Transaction.Now (uint32 Unix seconds) during parsing.
// Latency = wall clock now - transaction timestamp, in seconds.
newestTimestamp := logs[0].TxTimestamp
for _, l := range logs[1:] {
if l.TxTimestamp.After(newestTimestamp) {
newestTimestamp = l.TxTimestamp
}
}
o.metrics.frameworkMetrics.RecordLogDiscoveryLatency(ctx, time.Since(newestTimestamp).Seconds())
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.Since(newestTimestamp) can return a negative duration if TxTimestamp is ahead of the local wall clock (clock skew or differing time sources), which would record a negative discovery latency. Consider clamping the value to >= 0 before emitting the metric to avoid invalid/confusing latency datapoints.

Suggested change
o.metrics.frameworkMetrics.RecordLogDiscoveryLatency(ctx, time.Since(newestTimestamp).Seconds())
latency := time.Since(newestTimestamp)
if latency < 0 {
latency = 0
}
o.metrics.frameworkMetrics.RecordLogDiscoveryLatency(ctx, latency.Seconds())

Copilot uses AI. Check for mistakes.
}

return count, err
Expand All @@ -47,9 +58,9 @@ func (o *ObservedLogStore) QueryLogs(ctx context.Context, logQuery *query.LogQue
start := time.Now()
logs, hasMore, nextCursor, err := o.LogStore.QueryLogs(ctx, logQuery)

o.metrics.RecordQueryDuration(ctx, "QueryLogs", frameworkmetrics.Read, time.Since(start))
o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "QueryLogs", frameworkmetrics.Read, time.Since(start).Seconds())
if err == nil {
o.metrics.SetQueryResultSize(ctx, "QueryLogs", len(logs))
o.metrics.frameworkMetrics.RecordQueryDatasetSize(ctx, "QueryLogs", frameworkmetrics.Read, int64(len(logs)))
}

return logs, hasMore, nextCursor, err
Expand All @@ -60,7 +71,7 @@ func (o *ObservedLogStore) GetHighestMCBlockSeqno(ctx context.Context) (uint32,
start := time.Now()
seqno, exists, err := o.LogStore.GetHighestMCBlockSeqno(ctx)

o.metrics.RecordQueryDuration(ctx, "GetHighestMCBlockSeqno", frameworkmetrics.Read, time.Since(start))
o.metrics.frameworkMetrics.RecordQueryDuration(ctx, "GetHighestMCBlockSeqno", frameworkmetrics.Read, time.Since(start).Seconds())

return seqno, exists, err
}
1 change: 0 additions & 1 deletion pkg/logpoller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ func (lp *service) run(ctx context.Context) (err error) {

lp.lastProcessedBlockSeqNo = blockRange.ToSeqNo()
lp.metrics.SetLastProcessedBlock(ctx, lp.lastProcessedBlockSeqNo)
lp.metrics.AddBlocksProcessed(ctx, int64(blockRange.ToSeqNo()-blockRange.FromSeqNo()))

Comment on lines 289 to 291
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddBlocksProcessed was removed and there is no replacement call to a shared/framework metric here, so the logpoller will no longer emit any “blocks processed” metric for a successful run. If blocks-processed is still a required SLI, record it via the new frameworkMetrics (or keep the existing metric) rather than dropping it silently; otherwise please update the PR scope/description to clarify the intentional removal.

Copilot uses AI. Check for mistakes.
return nil
}
Expand Down
Loading