From 19298519df75de547c7d51a8940dea50a72f4688 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 16 Apr 2026 19:38:33 +0200 Subject: [PATCH 1/2] feat: add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead --- block/internal/da/subscriber.go | 17 +++ block/internal/da/subscriber_test.go | 38 ++++++ block/internal/syncing/da_follower.go | 87 ++++++++++-- block/internal/syncing/da_follower_test.go | 152 +++++++++++++++++++++ block/internal/syncing/syncer.go | 13 ++ 5 files changed, 294 insertions(+), 13 deletions(-) diff --git a/block/internal/da/subscriber.go b/block/internal/da/subscriber.go index 8ff46773ce..e332af7a62 100644 --- a/block/internal/da/subscriber.go +++ b/block/internal/da/subscriber.go @@ -159,6 +159,23 @@ func (s *Subscriber) HasReachedHead() bool { return s.headReached.Load() } +// RewindTo sets localDAHeight back to the given height and signals the catchup +// loop so that DA heights are re-fetched. This is used when the primary source +// (P2P) stalls and DA needs to take over for the missing range. +func (s *Subscriber) RewindTo(daHeight uint64) { + for { + cur := s.localDAHeight.Load() + if daHeight >= cur { + return + } + if s.localDAHeight.CompareAndSwap(cur, daHeight) { + s.headReached.Store(false) + s.signalCatchup() + return + } + } +} + // signalCatchup sends a non-blocking signal to wake catchupLoop. func (s *Subscriber) signalCatchup() { select { diff --git a/block/internal/da/subscriber_test.go b/block/internal/da/subscriber_test.go index 2ed80886de..38d5f4edc0 100644 --- a/block/internal/da/subscriber_test.go +++ b/block/internal/da/subscriber_test.go @@ -101,6 +101,44 @@ func TestSubscriber_RunCatchup(t *testing.T) { }) } +func TestSubscriber_RewindTo(t *testing.T) { + t.Run("no_op_when_target_is_equal_or_higher", func(t *testing.T) { + sub := NewSubscriber(SubscriberConfig{ + Client: testmocks.NewMockClient(t), + Logger: zerolog.Nop(), + Handler: new(MockSubscriberHandler), + Namespaces: [][]byte{[]byte("ns")}, + StartHeight: 100, + DABlockTime: time.Millisecond, + }) + sub.localDAHeight.Store(100) + + sub.RewindTo(100) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) + + sub.RewindTo(200) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) + }) + + t.Run("rewinds_local_height_and_clears_head", func(t *testing.T) { + sub := NewSubscriber(SubscriberConfig{ + Client: testmocks.NewMockClient(t), + Logger: zerolog.Nop(), + Handler: new(MockSubscriberHandler), + Namespaces: [][]byte{[]byte("ns")}, + StartHeight: 100, + DABlockTime: time.Millisecond, + }) + sub.localDAHeight.Store(150) + sub.headReached.Store(true) + + sub.RewindTo(120) + + assert.Equal(t, uint64(120), sub.LocalDAHeight()) + assert.False(t, sub.HasReachedHead()) + }) +} + func TestSubscriber_RunSubscription_InlineDoesNotPrematurelyReachHead(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index 443fb876ae..8366371d7f 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -5,6 +5,7 @@ import ( "errors" "slices" "sync" + "sync/atomic" "time" "github.com/rs/zerolog" @@ -26,10 +27,19 @@ type DAFollower interface { // daFollower is the concrete implementation of DAFollower. type daFollower struct { - subscriber *da.Subscriber - retriever DARetriever - eventSink common.EventSink - logger zerolog.Logger + subscriber *da.Subscriber + retriever DARetriever + eventSink common.EventSink + logger zerolog.Logger + nodeHeightFn func() uint64 + p2pStalledFn func() bool + startDAHeight uint64 + + // walkbackActive is set when the follower detects a gap between the + // DA events it just processed and the node's current block height. + // While active, every DA height (even empty ones) triggers a rewind + // so the subscriber walks backwards until the gap is filled. + walkbackActive atomic.Bool // Priority queue for P2P hint heights (absorbed from DARetriever refactoring #2). priorityMu sync.Mutex @@ -48,6 +58,12 @@ type DAFollowerConfig struct { DataNamespace []byte // may be nil or equal to Namespace StartDAHeight uint64 DABlockTime time.Duration + // NodeHeight returns the node's current block height. Used together + // with P2PStalled to detect gaps that need a DA walkback. + NodeHeight func() uint64 + // P2PStalled returns true when the P2P sync worker has failed to + // deliver blocks. The follower only walks back when P2P is stalled. + P2PStalled func() bool } // NewDAFollower creates a new daFollower. @@ -61,6 +77,9 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower { retriever: cfg.Retriever, eventSink: cfg.EventSink, logger: cfg.Logger.With().Str("component", "da_follower").Logger(), + nodeHeightFn: cfg.NodeHeight, + p2pStalledFn: cfg.P2PStalled, + startDAHeight: cfg.StartDAHeight, priorityHeights: make([]uint64, 0), } @@ -123,6 +142,13 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve // HandleCatchup retrieves events at a single DA height and pipes them // to the event sink. Checks priority heights first. +// +// When a node-height callback is configured, HandleCatchup detects gaps +// between the block heights it just fetched and the node's current height. +// If the smallest block height is above nodeHeight+1 the subscriber is +// rewound by one DA height so it re-fetches the previous height on the +// next iteration. This "walk-back" continues automatically through empty +// DA heights until blocks contiguous with the node are found. func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { // 1. Drain stale or future priority heights from P2P hints for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() { @@ -134,23 +160,58 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { Uint64("da_height", priorityHeight). Msg("fetching priority DA height from P2P hint") - if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil { + if _, err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil { if errors.Is(err, datypes.ErrHeightFromFuture) { - // Priority hint points to a future height — silently ignore. f.logger.Debug().Uint64("priority_da_height", priorityHeight). Msg("priority hint is from future, ignoring") continue } - // Roll back so daHeight is attempted again next cycle after backoff. return err } break // continue with daHeight } // 2. Normal sequential fetch - if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil { + events, err := f.fetchAndPipeHeight(ctx, daHeight) + if err != nil { return err } + + // 3. Self-correction: walk back when P2P has stalled and DA blocks skip + // past the node height. Only active when P2P is confirmed stalled to + // avoid unnecessary rewinds during normal DA catchup. + p2pStalled := f.p2pStalledFn != nil && f.p2pStalledFn() + if p2pStalled && f.nodeHeightFn != nil && daHeight > f.startDAHeight { + nodeHeight := f.nodeHeightFn() + + needsWalkback := f.walkbackActive.Load() + if len(events) > 0 { + minHeight := events[0].Header.Height() + for _, e := range events[1:] { + if e.Header.Height() < minHeight { + minHeight = e.Header.Height() + } + } + if minHeight <= nodeHeight+1 { + f.walkbackActive.Store(false) + return nil + } + needsWalkback = true + } + + if needsWalkback { + f.walkbackActive.Store(true) + f.logger.Info(). + Uint64("da_height", daHeight). + Uint64("node_height", nodeHeight). + Int("events", len(events)). + Msg("P2P stalled with gap between DA blocks and node height, walking DA follower back") + f.subscriber.RewindTo(daHeight - 1) + } + } else if !p2pStalled { + f.walkbackActive.Store(false) + } + return nil } @@ -158,22 +219,22 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { // It does NOT handle ErrHeightFromFuture — callers must decide how to react // because the correct response depends on whether this is a normal sequential // catchup or a priority-hint fetch. -func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) error { +func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { events, err := f.retriever.RetrieveFromDA(ctx, daHeight) if err != nil { if errors.Is(err, datypes.ErrBlobNotFound) { - return nil + return nil, nil } - return err + return nil, err } for _, event := range events { if err := f.eventSink.PipeEvent(ctx, event); err != nil { - return err + return nil, err } } - return nil + return events, nil } // QueuePriorityHeight queues a DA height for priority retrieval. diff --git a/block/internal/syncing/da_follower_test.go b/block/internal/syncing/da_follower_test.go index 710d2d81d0..2793ad8072 100644 --- a/block/internal/syncing/da_follower_test.go +++ b/block/internal/syncing/da_follower_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -11,7 +12,9 @@ import ( "github.com/stretchr/testify/require" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" datypes "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/types" ) func TestDAFollower_HandleEvent(t *testing.T) { @@ -251,3 +254,152 @@ func makeRange(start, end uint64) []uint64 { } return out } + +type mockSubHandler struct { + mock.Mock +} + +func (m *mockSubHandler) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error { + return m.Called(ctx, ev, isInline).Error(0) +} + +func (m *mockSubHandler) HandleCatchup(ctx context.Context, height uint64) error { + return m.Called(ctx, height).Error(0) +} + +func newTestSubscriber(startHeight uint64) *da.Subscriber { + return da.NewSubscriber(da.SubscriberConfig{ + Client: nil, + Logger: zerolog.Nop(), + Handler: &mockSubHandler{}, + Namespaces: nil, + StartHeight: startHeight, + DABlockTime: time.Millisecond, + }) +} + +func makeHeader(height uint64) *types.SignedHeader { + return &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: height}}} +} + +func TestDAFollower_HandleCatchup_SelfCorrectingWalkback(t *testing.T) { + t.Run("rewinds_when_gap_detected", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() + + sub := newTestSubscriber(100) + sub.LocalDAHeight() // ensure initialized + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + + err := follower.HandleCatchup(t.Context(), 100) + require.NoError(t, err) + assert.True(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(99), sub.LocalDAHeight()) + }) + + t.Run("keeps_walking_back_on_empty_height", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(99)). + Return(nil, datypes.ErrBlobNotFound).Once() + + sub := newTestSubscriber(100) + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + follower.walkbackActive.Store(true) + + err := follower.HandleCatchup(t.Context(), 99) + require.NoError(t, err) + assert.True(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(98), sub.LocalDAHeight()) + }) + + t.Run("stops_walkback_when_contiguous", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(95)). + Return([]common.DAHeightEvent{{Header: makeHeader(41)}}, nil).Once() + + sub := newTestSubscriber(100) + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + follower.walkbackActive.Store(true) + + err := follower.HandleCatchup(t.Context(), 95) + require.NoError(t, err) + assert.False(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) // no rewind + }) + + t.Run("no_walkback_without_nodeHeightFn", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() + + sub := newTestSubscriber(100) + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + startDAHeight: 1, + } + + err := follower.HandleCatchup(t.Context(), 100) + require.NoError(t, err) + assert.False(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) // no rewind + }) + + t.Run("no_walkback_at_startDAHeight", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(1)). + Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() + + sub := newTestSubscriber(1) + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + + err := follower.HandleCatchup(t.Context(), 1) + require.NoError(t, err) + assert.False(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(1), sub.LocalDAHeight()) // no rewind + }) +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 4d2cbb4afe..a97767b25b 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -82,6 +82,11 @@ type Syncer struct { daFollower DAFollower + // p2pStalled is set by p2pWorkerLoop when P2P genuinely fails (not + // cancelled by a DA event). The DA follower reads it to decide whether + // to walk back and fill the gap from DA. + p2pStalled atomic.Bool + // Forced inclusion tracking forcedInclusionMu sync.RWMutex seenBlockTxs map[string]struct{} // SHA-256 hex of every tx seen in a DA-sourced block @@ -220,6 +225,11 @@ func (s *Syncer) Start(ctx context.Context) (err error) { DataNamespace: s.daClient.GetDataNamespace(), StartDAHeight: s.daRetrieverHeight.Load(), DABlockTime: s.config.DA.BlockTime.Duration, + NodeHeight: func() uint64 { + h, _ := s.store.Height(s.ctx) + return h + }, + P2PStalled: s.p2pStalled.Load, }) if err = s.daFollower.Start(ctx); err != nil { return fmt.Errorf("failed to start DA follower: %w", err) @@ -488,6 +498,7 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { } if waitCtx.Err() == nil { + s.p2pStalled.Store(true) logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height") } @@ -497,6 +508,8 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { continue } + s.p2pStalled.Store(false) + if err := s.waitForStoreHeight(ctx, targetHeight); err != nil { if errors.Is(err, context.Canceled) { return From 9b7ecceef7b5d561c5ad5db45eb754f831795b64 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 16 Apr 2026 19:40:04 +0200 Subject: [PATCH 2/2] add cl --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99026c6e04..63f1d97e68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes +- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [#3262](https://github.com/evstack/ev-node/pull/3262) - Make it easier to override `DefaultMaxBlobSize` by ldflags [#3235](https://github.com/evstack/ev-node/pull/3235) - Add solo sequencer (simple in memory single sequencer without force inclusion) [#3235](https://github.com/evstack/ev-node/pull/3235) - Improve reaper to sustain txs burst better [#3236](https://github.com/evstack/ev-node/pull/3236)