From 5ea1adb956753005c499c579adc36cfd5a95d708 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 13 Oct 2025 14:53:11 +0000 Subject: [PATCH 1/5] feat: add Prometheus metrics for DA submission failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add comprehensive Prometheus metrics to track DA submission failures and retry behavior in the sequencer: - da_submitter_failures_total{reason}: Counter for failures by reason (timeout, too_big, already_in_mempool, not_included_in_block, context_canceled, unknown) - da_submitter_last_failure_timestamp{reason}: Timestamp of last failure - da_submitter_pending_blobs: Current number of pending blobs - da_submitter_resends_total: Total retry attempts This addresses issue #2755 by providing better observability into DA layer submission problems, allowing operators to diagnose and respond to failures more effectively. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Marko --- block/components.go | 4 +- block/internal/common/metrics.go | 140 ++++++++++++++---- block/internal/submitting/da_submitter.go | 56 ++++++- .../da_submitter_integration_test.go | 2 +- .../submitting/da_submitter_mocks_test.go | 2 +- .../internal/submitting/da_submitter_test.go | 2 + block/internal/submitting/submitter_test.go | 4 +- 7 files changed, 171 insertions(+), 39 deletions(-) diff --git a/block/components.go b/block/components.go index d1c0084da2..3ee2062acf 100644 --- a/block/components.go +++ b/block/components.go @@ -161,7 +161,7 @@ func NewSyncComponents( ) // Create DA submitter for sync nodes (no signer, only DA inclusion processing) - daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger) + daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger) submitter := submitting.NewSubmitter( store, exec, @@ -240,7 +240,7 @@ func NewAggregatorComponents( } // Create DA submitter for aggregator nodes (with signer for submission) - daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger) + daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger) submitter := submitting.NewSubmitter( store, exec, diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index b079eaef09..b2164be069 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -63,6 +63,12 @@ type Metrics struct { // State transition metrics StateTransitions map[string]metrics.Counter InvalidTransitions metrics.Counter + + // DA Submitter metrics + DASubmitterFailures map[string]metrics.Counter // Counter with reason label + DASubmitterLastFailure map[string]metrics.Gauge // Timestamp gauge with reason label + DASubmitterPendingBlobs metrics.Gauge // Number of pending blobs + DASubmitterResends metrics.Counter // Number of resend attempts } // PrometheusMetrics returns Metrics built using Prometheus client library @@ -73,10 +79,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { } m := &Metrics{ - ChannelBufferUsage: make(map[string]metrics.Gauge), - ErrorsByType: make(map[string]metrics.Counter), - OperationDuration: make(map[string]metrics.Histogram), - StateTransitions: make(map[string]metrics.Counter), + ChannelBufferUsage: make(map[string]metrics.Gauge), + ErrorsByType: make(map[string]metrics.Counter), + OperationDuration: make(map[string]metrics.Histogram), + StateTransitions: make(map[string]metrics.Counter), + DASubmitterFailures: make(map[string]metrics.Counter), + DASubmitterLastFailure: make(map[string]metrics.Gauge), } // Original metrics @@ -349,6 +357,54 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { }, labels).With(labelsAndValues...) } + // DA Submitter metrics + m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "da_submitter_pending_blobs", + Help: "Number of blobs pending DA submission", + }, labels).With(labelsAndValues...) + + m.DASubmitterResends = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "da_submitter_resends_total", + Help: "Total number of DA submission retry attempts", + }, labels).With(labelsAndValues...) + + // Initialize DA submitter failure counters and timestamps for various reasons + failureReasons := []string{ + "already_rejected", + "insufficient_fee", + "timeout", + "already_in_mempool", + "not_included_in_block", + "too_big", + "context_canceled", + "unknown", + } + for _, reason := range failureReasons { + m.DASubmitterFailures[reason] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "da_submitter_failures_total", + Help: "Total number of DA submission failures by reason", + ConstLabels: map[string]string{ + "reason": reason, + }, + }, labels).With(labelsAndValues...) + + m.DASubmitterLastFailure[reason] = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "da_submitter_last_failure_timestamp", + Help: "Unix timestamp of the last DA submission failure by reason", + ConstLabels: map[string]string{ + "reason": reason, + }, + }, labels).With(labelsAndValues...) + } + return m } @@ -363,34 +419,38 @@ func NopMetrics() *Metrics { CommittedHeight: discard.NewGauge(), // Extended metrics - ChannelBufferUsage: make(map[string]metrics.Gauge), - ErrorsByType: make(map[string]metrics.Counter), - OperationDuration: make(map[string]metrics.Histogram), - StateTransitions: make(map[string]metrics.Counter), - DroppedSignals: discard.NewCounter(), - RecoverableErrors: discard.NewCounter(), - NonRecoverableErrors: discard.NewCounter(), - GoroutineCount: discard.NewGauge(), - DASubmissionAttempts: discard.NewCounter(), - DASubmissionSuccesses: discard.NewCounter(), - DASubmissionFailures: discard.NewCounter(), - DARetrievalAttempts: discard.NewCounter(), - DARetrievalSuccesses: discard.NewCounter(), - DARetrievalFailures: discard.NewCounter(), - DAInclusionHeight: discard.NewGauge(), - PendingHeadersCount: discard.NewGauge(), - PendingDataCount: discard.NewGauge(), - SyncLag: discard.NewGauge(), - HeadersSynced: discard.NewCounter(), - DataSynced: discard.NewCounter(), - BlocksApplied: discard.NewCounter(), - InvalidHeadersCount: discard.NewCounter(), - BlockProductionTime: discard.NewHistogram(), - EmptyBlocksProduced: discard.NewCounter(), - LazyBlocksProduced: discard.NewCounter(), - NormalBlocksProduced: discard.NewCounter(), - TxsPerBlock: discard.NewHistogram(), - InvalidTransitions: discard.NewCounter(), + ChannelBufferUsage: make(map[string]metrics.Gauge), + ErrorsByType: make(map[string]metrics.Counter), + OperationDuration: make(map[string]metrics.Histogram), + StateTransitions: make(map[string]metrics.Counter), + DroppedSignals: discard.NewCounter(), + RecoverableErrors: discard.NewCounter(), + NonRecoverableErrors: discard.NewCounter(), + GoroutineCount: discard.NewGauge(), + DASubmissionAttempts: discard.NewCounter(), + DASubmissionSuccesses: discard.NewCounter(), + DASubmissionFailures: discard.NewCounter(), + DARetrievalAttempts: discard.NewCounter(), + DARetrievalSuccesses: discard.NewCounter(), + DARetrievalFailures: discard.NewCounter(), + DAInclusionHeight: discard.NewGauge(), + PendingHeadersCount: discard.NewGauge(), + PendingDataCount: discard.NewGauge(), + SyncLag: discard.NewGauge(), + HeadersSynced: discard.NewCounter(), + DataSynced: discard.NewCounter(), + BlocksApplied: discard.NewCounter(), + InvalidHeadersCount: discard.NewCounter(), + BlockProductionTime: discard.NewHistogram(), + EmptyBlocksProduced: discard.NewCounter(), + LazyBlocksProduced: discard.NewCounter(), + NormalBlocksProduced: discard.NewCounter(), + TxsPerBlock: discard.NewHistogram(), + InvalidTransitions: discard.NewCounter(), + DASubmitterFailures: make(map[string]metrics.Counter), + DASubmitterLastFailure: make(map[string]metrics.Gauge), + DASubmitterPendingBlobs: discard.NewGauge(), + DASubmitterResends: discard.NewCounter(), } // Initialize maps with no-op metrics @@ -414,5 +474,21 @@ func NopMetrics() *Metrics { m.StateTransitions[transition] = discard.NewCounter() } + // Initialize DA submitter failure maps with no-op metrics + failureReasons := []string{ + "already_rejected", + "insufficient_fee", + "timeout", + "already_in_mempool", + "not_included_in_block", + "too_big", + "context_canceled", + "unknown", + } + for _, reason := range failureReasons { + m.DASubmitterFailures[reason] = discard.NewCounter() + m.DASubmitterLastFailure[reason] = discard.NewGauge() + } + return m } diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index d47c4a2296..0f38d9fcc2 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -120,6 +120,7 @@ type DASubmitter struct { genesis genesis.Genesis options common.BlockOptions logger zerolog.Logger + metrics *common.Metrics // calculate namespaces bytes once and reuse them namespaceBz []byte @@ -132,6 +133,7 @@ func NewDASubmitter( config config.Config, genesis genesis.Genesis, options common.BlockOptions, + metrics *common.Metrics, logger zerolog.Logger, ) *DASubmitter { daSubmitterLogger := logger.With().Str("component", "da_submitter").Logger() @@ -146,12 +148,26 @@ func NewDASubmitter( config: config, genesis: genesis, options: options, + metrics: metrics, logger: daSubmitterLogger, namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), } } +// recordFailure records a DA submission failure in metrics +func (s *DASubmitter) recordFailure(reason string) { + if s.metrics == nil { + return + } + if counter, ok := s.metrics.DASubmitterFailures[reason]; ok { + counter.Add(1) + } + if gauge, ok := s.metrics.DASubmitterLastFailure[reason]; ok { + gauge.Set(float64(time.Now().Unix())) + } +} + // getGasMultiplier fetches the gas multiplier from DA layer with fallback and clamping func (s *DASubmitter) getGasMultiplier(ctx context.Context, pol retryPolicy) float64 { gasMultiplier, err := s.da.GasMultiplier(ctx) @@ -352,8 +368,18 @@ func submitToDA[T any]( marshaled = batchMarshaled } + // Update pending blobs metric + if s.metrics != nil { + s.metrics.DASubmitterPendingBlobs.Set(float64(len(items))) + } + // Start the retry loop for rs.Attempt < pol.MaxAttempts { + // Record resend metric for retry attempts (not the first attempt) + if rs.Attempt > 0 && s.metrics != nil { + s.metrics.DASubmitterResends.Add(1) + } + if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil { return err } @@ -375,14 +401,24 @@ func submitToDA[T any]( s.logger.Info().Str("itemType", itemType).Float64("gasPrice", rs.GasPrice).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer") if int(res.SubmittedCount) == len(items) { rs.Next(reasonSuccess, pol, gm, sentinelNoGas) + // Clear pending blobs on success + if s.metrics != nil { + s.metrics.DASubmitterPendingBlobs.Set(0) + } return nil } // partial success: advance window items = items[res.SubmittedCount:] marshaled = marshaled[res.SubmittedCount:] rs.Next(reasonSuccess, pol, gm, sentinelNoGas) + // Update pending blobs count + if s.metrics != nil { + s.metrics.DASubmitterPendingBlobs.Set(float64(len(items))) + } case coreda.StatusTooBig: + // Record failure metric + s.recordFailure("too_big") // Iteratively halve until it fits or single-item too big if len(items) == 1 { s.logger.Error().Str("itemType", itemType).Msg("single item exceeds DA blob size limit") @@ -397,21 +433,39 @@ func submitToDA[T any]( marshaled = marshaled[:half] s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") rs.Next(reasonTooBig, pol, gm, sentinelNoGas) + // Update pending blobs count + if s.metrics != nil { + s.metrics.DASubmitterPendingBlobs.Set(float64(len(items))) + } + + case coreda.StatusNotIncludedInBlock: + // Record failure metric + s.recordFailure("not_included_in_block") + s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state") + rs.Next(reasonMempool, pol, gm, sentinelNoGas) - case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool: + case coreda.StatusAlreadyInMempool: + // Record failure metric + s.recordFailure("already_in_mempool") s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state") rs.Next(reasonMempool, pol, gm, sentinelNoGas) case coreda.StatusContextCanceled: + // Record failure metric + s.recordFailure("context_canceled") s.logger.Info().Msg("DA layer submission canceled due to context cancellation") return context.Canceled default: + // Record failure metric + s.recordFailure("unknown") s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed") rs.Next(reasonFailure, pol, gm, sentinelNoGas) } } + // Final failure after max attempts + s.recordFailure("timeout") return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt) } diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index 223682f545..11713d707b 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -86,7 +86,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( dummyDA := coreda.NewDummyDA(10_000_000, 0, 0, 10*time.Millisecond) // Create DA submitter - daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop()) + daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop()) // Submit headers and data require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), cm)) diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index 2775ce7d62..9ef14fba36 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -28,7 +28,7 @@ func newTestSubmitter(mockDA *mocks.MockDA, override func(*config.Config)) *DASu if override != nil { override(&cfg) } - return NewDASubmitter(mockDA, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, zerolog.Nop()) + return NewDASubmitter(mockDA, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop()) } // marshal helper for simple items diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index c682c055e1..69af9ab1fa 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -56,6 +56,7 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage cfg, gen, common.DefaultBlockOptions(), + common.NopMetrics(), zerolog.Nop(), ) @@ -99,6 +100,7 @@ func TestNewDASubmitterSetsVisualizerWhenEnabled(t *testing.T) { cfg, genesis.Genesis{}, common.DefaultBlockOptions(), + common.NopMetrics(), zerolog.Nop(), ) diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index f86d7deb45..c05ebe3297 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -159,7 +159,7 @@ func TestSubmitter_setSequencerHeightToDAHeight(t *testing.T) { cfg := config.DefaultConfig() metrics := common.NopMetrics() - daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop()) s := NewSubmitter(mockStore, nil, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) s.ctx = ctx @@ -238,7 +238,7 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) { exec.On("SetFinal", mock.Anything, uint64(1)).Return(nil).Once() exec.On("SetFinal", mock.Anything, uint64(2)).Return(nil).Once() - daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop()) s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) // prepare two consecutive blocks in store with DA included in cache From b10a1db490a7fe2ae31faa1882ebc8b332518fba Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 13 Oct 2025 15:06:56 +0000 Subject: [PATCH 2/5] fix: update DASubmitterPendingBlobs to track total backlog Changes the DASubmitterPendingBlobs metric to track the total number of blobs awaiting submission across the entire queue, rather than just the current batch being submitted. This provides better visibility into the submission backlog for monitoring and alerting. - Added getTotalPendingFn parameter to submitToDA() - Updated metric to call NumPendingHeaders() or NumPendingData() - Updated metric documentation and help text - Updated all test calls with new parameter Co-authored-by: Marko --- block/internal/common/metrics.go | 4 +-- block/internal/submitting/da_submitter.go | 27 ++++++++++--------- .../submitting/da_submitter_mocks_test.go | 5 ++++ 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index b2164be069..9f6d190774 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -67,7 +67,7 @@ type Metrics struct { // DA Submitter metrics DASubmitterFailures map[string]metrics.Counter // Counter with reason label DASubmitterLastFailure map[string]metrics.Gauge // Timestamp gauge with reason label - DASubmitterPendingBlobs metrics.Gauge // Number of pending blobs + DASubmitterPendingBlobs metrics.Gauge // Total number of blobs awaiting submission (backlog) DASubmitterResends metrics.Counter // Number of resend attempts } @@ -362,7 +362,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Namespace: namespace, Subsystem: MetricsSubsystem, Name: "da_submitter_pending_blobs", - Help: "Number of blobs pending DA submission", + Help: "Total number of blobs awaiting DA submission (backlog)", }, labels).With(labelsAndValues...) m.DASubmitterResends = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 0f38d9fcc2..2b5935f1be 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -231,6 +231,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er s.namespaceBz, []byte(s.config.DA.SubmitOptions), cache, + func() uint64 { return cache.NumPendingHeaders() }, ) } @@ -274,6 +275,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe s.namespaceDataBz, []byte(s.config.DA.SubmitOptions), cache, + func() uint64 { return cache.NumPendingData() }, ) } @@ -344,6 +346,7 @@ func submitToDA[T any]( namespace []byte, options []byte, cache cache.Manager, + getTotalPendingFn func() uint64, ) error { marshaled, err := marshalItems(ctx, items, marshalFn, itemType) if err != nil { @@ -368,9 +371,9 @@ func submitToDA[T any]( marshaled = batchMarshaled } - // Update pending blobs metric - if s.metrics != nil { - s.metrics.DASubmitterPendingBlobs.Set(float64(len(items))) + // Update pending blobs metric to track total backlog + if s.metrics != nil && getTotalPendingFn != nil { + s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } // Start the retry loop @@ -401,9 +404,9 @@ func submitToDA[T any]( s.logger.Info().Str("itemType", itemType).Float64("gasPrice", rs.GasPrice).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer") if int(res.SubmittedCount) == len(items) { rs.Next(reasonSuccess, pol, gm, sentinelNoGas) - // Clear pending blobs on success - if s.metrics != nil { - s.metrics.DASubmitterPendingBlobs.Set(0) + // Update pending blobs metric to reflect total backlog + if s.metrics != nil && getTotalPendingFn != nil { + s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } return nil } @@ -411,9 +414,9 @@ func submitToDA[T any]( items = items[res.SubmittedCount:] marshaled = marshaled[res.SubmittedCount:] rs.Next(reasonSuccess, pol, gm, sentinelNoGas) - // Update pending blobs count - if s.metrics != nil { - s.metrics.DASubmitterPendingBlobs.Set(float64(len(items))) + // Update pending blobs count to reflect total backlog + if s.metrics != nil && getTotalPendingFn != nil { + s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } case coreda.StatusTooBig: @@ -433,9 +436,9 @@ func submitToDA[T any]( marshaled = marshaled[:half] s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") rs.Next(reasonTooBig, pol, gm, sentinelNoGas) - // Update pending blobs count - if s.metrics != nil { - s.metrics.DASubmitterPendingBlobs.Set(float64(len(items))) + // Update pending blobs count to reflect total backlog + if s.metrics != nil && getTotalPendingFn != nil { + s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } case coreda.StatusNotIncludedInBlock: diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index 9ef14fba36..fc309e0638 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -86,6 +86,7 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { nsBz, opts, nil, + nil, ) assert.NoError(t, err) @@ -137,6 +138,7 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) { nsBz, opts, nil, + nil, ) assert.NoError(t, err) assert.Equal(t, []float64{5.5, 5.5}, usedGas) @@ -193,6 +195,7 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { nsBz, opts, nil, + nil, ) assert.NoError(t, err) assert.Equal(t, []int{4, 2}, batchSizes) @@ -242,6 +245,7 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) { nsBz, opts, nil, + nil, ) assert.NoError(t, err) assert.Equal(t, []float64{-1, -1}, usedGas) @@ -282,6 +286,7 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) { nsBz, opts, nil, + nil, ) assert.NoError(t, err) assert.Equal(t, 3, totalSubmitted) From 6473d35535ac5b957a16aa60708dbef0d8ade592 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 13 Oct 2025 17:13:18 +0200 Subject: [PATCH 3/5] fmt --- block/internal/common/metrics.go | 72 ++++++++++++++++---------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index 9f6d190774..5c8e272701 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -65,10 +65,10 @@ type Metrics struct { InvalidTransitions metrics.Counter // DA Submitter metrics - DASubmitterFailures map[string]metrics.Counter // Counter with reason label - DASubmitterLastFailure map[string]metrics.Gauge // Timestamp gauge with reason label - DASubmitterPendingBlobs metrics.Gauge // Total number of blobs awaiting submission (backlog) - DASubmitterResends metrics.Counter // Number of resend attempts + DASubmitterFailures map[string]metrics.Counter // Counter with reason label + DASubmitterLastFailure map[string]metrics.Gauge // Timestamp gauge with reason label + DASubmitterPendingBlobs metrics.Gauge // Total number of blobs awaiting submission (backlog) + DASubmitterResends metrics.Counter // Number of resend attempts } // PrometheusMetrics returns Metrics built using Prometheus client library @@ -419,38 +419,38 @@ func NopMetrics() *Metrics { CommittedHeight: discard.NewGauge(), // Extended metrics - ChannelBufferUsage: make(map[string]metrics.Gauge), - ErrorsByType: make(map[string]metrics.Counter), - OperationDuration: make(map[string]metrics.Histogram), - StateTransitions: make(map[string]metrics.Counter), - DroppedSignals: discard.NewCounter(), - RecoverableErrors: discard.NewCounter(), - NonRecoverableErrors: discard.NewCounter(), - GoroutineCount: discard.NewGauge(), - DASubmissionAttempts: discard.NewCounter(), - DASubmissionSuccesses: discard.NewCounter(), - DASubmissionFailures: discard.NewCounter(), - DARetrievalAttempts: discard.NewCounter(), - DARetrievalSuccesses: discard.NewCounter(), - DARetrievalFailures: discard.NewCounter(), - DAInclusionHeight: discard.NewGauge(), - PendingHeadersCount: discard.NewGauge(), - PendingDataCount: discard.NewGauge(), - SyncLag: discard.NewGauge(), - HeadersSynced: discard.NewCounter(), - DataSynced: discard.NewCounter(), - BlocksApplied: discard.NewCounter(), - InvalidHeadersCount: discard.NewCounter(), - BlockProductionTime: discard.NewHistogram(), - EmptyBlocksProduced: discard.NewCounter(), - LazyBlocksProduced: discard.NewCounter(), - NormalBlocksProduced: discard.NewCounter(), - TxsPerBlock: discard.NewHistogram(), - InvalidTransitions: discard.NewCounter(), - DASubmitterFailures: make(map[string]metrics.Counter), - DASubmitterLastFailure: make(map[string]metrics.Gauge), - DASubmitterPendingBlobs: discard.NewGauge(), - DASubmitterResends: discard.NewCounter(), + ChannelBufferUsage: make(map[string]metrics.Gauge), + ErrorsByType: make(map[string]metrics.Counter), + OperationDuration: make(map[string]metrics.Histogram), + StateTransitions: make(map[string]metrics.Counter), + DroppedSignals: discard.NewCounter(), + RecoverableErrors: discard.NewCounter(), + NonRecoverableErrors: discard.NewCounter(), + GoroutineCount: discard.NewGauge(), + DASubmissionAttempts: discard.NewCounter(), + DASubmissionSuccesses: discard.NewCounter(), + DASubmissionFailures: discard.NewCounter(), + DARetrievalAttempts: discard.NewCounter(), + DARetrievalSuccesses: discard.NewCounter(), + DARetrievalFailures: discard.NewCounter(), + DAInclusionHeight: discard.NewGauge(), + PendingHeadersCount: discard.NewGauge(), + PendingDataCount: discard.NewGauge(), + SyncLag: discard.NewGauge(), + HeadersSynced: discard.NewCounter(), + DataSynced: discard.NewCounter(), + BlocksApplied: discard.NewCounter(), + InvalidHeadersCount: discard.NewCounter(), + BlockProductionTime: discard.NewHistogram(), + EmptyBlocksProduced: discard.NewCounter(), + LazyBlocksProduced: discard.NewCounter(), + NormalBlocksProduced: discard.NewCounter(), + TxsPerBlock: discard.NewHistogram(), + InvalidTransitions: discard.NewCounter(), + DASubmitterFailures: make(map[string]metrics.Counter), + DASubmitterLastFailure: make(map[string]metrics.Gauge), + DASubmitterPendingBlobs: discard.NewGauge(), + DASubmitterResends: discard.NewCounter(), } // Initialize maps with no-op metrics From eef7d2b825c556006bd02c662b964a21c4f87844 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Tue, 14 Oct 2025 08:40:25 +0000 Subject: [PATCH 4/5] refactor: use typed failure reasons and NoOp metrics pattern - Create DASubmitterFailureReason type with typed constants - Update metrics maps to use typed failure reasons instead of strings - Refactor DASubmitter constructor to use NopMetrics if metrics is nil - Remove all nil checks for metrics throughout the codebase - Add warning log for unregistered failure reasons - Centralize failure reason definitions using AllDASubmitterFailureReasons() This addresses review feedback to make the code more type-safe and readable by removing defensive nil checks. Co-authored-by: Marko --- block/internal/common/metrics.go | 72 +++++++++++++---------- block/internal/submitting/da_submitter.go | 38 +++++++----- 2 files changed, 62 insertions(+), 48 deletions(-) diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index 5c8e272701..0a3d29ec8a 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -13,6 +13,34 @@ const ( MetricsSubsystem = "sequencer" ) +// DASubmitterFailureReason represents a typed failure reason for DA submission failures +type DASubmitterFailureReason string + +const ( + DASubmitterFailureReasonAlreadyRejected DASubmitterFailureReason = "already_rejected" + DASubmitterFailureReasonInsufficientFee DASubmitterFailureReason = "insufficient_fee" + DASubmitterFailureReasonTimeout DASubmitterFailureReason = "timeout" + DASubmitterFailureReasonAlreadyInMempool DASubmitterFailureReason = "already_in_mempool" + DASubmitterFailureReasonNotIncludedInBlock DASubmitterFailureReason = "not_included_in_block" + DASubmitterFailureReasonTooBig DASubmitterFailureReason = "too_big" + DASubmitterFailureReasonContextCanceled DASubmitterFailureReason = "context_canceled" + DASubmitterFailureReasonUnknown DASubmitterFailureReason = "unknown" +) + +// AllDASubmitterFailureReasons returns all possible failure reasons +func AllDASubmitterFailureReasons() []DASubmitterFailureReason { + return []DASubmitterFailureReason{ + DASubmitterFailureReasonAlreadyRejected, + DASubmitterFailureReasonInsufficientFee, + DASubmitterFailureReasonTimeout, + DASubmitterFailureReasonAlreadyInMempool, + DASubmitterFailureReasonNotIncludedInBlock, + DASubmitterFailureReasonTooBig, + DASubmitterFailureReasonContextCanceled, + DASubmitterFailureReasonUnknown, + } +} + // Metrics contains all metrics exposed by this package. type Metrics struct { // Original metrics @@ -65,10 +93,10 @@ type Metrics struct { InvalidTransitions metrics.Counter // DA Submitter metrics - DASubmitterFailures map[string]metrics.Counter // Counter with reason label - DASubmitterLastFailure map[string]metrics.Gauge // Timestamp gauge with reason label - DASubmitterPendingBlobs metrics.Gauge // Total number of blobs awaiting submission (backlog) - DASubmitterResends metrics.Counter // Number of resend attempts + DASubmitterFailures map[DASubmitterFailureReason]metrics.Counter // Counter with reason label + DASubmitterLastFailure map[DASubmitterFailureReason]metrics.Gauge // Timestamp gauge with reason label + DASubmitterPendingBlobs metrics.Gauge // Total number of blobs awaiting submission (backlog) + DASubmitterResends metrics.Counter // Number of resend attempts } // PrometheusMetrics returns Metrics built using Prometheus client library @@ -83,8 +111,8 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { ErrorsByType: make(map[string]metrics.Counter), OperationDuration: make(map[string]metrics.Histogram), StateTransitions: make(map[string]metrics.Counter), - DASubmitterFailures: make(map[string]metrics.Counter), - DASubmitterLastFailure: make(map[string]metrics.Gauge), + DASubmitterFailures: make(map[DASubmitterFailureReason]metrics.Counter), + DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge), } // Original metrics @@ -373,24 +401,14 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { }, labels).With(labelsAndValues...) // Initialize DA submitter failure counters and timestamps for various reasons - failureReasons := []string{ - "already_rejected", - "insufficient_fee", - "timeout", - "already_in_mempool", - "not_included_in_block", - "too_big", - "context_canceled", - "unknown", - } - for _, reason := range failureReasons { + for _, reason := range AllDASubmitterFailureReasons() { m.DASubmitterFailures[reason] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "da_submitter_failures_total", Help: "Total number of DA submission failures by reason", ConstLabels: map[string]string{ - "reason": reason, + "reason": string(reason), }, }, labels).With(labelsAndValues...) @@ -400,7 +418,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "da_submitter_last_failure_timestamp", Help: "Unix timestamp of the last DA submission failure by reason", ConstLabels: map[string]string{ - "reason": reason, + "reason": string(reason), }, }, labels).With(labelsAndValues...) } @@ -447,8 +465,8 @@ func NopMetrics() *Metrics { NormalBlocksProduced: discard.NewCounter(), TxsPerBlock: discard.NewHistogram(), InvalidTransitions: discard.NewCounter(), - DASubmitterFailures: make(map[string]metrics.Counter), - DASubmitterLastFailure: make(map[string]metrics.Gauge), + DASubmitterFailures: make(map[DASubmitterFailureReason]metrics.Counter), + DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge), DASubmitterPendingBlobs: discard.NewGauge(), DASubmitterResends: discard.NewCounter(), } @@ -475,17 +493,7 @@ func NopMetrics() *Metrics { } // Initialize DA submitter failure maps with no-op metrics - failureReasons := []string{ - "already_rejected", - "insufficient_fee", - "timeout", - "already_in_mempool", - "not_included_in_block", - "too_big", - "context_canceled", - "unknown", - } - for _, reason := range failureReasons { + for _, reason := range AllDASubmitterFailureReasons() { m.DASubmitterFailures[reason] = discard.NewCounter() m.DASubmitterLastFailure[reason] = discard.NewGauge() } diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 2b5935f1be..8b3d780b47 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -143,6 +143,11 @@ func NewDASubmitter( server.SetDAVisualizationServer(server.NewDAVisualizationServer(da, visualizerLogger, config.Node.Aggregator)) } + // Use NoOp metrics if nil to avoid nil checks throughout the code + if metrics == nil { + metrics = common.NopMetrics() + } + return &DASubmitter{ da: da, config: config, @@ -156,13 +161,14 @@ func NewDASubmitter( } // recordFailure records a DA submission failure in metrics -func (s *DASubmitter) recordFailure(reason string) { - if s.metrics == nil { +func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) { + counter, ok := s.metrics.DASubmitterFailures[reason] + if !ok { + s.logger.Warn().Str("reason", string(reason)).Msg("unregistered failure reason, metric not recorded") return } - if counter, ok := s.metrics.DASubmitterFailures[reason]; ok { - counter.Add(1) - } + counter.Add(1) + if gauge, ok := s.metrics.DASubmitterLastFailure[reason]; ok { gauge.Set(float64(time.Now().Unix())) } @@ -372,14 +378,14 @@ func submitToDA[T any]( } // Update pending blobs metric to track total backlog - if s.metrics != nil && getTotalPendingFn != nil { + if getTotalPendingFn != nil { s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } // Start the retry loop for rs.Attempt < pol.MaxAttempts { // Record resend metric for retry attempts (not the first attempt) - if rs.Attempt > 0 && s.metrics != nil { + if rs.Attempt > 0 { s.metrics.DASubmitterResends.Add(1) } @@ -405,7 +411,7 @@ func submitToDA[T any]( if int(res.SubmittedCount) == len(items) { rs.Next(reasonSuccess, pol, gm, sentinelNoGas) // Update pending blobs metric to reflect total backlog - if s.metrics != nil && getTotalPendingFn != nil { + if getTotalPendingFn != nil { s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } return nil @@ -415,13 +421,13 @@ func submitToDA[T any]( marshaled = marshaled[res.SubmittedCount:] rs.Next(reasonSuccess, pol, gm, sentinelNoGas) // Update pending blobs count to reflect total backlog - if s.metrics != nil && getTotalPendingFn != nil { + if getTotalPendingFn != nil { s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } case coreda.StatusTooBig: // Record failure metric - s.recordFailure("too_big") + s.recordFailure(common.DASubmitterFailureReasonTooBig) // Iteratively halve until it fits or single-item too big if len(items) == 1 { s.logger.Error().Str("itemType", itemType).Msg("single item exceeds DA blob size limit") @@ -437,38 +443,38 @@ func submitToDA[T any]( s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") rs.Next(reasonTooBig, pol, gm, sentinelNoGas) // Update pending blobs count to reflect total backlog - if s.metrics != nil && getTotalPendingFn != nil { + if getTotalPendingFn != nil { s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) } case coreda.StatusNotIncludedInBlock: // Record failure metric - s.recordFailure("not_included_in_block") + s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock) s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state") rs.Next(reasonMempool, pol, gm, sentinelNoGas) case coreda.StatusAlreadyInMempool: // Record failure metric - s.recordFailure("already_in_mempool") + s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool) s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state") rs.Next(reasonMempool, pol, gm, sentinelNoGas) case coreda.StatusContextCanceled: // Record failure metric - s.recordFailure("context_canceled") + s.recordFailure(common.DASubmitterFailureReasonContextCanceled) s.logger.Info().Msg("DA layer submission canceled due to context cancellation") return context.Canceled default: // Record failure metric - s.recordFailure("unknown") + s.recordFailure(common.DASubmitterFailureReasonUnknown) s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed") rs.Next(reasonFailure, pol, gm, sentinelNoGas) } } // Final failure after max attempts - s.recordFailure("timeout") + s.recordFailure(common.DASubmitterFailureReasonTimeout) return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt) } From a0b2a90cd1421fdf1c59201f25ec53d2bebfc524 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 14 Oct 2025 10:43:37 +0200 Subject: [PATCH 5/5] fmt --- block/internal/common/metrics.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index 0a3d29ec8a..c543537cf5 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -17,14 +17,14 @@ const ( type DASubmitterFailureReason string const ( - DASubmitterFailureReasonAlreadyRejected DASubmitterFailureReason = "already_rejected" - DASubmitterFailureReasonInsufficientFee DASubmitterFailureReason = "insufficient_fee" - DASubmitterFailureReasonTimeout DASubmitterFailureReason = "timeout" - DASubmitterFailureReasonAlreadyInMempool DASubmitterFailureReason = "already_in_mempool" - DASubmitterFailureReasonNotIncludedInBlock DASubmitterFailureReason = "not_included_in_block" - DASubmitterFailureReasonTooBig DASubmitterFailureReason = "too_big" - DASubmitterFailureReasonContextCanceled DASubmitterFailureReason = "context_canceled" - DASubmitterFailureReasonUnknown DASubmitterFailureReason = "unknown" + DASubmitterFailureReasonAlreadyRejected DASubmitterFailureReason = "already_rejected" + DASubmitterFailureReasonInsufficientFee DASubmitterFailureReason = "insufficient_fee" + DASubmitterFailureReasonTimeout DASubmitterFailureReason = "timeout" + DASubmitterFailureReasonAlreadyInMempool DASubmitterFailureReason = "already_in_mempool" + DASubmitterFailureReasonNotIncludedInBlock DASubmitterFailureReason = "not_included_in_block" + DASubmitterFailureReasonTooBig DASubmitterFailureReason = "too_big" + DASubmitterFailureReasonContextCanceled DASubmitterFailureReason = "context_canceled" + DASubmitterFailureReasonUnknown DASubmitterFailureReason = "unknown" ) // AllDASubmitterFailureReasons returns all possible failure reasons