Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 79 additions & 36 deletions pkg/sync/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type SyncService[H header.Header[H]] struct {
getterByHeight GetterByHeightFunc[H]
rangeGetter RangeGetterFunc[H]
storeInitialized atomic.Bool

// context for background operations
bgCtx context.Context
bgCancel context.CancelFunc
}

// DataSyncService is the P2P Sync Service for blocks.
Expand Down Expand Up @@ -153,6 +157,8 @@ func newSyncService[H header.Header[H]](
return nil, fmt.Errorf("failed to initialize the %s store: %w", syncType, err)
}

bgCtx, bgCancel := context.WithCancel(context.Background())

svc := &SyncService[H]{
conf: conf,
genesis: genesis,
Expand All @@ -164,6 +170,8 @@ func newSyncService[H header.Header[H]](
syncType: syncType,
logger: logger,
syncerStatus: new(SyncerStatus),
bgCtx: bgCtx,
bgCancel: bgCancel,
}

return svc, nil
Expand Down Expand Up @@ -389,6 +397,42 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error {
return nil
}

// tryInit attempts to initialize the syncer from P2P once.
// Returns true if successful, false otherwise with an error.
func (syncService *SyncService[H]) tryInit(ctx context.Context) (bool, error) {
var (
trusted H
err error
heightToQuery uint64
)

head, headErr := syncService.store.Head(ctx)
switch {
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
heightToQuery = syncService.genesis.InitialHeight
case headErr != nil:
return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
default:
heightToQuery = head.Height()
}

if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
}

if syncService.storeInitialized.CompareAndSwap(false, true) {
if _, err := syncService.initStore(ctx, trusted); err != nil {
syncService.storeInitialized.Store(false)
return false, fmt.Errorf("failed to initialize the store: %w", err)
}
}
if err := syncService.startSyncer(ctx); err != nil {
return false, err
}

return true, nil
}

// initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
// It inspects the local store to determine the first height to request:
// - when the store already contains items, it reuses the latest height as the starting point;
Expand All @@ -398,48 +442,15 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
return nil
}

tryInit := func(ctx context.Context) (bool, error) {
var (
trusted H
err error
heightToQuery uint64
)

head, headErr := syncService.store.Head(ctx)
switch {
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
heightToQuery = syncService.genesis.InitialHeight
case headErr != nil:
return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
default:
heightToQuery = head.Height()
}

if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
}

if syncService.storeInitialized.CompareAndSwap(false, true) {
if _, err := syncService.initStore(ctx, trusted); err != nil {
syncService.storeInitialized.Store(false)
return false, fmt.Errorf("failed to initialize the store: %w", err)
}
}
if err := syncService.startSyncer(ctx); err != nil {
return false, err
}
return true, nil
}

// block with exponential backoff until initialization succeeds or context is canceled.
backoff := 1 * time.Second
maxBackoff := 10 * time.Second

timeoutTimer := time.NewTimer(time.Minute * 10)
timeoutTimer := time.NewTimer(time.Minute * 2)
defer timeoutTimer.Stop()

for {
ok, err := tryInit(ctx)
ok, err := syncService.tryInit(ctx)
if ok {
return nil
}
Expand All @@ -450,7 +461,9 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
case <-ctx.Done():
return ctx.Err()
case <-timeoutTimer.C:
return fmt.Errorf("timeout reached while trying to initialize the store after 10 minutes: %w", err)
syncService.logger.Warn().Err(err).Msg("timeout reached while trying to initialize the store, scheduling background retry")
go syncService.retryInitInBackground()
return nil
case <-time.After(backoff):
}

Expand All @@ -461,10 +474,40 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
}
}

// retryInitInBackground continues attempting to initialize the syncer in the background.
func (syncService *SyncService[H]) retryInitInBackground() {
backoff := 15 * time.Second
maxBackoff := 5 * time.Minute

for {
select {
case <-syncService.bgCtx.Done():
syncService.logger.Info().Msg("background retry cancelled")
return
case <-time.After(backoff):
}

ok, err := syncService.tryInit(syncService.bgCtx)
if ok {
syncService.logger.Info().Msg("successfully initialized store from P2P in background")
return
}

syncService.logger.Info().Err(err).Dur("retry_in", backoff).Msg("background retry: headers not yet available from peers")

backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}

// Stop is a part of Service interface.
//
// `store` is closed last because it's used by other services.
func (syncService *SyncService[H]) Stop(ctx context.Context) error {
syncService.bgCancel()

// unsubscribe from topic first so that sub.Stop() does not fail
syncService.topicSubscription.Cancel()
err := errors.Join(
Expand Down
111 changes: 111 additions & 0 deletions pkg/sync/sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package sync
import (
"context"
cryptoRand "crypto/rand"
"errors"
"math/rand"
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/celestiaorg/go-header"
"github.com/evstack/ev-node/pkg/config"
genesispkg "github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/p2p"
Expand Down Expand Up @@ -191,3 +194,111 @@ func bytesN(r *rand.Rand, n int) []byte {
_, _ = r.Read(data)
return data
}

// TestBackgroundRetryEventuallySucceeds verifies that when the sync service cannot
// initially connect to peers, the background retry mechanism is triggered and
// eventually succeeds once headers become available from the DA store.
func TestBackgroundRetryEventuallySucceeds(t *testing.T) {
mn := mocknet.New()
defer mn.Close()

pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
require.NoError(t, err)
noopSigner, err := noop.NewNoopSigner(pk)
require.NoError(t, err)
rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only

chainId := "test-chain-id"
proposerAddr := []byte("test")
genesisDoc := genesispkg.Genesis{
ChainID: chainId,
StartTime: time.Now(),
InitialHeight: 1,
ProposerAddress: proposerAddr,
}

// Use a shared DA store that we can populate later
mainKV := sync.MutexWrap(datastore.NewMapDatastore())
rktStore := store.New(mainKV)

conf := config.DefaultConfig()
conf.RootDir = t.TempDir()
nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(conf.ConfigPath()))
require.NoError(t, err)
logger := zerolog.Nop()

h, err := mn.AddPeer(nodeKey.PrivKey, nil)
require.NoError(t, err)

p2pClient, err := p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, mainKV, chainId, logger, p2p.NopMetrics(), h)
require.NoError(t, err)

ctx, cancel := context.WithCancel(t.Context())
defer cancel()
require.NoError(t, p2pClient.Start(ctx))
t.Cleanup(func() { _ = p2pClient.Close() })

// Create the sync service - it has no peers to connect to, so it won't be able to sync via P2P
// But it DOES have a DA store (rktStore) that we can populate
svc, err := NewHeaderSyncService(mainKV, rktStore, conf, genesisDoc, p2pClient, logger)
require.NoError(t, err)

// Start the service - this will return without starting the syncer because there are no peers
require.NoError(t, svc.Start(ctx))
t.Cleanup(func() { _ = svc.Stop(context.Background()) })

// Verify the syncer hasn't started yet (no peers to get headers from)
require.False(t, svc.syncerStatus.isStarted(), "syncer should not be started without peers")

// Verify that querying the header store before syncer starts returns an empty store error
_, headErr := svc.Store().Head(ctx)
require.Error(t, headErr, "querying head before syncer starts should return an error")
require.True(t, errors.Is(headErr, header.ErrNotFound) || errors.Is(headErr, header.ErrEmptyStore),
"error should be ErrNotFound or ErrEmptyStore, got: %v", headErr)

// Create the genesis header that we'll add to the DA store
headerConfig := types.HeaderConfig{
Height: genesisDoc.InitialHeight,
DataHash: bytesN(rnd, 32),
AppHash: bytesN(rnd, 32),
Signer: noopSigner,
}
signedHeader, err := types.GetRandomSignedHeaderCustom(&headerConfig, genesisDoc.ChainID)
require.NoError(t, err)
require.NoError(t, signedHeader.Validate())

// Track background retry completion
var retryCompleted atomic.Bool

// Manually trigger the background retry (simulating what happens after 2min timeout in initFromP2PWithRetry)
go func() {
svc.retryInitInBackground()
retryCompleted.Store(true)
}()

// Give the retry a moment to start and fail at least once (no headers available yet)
time.Sleep(100 * time.Millisecond)

// Now add the header to the DA store - the background retry's next attempt should find it
// via the exchangeWrapper's getterByHeight which checks the DA store first
batch, err := rktStore.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(signedHeader, &types.Data{}, &types.Signature{}))
require.NoError(t, batch.SetHeight(signedHeader.Height()))
require.NoError(t, batch.Commit())

// Wait for the background retry to succeed and start the syncer
require.Eventually(t, func() bool {
return svc.syncerStatus.isStarted()
}, 30*time.Second, 100*time.Millisecond, "syncer should eventually start after background retry finds header in DA store")

// Verify the retry goroutine completed successfully
require.Eventually(t, func() bool {
return retryCompleted.Load()
}, 5*time.Second, 100*time.Millisecond, "background retry goroutine should complete")

// Verify the store was initialized with the header
head, err := svc.Store().Head(ctx)
require.NoError(t, err)
require.Equal(t, genesisDoc.InitialHeight, head.Height())
}
Loading