From 45ae980a8f4bcd3580fb21a7fa313100d7c58c2a Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 27 Mar 2026 16:59:35 -0400 Subject: [PATCH 1/2] feat: add monitor task infrastructure for shadow replayer comparison MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a third task category — monitor tasks — that the controller submits once in Running phase and polls each reconcile for completion. When a monitor task reaches a terminal state, the controller sets a Condition and emits a Kubernetes Event. The first (and currently only) monitor task is result-export in comparison mode: the sidecar compares local block execution results against a canonical RPC endpoint and completes when app-hash divergence is detected. Key changes: - MonitorTask struct on SeiNodeStatus with SubmittedAt/CompletedAt timestamps for observability - Rename PlannedTaskStatus → TaskStatus for cross-context clarity - ResultExportComplete Condition with reasons: DivergenceDetected, TaskFailed, TaskLost - Sidecar-lost-task detection: ErrNotFound after successful submission immediately marks Failed (sidecar Submit is synchronous) - Early return on optimistic lock conflicts to avoid wasted API calls - Immediate requeue on terminal state for fast downstream reaction - Remove dead scheduled-mode result-export code (CanonicalRPC is now required on ResultExportConfig) - Bump seictl to v0.0.24 (adds CanonicalRPC to ResultExportTask) --- api/v1alpha1/common_types.go | 19 +- api/v1alpha1/seinode_types.go | 40 +- api/v1alpha1/zz_generated.deepcopy.go | 27 + config/crd/sei.io_seinodegroups.yaml | 11 + config/crd/sei.io_seinodes.yaml | 52 ++ go.mod | 2 +- go.sum | 4 +- internal/controller/node/monitor.go | 169 ++++++ internal/controller/node/monitor_test.go | 563 ++++++++++++++++++ internal/controller/node/plan_execution.go | 37 +- .../node/plan_execution_integration_test.go | 4 +- .../controller/node/plan_execution_test.go | 141 ++++- internal/planner/bootstrap.go | 
25 +- internal/planner/executor.go | 10 +- internal/planner/executor_test.go | 12 +- internal/planner/group.go | 2 +- internal/planner/group_test.go | 2 +- internal/planner/planner.go | 2 +- .../seinode/pacific-1-shadow-replayer.yaml | 3 +- manifests/sei.io_seinodegroups.yaml | 11 + manifests/sei.io_seinodes.yaml | 52 ++ 21 files changed, 1115 insertions(+), 73 deletions(-) create mode 100644 internal/controller/node/monitor.go create mode 100644 internal/controller/node/monitor_test.go diff --git a/api/v1alpha1/common_types.go b/api/v1alpha1/common_types.go index 392e2f2..4cad665 100644 --- a/api/v1alpha1/common_types.go +++ b/api/v1alpha1/common_types.go @@ -120,12 +120,23 @@ type S3SnapshotDestination struct { Region string `json:"region"` } -// ResultExportConfig enables periodic export of block execution results. +// ResultExportConfig enables export of block execution results to S3. // The sidecar queries the local RPC endpoint for block results and uploads // them in compressed NDJSON pages to the platform S3 bucket, keyed by the -// node's chain ID. Its presence on a node spec is sufficient to enable -// export — no additional fields are required. -type ResultExportConfig struct{} +// node's chain ID. +// +// The sidecar additionally compares local results against the canonical +// chain, and the task completes when app-hash divergence is detected +// (monitor mode). CanonicalRPC is required; the previous scheduled +// (cron-based) export mode has been removed. +type ResultExportConfig struct { + // CanonicalRPC is the HTTP RPC endpoint of the canonical chain node + // to compare block execution results against. When set, the sidecar + // runs in comparison mode and the task completes when app-hash + // divergence is detected. + // +kubebuilder:validation:MinLength=1 + CanonicalRPC string `json:"canonicalRpc"` +} // GenesisConfiguration defines where genesis data is sourced. // At most one of PVC or S3 may be set. 
When neither is set and the chain ID diff --git a/api/v1alpha1/seinode_types.go b/api/v1alpha1/seinode_types.go index 01fc315..6fce095 100644 --- a/api/v1alpha1/seinode_types.go +++ b/api/v1alpha1/seinode_types.go @@ -99,14 +99,14 @@ const ( TaskPlanFailed TaskPlanPhase = "Failed" ) -// PlannedTaskStatus represents the state of an individual task within a plan. +// TaskStatus represents the lifecycle state of a task (plan, monitor, etc.). // +kubebuilder:validation:Enum=Pending;Complete;Failed -type PlannedTaskStatus string +type TaskStatus string const ( - PlannedTaskPending PlannedTaskStatus = "Pending" - PlannedTaskComplete PlannedTaskStatus = "Complete" - PlannedTaskFailed PlannedTaskStatus = "Failed" + TaskPending TaskStatus = "Pending" + TaskComplete TaskStatus = "Complete" + TaskFailed TaskStatus = "Failed" ) // PlannedTask is a single task within a TaskPlan. Each task carries its full @@ -121,7 +121,7 @@ type PlannedTask struct { ID string `json:"id"` // Status is the current state of this task. - Status PlannedTaskStatus `json:"status"` + Status TaskStatus `json:"status"` // Params is the opaque JSON payload for this task. Deserialized at // execution time by task.Deserialize into the concrete task type. @@ -165,6 +165,29 @@ const ( PhaseTerminating SeiNodePhase = "Terminating" ) +// MonitorTask tracks a long-running sidecar task that the controller +// actively polls for completion. Unlike ScheduledTasks (fire-and-forget), +// completing a monitor task triggers a controller response (Event + Condition). +// The map key in MonitorTasks serves as the task type identifier. +type MonitorTask struct { + // ID is the sidecar-assigned task UUID. + ID string `json:"id"` + + // Status tracks lifecycle: Pending → Complete or Failed. + Status TaskStatus `json:"status"` + + // SubmittedAt is the time the task was submitted to the sidecar. + SubmittedAt metav1.Time `json:"submittedAt"` + + // CompletedAt is the time the task reached a terminal state. 
+ // +optional + CompletedAt *metav1.Time `json:"completedAt,omitempty"` + + // Error is set when the task fails. + // +optional + Error string `json:"error,omitempty"` +} + // SeiNodeStatus defines the observed state of a SeiNode. type SeiNodeStatus struct { // Phase is the high-level lifecycle state. @@ -184,6 +207,11 @@ type SeiNodeStatus struct { // +optional ScheduledTasks map[string]string `json:"scheduledTasks,omitempty"` + // MonitorTasks tracks long-running sidecar tasks the controller polls + // for completion. Keyed by task type for idempotent submission. + // +optional + MonitorTasks map[string]MonitorTask `json:"monitorTasks,omitempty"` + // ConfigStatus reports the observed configuration state from the sidecar. // +optional ConfigStatus *ConfigStatus `json:"configStatus,omitempty"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index bb16697..a15b8f8 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -410,6 +410,26 @@ func (in *GroupNodeStatus) DeepCopy() *GroupNodeStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MonitorTask) DeepCopyInto(out *MonitorTask) { + *out = *in + in.SubmittedAt.DeepCopyInto(&out.SubmittedAt) + if in.CompletedAt != nil { + in, out := &in.CompletedAt, &out.CompletedAt + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MonitorTask. +func (in *MonitorTask) DeepCopy() *MonitorTask { + if in == nil { + return nil + } + out := new(MonitorTask) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *MonitoringConfig) DeepCopyInto(out *MonitoringConfig) { *out = *in @@ -1023,6 +1043,13 @@ func (in *SeiNodeStatus) DeepCopyInto(out *SeiNodeStatus) { (*out)[key] = val } } + if in.MonitorTasks != nil { + in, out := &in.MonitorTasks, &out.MonitorTasks + *out = make(map[string]MonitorTask, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } if in.ConfigStatus != nil { in, out := &in.ConfigStatus, &out.ConfigStatus *out = new(ConfigStatus) diff --git a/config/crd/sei.io_seinodegroups.yaml b/config/crd/sei.io_seinodegroups.yaml index 07ff72f..3ac6da8 100644 --- a/config/crd/sei.io_seinodegroups.yaml +++ b/config/crd/sei.io_seinodegroups.yaml @@ -715,6 +715,17 @@ spec: The sidecar queries the local RPC for block_results and uploads compressed NDJSON pages on a schedule. Useful for shadow replayers that need their execution results compared against the canonical chain. + properties: + canonicalRpc: + description: |- + CanonicalRPC is the HTTP RPC endpoint of the canonical chain node + to compare block execution results against. When set, the sidecar + runs in comparison mode and the task completes when app-hash + divergence is detected. + minLength: 1 + type: string + required: + - canonicalRpc type: object snapshot: description: Snapshot identifies the snapshot to restore diff --git a/config/crd/sei.io_seinodes.yaml b/config/crd/sei.io_seinodes.yaml index e67e407..c3df7eb 100644 --- a/config/crd/sei.io_seinodes.yaml +++ b/config/crd/sei.io_seinodes.yaml @@ -427,6 +427,17 @@ spec: The sidecar queries the local RPC for block_results and uploads compressed NDJSON pages on a schedule. Useful for shadow replayers that need their execution results compared against the canonical chain. + properties: + canonicalRpc: + description: |- + CanonicalRPC is the HTTP RPC endpoint of the canonical chain node + to compare block execution results against. 
When set, the sidecar + runs in comparison mode and the task completes when app-hash + divergence is detected. + minLength: 1 + type: string + required: + - canonicalRpc type: object snapshot: description: Snapshot identifies the snapshot to restore from @@ -898,6 +909,47 @@ spec: - phase - tasks type: object + monitorTasks: + additionalProperties: + description: |- + MonitorTask tracks a long-running sidecar task that the controller + actively polls for completion. Unlike ScheduledTasks (fire-and-forget), + completing a monitor task triggers a controller response (Event + Condition). + The map key in MonitorTasks serves as the task type identifier. + properties: + completedAt: + description: CompletedAt is the time the task reached a terminal + state. + format: date-time + type: string + error: + description: Error is set when the task fails. + type: string + id: + description: ID is the sidecar-assigned task UUID. + type: string + status: + description: 'Status tracks lifecycle: Pending → Complete or + Failed.' + enum: + - Pending + - Complete + - Failed + type: string + submittedAt: + description: SubmittedAt is the time the task was submitted + to the sidecar. + format: date-time + type: string + required: + - id + - status + - submittedAt + type: object + description: |- + MonitorTasks tracks long-running sidecar tasks the controller polls + for completion. Keyed by task type for idempotent submission. + type: object phase: description: Phase is the high-level lifecycle state. 
enum: diff --git a/go.mod b/go.mod index 3dac93c..9f2aca0 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/onsi/gomega v1.38.2 github.com/prometheus/client_golang v1.23.2 github.com/sei-protocol/sei-config v0.0.9-0.20260327015454-7cf35ff77daa - github.com/sei-protocol/seictl v0.0.23 + github.com/sei-protocol/seictl v0.0.24 k8s.io/api v0.35.0 k8s.io/apiextensions-apiserver v0.35.0 k8s.io/apimachinery v0.35.0 diff --git a/go.sum b/go.sum index 704130e..2c2d3b3 100644 --- a/go.sum +++ b/go.sum @@ -166,8 +166,8 @@ github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sei-protocol/sei-config v0.0.9-0.20260327015454-7cf35ff77daa h1:bi0qHl2E8TxpOpBZZJmMl7eZc04sMJe5E11nC+uF7IM= github.com/sei-protocol/sei-config v0.0.9-0.20260327015454-7cf35ff77daa/go.mod h1:IEAv5ynYw8Gu2F2qNfE4MQR0PPihAT6g7RWLpWdw5O0= -github.com/sei-protocol/seictl v0.0.23 h1:bDxzMeys7bkdfcHi2WFtMa/zVUxmu7w3yqYu1eCfsKY= -github.com/sei-protocol/seictl v0.0.23/go.mod h1:Q1YlXp1fUnJGLq5l1ORgpNsKNv+w/BaLBqDdv0pj/a0= +github.com/sei-protocol/seictl v0.0.24 h1:VmNg5A5tGhB/Z8Q0VN2MKsjodkSyqe69O+8pE3mhDrw= +github.com/sei-protocol/seictl v0.0.24/go.mod h1:0iI6V8BMOvchjjGSv1TQTUVxN4noUvIUdERmoegoavE= github.com/sei-protocol/seilog v0.0.3 h1:Zi7oWXdX5jv92dY8n482xH032LtNebC89Y+qYZlBn0Y= github.com/sei-protocol/seilog v0.0.3/go.mod h1:CKg58wraWnB3gRxWQ0v1rIVr0gmDHjkfP1bM2giKFFU= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= diff --git a/internal/controller/node/monitor.go b/internal/controller/node/monitor.go new file mode 100644 index 0000000..0fec973 --- /dev/null +++ b/internal/controller/node/monitor.go @@ -0,0 +1,169 @@ +package node + +import ( + "context" + "errors" + "fmt" + + "github.com/google/uuid" + sidecar "github.com/sei-protocol/seictl/sidecar/client" + corev1 "k8s.io/api/core/v1" + 
"k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1" + "github.com/sei-protocol/sei-k8s-controller/internal/planner" + "github.com/sei-protocol/sei-k8s-controller/internal/task" +) + +const ( + ConditionResultExportComplete = "ResultExportComplete" + ReasonDivergenceDetected = "DivergenceDetected" + ReasonTaskFailed = "TaskFailed" + ReasonTaskLost = "TaskLost" +) + +// ensureMonitorTasks submits all applicable monitor tasks for a node based +// on its spec. Each task is submitted exactly once (idempotent). Errors do +// not short-circuit — all applicable tasks are attempted. +func (r *SeiNodeReconciler) ensureMonitorTasks(ctx context.Context, node *seiv1alpha1.SeiNode, sc task.SidecarClient) error { + var firstErr error + + if req := planner.ResultExportMonitorTask(node); req != nil { + if err := r.ensureMonitorTask(ctx, node, sc, *req); err != nil { + firstErr = err + } + } + + return firstErr +} + +// ensureMonitorTask submits a monitor task exactly once, tracking it in +// node.Status.MonitorTasks. Idempotency: skips if the task type already exists. 
+func (r *SeiNodeReconciler) ensureMonitorTask(ctx context.Context, node *seiv1alpha1.SeiNode, sc task.SidecarClient, req sidecar.TaskRequest) error { + if node.Status.MonitorTasks != nil { + if _, ok := node.Status.MonitorTasks[req.Type]; ok { + return nil + } + } + + id, err := sc.SubmitTask(ctx, req) + if err != nil { + return fmt.Errorf("submitting monitor task %s: %w", req.Type, err) + } + + now := metav1.Now() + patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{}) + if node.Status.MonitorTasks == nil { + node.Status.MonitorTasks = make(map[string]seiv1alpha1.MonitorTask) + } + node.Status.MonitorTasks[req.Type] = seiv1alpha1.MonitorTask{ + ID: id.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: now, + } + return r.Status().Patch(ctx, node, patch) +} + +// pollMonitorTasks checks each pending monitor task via the sidecar API. +// When a task completes or fails, it patches the status, sets a Condition, +// and emits a Kubernetes Event. Returns true when a terminal state was +// observed, signaling the caller to requeue immediately. 
+func (r *SeiNodeReconciler) pollMonitorTasks(ctx context.Context, node *seiv1alpha1.SeiNode, sc task.SidecarClient) (bool, error) { + if len(node.Status.MonitorTasks) == 0 { + return false, nil + } + + logger := log.FromContext(ctx) + var patched bool + var terminal bool + + patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{}) + + for key, mt := range node.Status.MonitorTasks { + if mt.Status != seiv1alpha1.TaskPending { + continue + } + + taskID, parseErr := uuid.Parse(mt.ID) + if parseErr != nil { + logger.Info("invalid monitor task UUID, marking failed", "task", key, "id", mt.ID) + r.failMonitorTask(node, key, mt, fmt.Sprintf("invalid task UUID: %s", mt.ID), ReasonTaskFailed) + patched = true + terminal = true + continue + } + + result, err := sc.GetTask(ctx, taskID) + if err != nil { + if errors.Is(err, sidecar.ErrNotFound) { + // The sidecar inserts a task into its active map synchronously + // before returning from Submit. ErrNotFound after a successful + // submission means the sidecar process restarted and lost the task. 
+ logger.Info("monitor task lost by sidecar", "task", key, "submittedAt", mt.SubmittedAt.Time) + r.failMonitorTask(node, key, mt, "sidecar lost task (not found after successful submission)", ReasonTaskLost) + patched = true + terminal = true + continue + } + logger.Info("monitor task poll error, will retry", "task", key, "error", err) + continue + } + + switch result.Status { + case sidecar.Completed: + now := metav1.Now() + mt.Status = seiv1alpha1.TaskComplete + mt.CompletedAt = &now + node.Status.MonitorTasks[key] = mt + patched = true + terminal = true + + meta.SetStatusCondition(&node.Status.Conditions, metav1.Condition{ + Type: ConditionResultExportComplete, + Status: metav1.ConditionTrue, + Reason: ReasonDivergenceDetected, + Message: fmt.Sprintf("Monitor task %s completed — app-hash divergence detected", key), + ObservedGeneration: node.Generation, + }) + r.Recorder.Eventf(node, corev1.EventTypeNormal, "MonitorTaskCompleted", + "Monitor task %s completed", key) + logger.Info("monitor task completed", "task", key) + + case sidecar.Failed: + errMsg := "task failed with unknown error" + if result.Error != nil && *result.Error != "" { + errMsg = *result.Error + } + r.failMonitorTask(node, key, mt, errMsg, ReasonTaskFailed) + patched = true + terminal = true + } + } + + if patched { + return terminal, r.Status().Patch(ctx, node, patch) + } + return false, nil +} + +// failMonitorTask marks a monitor task as failed, sets the Condition, and emits an Event. 
+func (r *SeiNodeReconciler) failMonitorTask(node *seiv1alpha1.SeiNode, key string, mt seiv1alpha1.MonitorTask, errMsg, reason string) { + now := metav1.Now() + mt.Status = seiv1alpha1.TaskFailed + mt.CompletedAt = &now + mt.Error = errMsg + node.Status.MonitorTasks[key] = mt + + meta.SetStatusCondition(&node.Status.Conditions, metav1.Condition{ + Type: ConditionResultExportComplete, + Status: metav1.ConditionFalse, + Reason: reason, + Message: fmt.Sprintf("Monitor task %s failed: %s", key, errMsg), + ObservedGeneration: node.Generation, + }) + r.Recorder.Eventf(node, corev1.EventTypeWarning, "MonitorTaskFailed", + "Monitor task %s failed: %s", key, errMsg) +} diff --git a/internal/controller/node/monitor_test.go b/internal/controller/node/monitor_test.go new file mode 100644 index 0000000..54c760c --- /dev/null +++ b/internal/controller/node/monitor_test.go @@ -0,0 +1,563 @@ +package node + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + sidecar "github.com/sei-protocol/seictl/sidecar/client" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1" + "github.com/sei-protocol/sei-k8s-controller/internal/planner" +) + +// monitorReplayerNode returns a replayer node with canonicalRpc set, +// placing result-export in monitor (comparison) mode. 
+func monitorReplayerNode() *seiv1alpha1.SeiNode { + node := replayerNode() + node.Spec.Replayer.ResultExport = &seiv1alpha1.ResultExportConfig{ + CanonicalRPC: "http://canonical-rpc:26657", + } + return node +} + +// --- ensureMonitorTask tests --- + +func TestEnsureMonitorTask_SubmitsOnce(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{submitID: taskID} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + + r, c := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + req := planner.ResultExportMonitorTask(node) + if req == nil { + t.Fatal("expected non-nil monitor task request") + } + + if err := r.ensureMonitorTask(ctx, node, mock, *req); err != nil { + t.Fatalf("ensureMonitorTask: %v", err) + } + + if len(mock.submitted) != 1 { + t.Fatalf("expected 1 submission, got %d", len(mock.submitted)) + } + if mock.submitted[0].Type != planner.TaskResultExport { + t.Errorf("submitted type = %q, want %q", mock.submitted[0].Type, planner.TaskResultExport) + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + if updated.Status.MonitorTasks == nil { + t.Fatal("expected MonitorTasks to be set") + } + mt, ok := updated.Status.MonitorTasks[planner.TaskResultExport] + if !ok { + t.Fatalf("expected MonitorTasks[%s] to exist", planner.TaskResultExport) + } + if mt.ID != taskID.String() { + t.Errorf("MonitorTasks ID = %q, want %q", mt.ID, taskID.String()) + } + if mt.Status != seiv1alpha1.TaskPending { + t.Errorf("MonitorTasks status = %q, want Pending", mt.Status) + } + if mt.SubmittedAt.IsZero() { + t.Error("expected SubmittedAt to be set") + } +} + +func TestEnsureMonitorTask_Idempotent(t *testing.T) { + existingID := uuid.New() + mock := &mockSidecarClient{} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} 
+ node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: existingID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, _ := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + req := planner.ResultExportMonitorTask(node) + if err := r.ensureMonitorTask(ctx, node, mock, *req); err != nil { + t.Fatalf("ensureMonitorTask: %v", err) + } + + if len(mock.submitted) != 0 { + t.Errorf("expected no submissions for existing task, got %d", len(mock.submitted)) + } +} + +// --- pollMonitorTasks tests --- + +func TestPollMonitorTasks_Completed(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{ + taskResults: map[uuid.UUID]*sidecar.TaskResult{ + taskID: completedResult(taskID, planner.TaskResultExport, nil), + }, + } + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, c := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + requeue, err := r.pollMonitorTasks(ctx, node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if !requeue { + t.Error("expected requeue=true on task completion") + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + mt := updated.Status.MonitorTasks[planner.TaskResultExport] + if mt.Status != seiv1alpha1.TaskComplete { + t.Errorf("monitor task status = %q, want Complete", mt.Status) + } + if mt.Error != "" { + t.Errorf("monitor task error = %q, want empty", mt.Error) + } + if mt.CompletedAt == nil { + t.Error("expected CompletedAt to be set") + } + + cond := meta.FindStatusCondition(updated.Status.Conditions, ConditionResultExportComplete) + if cond == nil { + t.Fatal("expected 
ResultExportComplete condition") + } + if cond.Status != metav1.ConditionTrue { + t.Errorf("condition status = %q, want True", cond.Status) + } + if cond.Reason != ReasonDivergenceDetected { + t.Errorf("condition reason = %q, want %q", cond.Reason, ReasonDivergenceDetected) + } +} + +func TestPollMonitorTasks_Failed(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{ + taskResults: map[uuid.UUID]*sidecar.TaskResult{ + taskID: completedResult(taskID, planner.TaskResultExport, strPtr("apphash mismatch")), + }, + } + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, c := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + requeue, err := r.pollMonitorTasks(ctx, node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if !requeue { + t.Error("expected requeue=true on task failure") + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + mt := updated.Status.MonitorTasks[planner.TaskResultExport] + if mt.Status != seiv1alpha1.TaskFailed { + t.Errorf("monitor task status = %q, want Failed", mt.Status) + } + if mt.Error != "apphash mismatch" { + t.Errorf("monitor task error = %q, want %q", mt.Error, "apphash mismatch") + } + if mt.CompletedAt == nil { + t.Error("expected CompletedAt to be set") + } + + cond := meta.FindStatusCondition(updated.Status.Conditions, ConditionResultExportComplete) + if cond == nil { + t.Fatal("expected ResultExportComplete condition") + } + if cond.Status != metav1.ConditionFalse { + t.Errorf("condition status = %q, want False", cond.Status) + } + if cond.Reason != ReasonTaskFailed { + t.Errorf("condition reason = %q, want %q", cond.Reason, ReasonTaskFailed) + } +} + +func 
TestPollMonitorTasks_StillRunning(t *testing.T) { + taskID := uuid.New() + now := time.Now() + mock := &mockSidecarClient{ + taskResults: map[uuid.UUID]*sidecar.TaskResult{ + taskID: { + Id: taskID, + Type: planner.TaskResultExport, + Status: sidecar.Running, + SubmittedAt: now.Add(-10 * time.Second), + }, + }, + } + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, c := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + requeue, err := r.pollMonitorTasks(ctx, node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if requeue { + t.Error("expected requeue=false for still-running task") + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + mt := updated.Status.MonitorTasks[planner.TaskResultExport] + if mt.Status != seiv1alpha1.TaskPending { + t.Errorf("monitor task status = %q, want Pending", mt.Status) + } +} + +func TestPollMonitorTasks_SkipsCompletedTasks(t *testing.T) { + taskID := uuid.New() + now := metav1.Now() + mock := &mockSidecarClient{} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskComplete, + SubmittedAt: metav1.NewTime(now.Add(-10 * time.Minute)), + CompletedAt: &now, + }, + } + + r, _ := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + requeue, err := r.pollMonitorTasks(ctx, node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if requeue { + t.Error("expected requeue=false for 
already-completed task") + } +} + +func TestPollMonitorTasks_SidecarLostTask(t *testing.T) { + taskID := uuid.New() + // Empty mock returns ErrNotFound for any GetTask call, simulating a + // sidecar restart that lost the task's in-memory state. + mock := &mockSidecarClient{} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, c := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + requeue, err := r.pollMonitorTasks(ctx, node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if !requeue { + t.Error("expected requeue=true when sidecar lost task") + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + mt := updated.Status.MonitorTasks[planner.TaskResultExport] + if mt.Status != seiv1alpha1.TaskFailed { + t.Errorf("monitor task status = %q, want Failed", mt.Status) + } + if mt.Error == "" { + t.Error("expected non-empty error for lost task") + } + + cond := meta.FindStatusCondition(updated.Status.Conditions, ConditionResultExportComplete) + if cond == nil { + t.Fatal("expected ResultExportComplete condition") + } + if cond.Reason != ReasonTaskLost { + t.Errorf("condition reason = %q, want %q", cond.Reason, ReasonTaskLost) + } +} + +// --- ensureMonitorTask error handling --- + +func TestEnsureMonitorTask_SubmitError(t *testing.T) { + mock := &mockSidecarClient{submitErr: fmt.Errorf("connection refused")} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + + r, c := newProgressionReconciler(t, mock, node) + ctx := context.Background() + + req := planner.ResultExportMonitorTask(node) + err := 
r.ensureMonitorTask(ctx, node, mock, *req) + if err == nil { + t.Fatal("expected error from failed submit") + } + + // MonitorTasks should NOT be set — the submit failed before patching. + updated := fetchNode(t, c, node.Name, node.Namespace) + if updated.Status.MonitorTasks != nil { + t.Errorf("expected nil MonitorTasks after submit failure, got %v", updated.Status.MonitorTasks) + } +} + +// --- pollMonitorTasks edge cases --- + +func TestPollMonitorTasks_EmptyMap(t *testing.T) { + mock := &mockSidecarClient{} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + + r, _ := newProgressionReconciler(t, mock, node) + + requeue, err := r.pollMonitorTasks(context.Background(), node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if requeue { + t.Error("expected requeue=false for empty MonitorTasks") + } +} + +func TestPollMonitorTasks_InvalidUUID(t *testing.T) { + mock := &mockSidecarClient{} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: "not-a-uuid", + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, c := newProgressionReconciler(t, mock, node) + + requeue, err := r.pollMonitorTasks(context.Background(), node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if !requeue { + t.Error("expected requeue=true for invalid UUID failure") + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + mt := updated.Status.MonitorTasks[planner.TaskResultExport] + if mt.Status != seiv1alpha1.TaskFailed { + t.Errorf("status = %q, want Failed", mt.Status) + } + if mt.CompletedAt == nil { + t.Error("expected CompletedAt to be set") + } + + cond := 
meta.FindStatusCondition(updated.Status.Conditions, ConditionResultExportComplete) + if cond == nil { + t.Fatal("expected ResultExportComplete condition") + } + if cond.Reason != ReasonTaskFailed { + t.Errorf("condition reason = %q, want %q", cond.Reason, ReasonTaskFailed) + } +} + +func TestPollMonitorTasks_TransientGetTaskError(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{ + getTaskErr: fmt.Errorf("connection timeout"), + } + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, c := newProgressionReconciler(t, mock, node) + + requeue, err := r.pollMonitorTasks(context.Background(), node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if requeue { + t.Error("expected requeue=false for transient error (will retry next reconcile)") + } + + // Task should remain Pending — transient errors don't change status. 
+ updated := fetchNode(t, c, node.Name, node.Namespace) + mt := updated.Status.MonitorTasks[planner.TaskResultExport] + if mt.Status != seiv1alpha1.TaskPending { + t.Errorf("status = %q, want Pending", mt.Status) + } +} + +func TestPollMonitorTasks_FailedWithUnknownError(t *testing.T) { + taskID := uuid.New() + now := time.Now() + mock := &mockSidecarClient{ + taskResults: map[uuid.UUID]*sidecar.TaskResult{ + taskID: { + Id: taskID, + Type: planner.TaskResultExport, + Status: sidecar.Failed, + SubmittedAt: now.Add(-10 * time.Second), + CompletedAt: &now, + Error: nil, + }, + }, + } + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, c := newProgressionReconciler(t, mock, node) + + requeue, err := r.pollMonitorTasks(context.Background(), node, mock) + if err != nil { + t.Fatalf("pollMonitorTasks: %v", err) + } + if !requeue { + t.Error("expected requeue=true on task failure") + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + mt := updated.Status.MonitorTasks[planner.TaskResultExport] + if mt.Status != seiv1alpha1.TaskFailed { + t.Errorf("status = %q, want Failed", mt.Status) + } + if mt.Error != "task failed with unknown error" { + t.Errorf("error = %q, want %q", mt.Error, "task failed with unknown error") + } +} + +// --- Reconcile integration tests --- + +func TestReconcileRunning_MonitorMode_SubmitsMonitorTask(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{submitID: taskID} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + + r, c := newProgressionReconciler(t, mock, node) + + _, err := 
r.reconcileRunning(context.Background(), node) + if err != nil { + t.Fatalf("reconcileRunning: %v", err) + } + + if len(mock.submitted) != 1 { + t.Fatalf("expected 1 submission, got %d", len(mock.submitted)) + } + if mock.submitted[0].Type != planner.TaskResultExport { + t.Errorf("submitted type = %q, want %q", mock.submitted[0].Type, planner.TaskResultExport) + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + if updated.Status.MonitorTasks == nil { + t.Fatal("expected MonitorTasks to be set") + } + if _, ok := updated.Status.MonitorTasks[planner.TaskResultExport]; !ok { + t.Errorf("expected MonitorTasks[%s] to exist", planner.TaskResultExport) + } + if updated.Status.ScheduledTasks != nil { + if _, ok := updated.Status.ScheduledTasks[planner.TaskResultExport]; ok { + t.Error("result-export should not be in ScheduledTasks") + } + } +} + +// --- Planner builder tests --- + +func TestResultExportMonitorTask_ReturnsRequest(t *testing.T) { + node := monitorReplayerNode() + req := planner.ResultExportMonitorTask(node) + if req == nil { + t.Fatal("expected non-nil TaskRequest") + } + if req.Type != planner.TaskResultExport { + t.Errorf("Type = %q, want %q", req.Type, planner.TaskResultExport) + } + if req.Params == nil { + t.Fatal("expected non-nil params") + } + params := *req.Params + if params["canonicalRpc"] != "http://canonical-rpc:26657" { + t.Errorf("canonicalRpc = %v, want %q", params["canonicalRpc"], "http://canonical-rpc:26657") + } + if _, ok := params["bucket"]; !ok { + t.Error("expected bucket param") + } +} + +func TestResultExportMonitorTask_NilWithoutResultExport(t *testing.T) { + node := replayerNode() + req := planner.ResultExportMonitorTask(node) + if req != nil { + t.Errorf("expected nil, got %v", req) + } +} + +func TestResultExportMonitorTask_NilForNonReplayer(t *testing.T) { + node := snapshotNode() + req := planner.ResultExportMonitorTask(node) + if req != nil { + t.Errorf("expected nil, got %v", req) + } +} diff --git 
a/internal/controller/node/plan_execution.go b/internal/controller/node/plan_execution.go index 8a846ce..7bad696 100644 --- a/internal/controller/node/plan_execution.go +++ b/internal/controller/node/plan_execution.go @@ -4,6 +4,7 @@ import ( "context" sidecar "github.com/sei-protocol/seictl/sidecar/client" + apierrors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -26,21 +27,43 @@ func (r *SeiNodeReconciler) buildSidecarClient(node *seiv1alpha1.SeiNode) task.S return c } -// reconcileRuntimeTasks ensures all scheduled tasks are submitted exactly -// once. The sidecar owns execution cadence after that. +// reconcileRuntimeTasks ensures all scheduled and monitor tasks are submitted +// exactly once, and polls monitor tasks for completion on each reconcile. func (r *SeiNodeReconciler) reconcileRuntimeTasks(ctx context.Context, node *seiv1alpha1.SeiNode, sc task.SidecarClient) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Snapshot upload: always a scheduled (cron) task. if builder := planner.SnapshotUploadScheduledTask(node); builder != nil { req := builder.ToTaskRequest() if err := r.ensureScheduledTask(ctx, node, sc, req); err != nil { - log.FromContext(ctx).Info("scheduled task submission failed, will retry", "task", req.Type, "error", err) + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } + logger.Info("scheduled task submission failed, will retry", "task", req.Type, "error", err) } } - if builder := planner.ResultExportScheduledTask(node); builder != nil { - req := builder.ToTaskRequest() - if err := r.ensureScheduledTask(ctx, node, sc, req); err != nil { - log.FromContext(ctx).Info("scheduled task submission failed, will retry", "task", req.Type, "error", err) + + // Result export: monitor task that compares block results against + // the canonical chain and completes on app-hash divergence. 
+ if err := r.ensureMonitorTasks(ctx, node, sc); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } + logger.Info("monitor task submission failed, will retry", "error", err) + } + + // Poll all active monitor tasks for completion. + requeue, err := r.pollMonitorTasks(ctx, node, sc) + if err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil } + logger.Info("monitor task poll failed, will retry", "error", err) } + if requeue { + return ctrl.Result{Requeue: true}, nil + } + return ctrl.Result{RequeueAfter: statusPollInterval}, nil } diff --git a/internal/controller/node/plan_execution_integration_test.go b/internal/controller/node/plan_execution_integration_test.go index f7b35ea..9fdc6d0 100644 --- a/internal/controller/node/plan_execution_integration_test.go +++ b/internal/controller/node/plan_execution_integration_test.go @@ -42,7 +42,7 @@ func driveTask( // Check if already complete (fire-and-forget tasks complete in one call) node = fetch() for _, pt := range node.Status.InitPlan.Tasks { - if pt.ID == taskUUID.String() && pt.Status == seiv1alpha1.PlannedTaskComplete { + if pt.ID == taskUUID.String() && pt.Status == seiv1alpha1.TaskComplete { return } } @@ -188,7 +188,7 @@ func TestIntegrationTaskFailure_FailsPlan(t *testing.T) { updated = fetch() g.Expect(updated.Status.InitPlan.Phase).To(Equal(seiv1alpha1.TaskPlanFailed)) - g.Expect(updated.Status.InitPlan.Tasks[0].Status).To(Equal(seiv1alpha1.PlannedTaskFailed)) + g.Expect(updated.Status.InitPlan.Tasks[0].Status).To(Equal(seiv1alpha1.TaskFailed)) g.Expect(updated.Status.InitPlan.Tasks[0].Error).To(Equal("S3 access denied")) mock.submitted = nil diff --git a/internal/controller/node/plan_execution_test.go b/internal/controller/node/plan_execution_test.go index 0cce672..3201158 100644 --- a/internal/controller/node/plan_execution_test.go +++ b/internal/controller/node/plan_execution_test.go @@ -345,7 +345,7 @@ func 
TestBuildPlanPhaseAndTasks(t *testing.T) { t.Fatalf("expected 6 tasks, got %d: %v", len(plan.Tasks), taskTypes(plan)) } for _, pt := range plan.Tasks { - if pt.Status != seiv1alpha1.PlannedTaskPending { + if pt.Status != seiv1alpha1.TaskPending { t.Errorf("task %q status = %q, want Pending", pt.Type, pt.Status) } if pt.ID == "" { @@ -497,7 +497,7 @@ func TestReconcile_SubmitsFirstPendingTask(t *testing.T) { updated := fetchNode(t, c, node.Name, node.Namespace) firstTask := updated.Status.InitPlan.Tasks[0] - if firstTask.Status != seiv1alpha1.PlannedTaskPending && firstTask.Status != seiv1alpha1.PlannedTaskComplete { + if firstTask.Status != seiv1alpha1.TaskPending && firstTask.Status != seiv1alpha1.TaskComplete { t.Logf("task status = %q (submit succeeded, status depends on mock GetTask)", firstTask.Status) } } @@ -508,7 +508,7 @@ func TestReconcile_AllTasksComplete_MarksPlanComplete(t *testing.T) { p, _ := planner.ForNode(node, testSnapshotRegion) node.Status.InitPlan = mustBuildPlan(t, p, node) for i := range node.Status.InitPlan.Tasks { - node.Status.InitPlan.Tasks[i].Status = seiv1alpha1.PlannedTaskComplete + node.Status.InitPlan.Tasks[i].Status = seiv1alpha1.TaskComplete } r, c := newProgressionReconciler(t, mock, node) @@ -621,7 +621,7 @@ func TestReconcile_SubmitError_RequeuesGracefully(t *testing.T) { t.Errorf("RequeueAfter = %v, want %v", result.RequeueAfter, planner.TaskPollInterval) } updated := fetchNode(t, c, node.Name, node.Namespace) - if updated.Status.InitPlan.Tasks[0].Status != seiv1alpha1.PlannedTaskPending { + if updated.Status.InitPlan.Tasks[0].Status != seiv1alpha1.TaskPending { t.Errorf("task status = %q, want Pending after submit failure", updated.Status.InitPlan.Tasks[0].Status) } } @@ -796,23 +796,22 @@ func TestReconcileInitializing_PlanFailed_TransitionsToFailed(t *testing.T) { // --- Result export tests --- -func TestResultExportScheduledTask_ReplayerWithExport(t *testing.T) { - node := replayerNode() - node.Spec.Replayer.ResultExport = 
&seiv1alpha1.ResultExportConfig{} - builder := planner.ResultExportScheduledTask(node) - if builder == nil { - t.Fatal("expected non-nil builder") +func TestResultExportMonitorTask_ReplayerWithExport(t *testing.T) { + node := monitorReplayerNode() + req := planner.ResultExportMonitorTask(node) + if req == nil { + t.Fatal("expected non-nil TaskRequest") } - if builder.TaskType() != planner.TaskResultExport { - t.Errorf("TaskType() = %q, want %q", builder.TaskType(), planner.TaskResultExport) + if req.Type != planner.TaskResultExport { + t.Errorf("Type = %q, want %q", req.Type, planner.TaskResultExport) } } -func TestResultExportScheduledTask_ReplayerWithoutExport(t *testing.T) { +func TestResultExportMonitorTask_ReplayerWithoutExport(t *testing.T) { node := replayerNode() - builder := planner.ResultExportScheduledTask(node) - if builder != nil { - t.Errorf("expected nil builder, got %v", builder) + req := planner.ResultExportMonitorTask(node) + if req != nil { + t.Errorf("expected nil TaskRequest, got %v", req) } } @@ -851,7 +850,7 @@ func TestReconcileInitializing_SidecarClientError_Requeues(t *testing.T) { node.Status.InitPlan = &seiv1alpha1.TaskPlan{ Phase: seiv1alpha1.TaskPlanActive, Tasks: []seiv1alpha1.PlannedTask{ - {Type: planner.TaskConfigApply, ID: "test-id", Status: seiv1alpha1.PlannedTaskPending}, + {Type: planner.TaskConfigApply, ID: "test-id", Status: seiv1alpha1.TaskPending}, }, } @@ -990,11 +989,37 @@ func TestSidecarURLForNode(t *testing.T) { // --- Reconcile replayer runtime task tests --- -func TestReconcile_CompletePlan_SubmitsResultExportForReplayer(t *testing.T) { +func TestReconcile_ReplayerWithoutResultExport_NoMonitorTask(t *testing.T) { + mock := &mockSidecarClient{} + node := replayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + + r, c := newProgressionReconciler(t, mock, node) + + _, err := r.reconcileRunning(context.Background(), node) + if err 
!= nil { + t.Fatalf("error = %v", err) + } + if len(mock.submitted) != 0 { + t.Errorf("expected no submissions for replayer without ResultExport, got %d", len(mock.submitted)) + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + if updated.Status.MonitorTasks != nil { + t.Errorf("expected nil MonitorTasks, got %v", updated.Status.MonitorTasks) + } +} + +func TestReconcile_SnapshotterWithMonitorTask_BothSubmitted(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{submitID: taskID} + + // monitorReplayerNode() is a replayer with ResultExport configured: it + // qualifies for the result-export monitor task but NOT snapshot upload + // (archive-only), so exactly one submission is expected below. + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + + r, c := newProgressionReconciler(t, mock, node) + + _, err := r.reconcileRunning(context.Background(), node) @@ -1003,18 +1028,84 @@ if err != nil { t.Fatalf("error = %v", err) } + + // The monitor replayer node only has result-export (no snapshot upload + // since it's a replayer, not an archive). Verify the single submission. 
if len(mock.submitted) != 1 { - t.Fatalf("expected 1 scheduled task submitted, got %d", len(mock.submitted)) + t.Fatalf("expected 1 submission, got %d", len(mock.submitted)) + } + if mock.submitted[0].Type != planner.TaskResultExport { + t.Errorf("submitted type = %q, want %q", mock.submitted[0].Type, planner.TaskResultExport) + } + + updated := fetchNode(t, c, node.Name, node.Namespace) + if _, ok := updated.Status.MonitorTasks[planner.TaskResultExport]; !ok { + t.Errorf("expected MonitorTasks[%s] to exist", planner.TaskResultExport) + } +} + +func TestReconcileRunning_PollRequeue_ImmediateRequeue(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{ + submitID: taskID, + taskResults: map[uuid.UUID]*sidecar.TaskResult{ + taskID: completedResult(taskID, planner.TaskResultExport, nil), + }, + } + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + node.Status.MonitorTasks = map[string]seiv1alpha1.MonitorTask{ + planner.TaskResultExport: { + ID: taskID.String(), + Status: seiv1alpha1.TaskPending, + SubmittedAt: metav1.Now(), + }, + } + + r, _ := newProgressionReconciler(t, mock, node) + + result, err := r.reconcileRunning(context.Background(), node) + if err != nil { + t.Fatalf("error = %v", err) + } + if !result.Requeue { + t.Error("expected immediate Requeue when poll detects terminal state") + } + if result.RequeueAfter != 0 { + t.Errorf("expected RequeueAfter=0 for immediate requeue, got %v", result.RequeueAfter) + } +} + +func TestReconcile_CompletePlan_SubmitsResultExportMonitorForReplayer(t *testing.T) { + taskID := uuid.New() + mock := &mockSidecarClient{submitID: taskID} + node := monitorReplayerNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.InitPlan = &seiv1alpha1.TaskPlan{Phase: seiv1alpha1.TaskPlanComplete} + + r, c := newProgressionReconciler(t, mock, node) + + _, err := 
r.reconcileRunning(context.Background(), node) + if err != nil { + t.Fatalf("error = %v", err) + } + if len(mock.submitted) != 1 { + t.Fatalf("expected 1 monitor task submitted, got %d", len(mock.submitted)) } if mock.submitted[0].Type != planner.TaskResultExport { t.Errorf("task type = %q, want %q", mock.submitted[0].Type, planner.TaskResultExport) } updated := fetchNode(t, c, node.Name, node.Namespace) - if updated.Status.ScheduledTasks == nil { - t.Fatal("expected ScheduledTasks to be set") + if updated.Status.MonitorTasks == nil { + t.Fatal("expected MonitorTasks to be set") + } + mt, ok := updated.Status.MonitorTasks[planner.TaskResultExport] + if !ok { + t.Fatalf("expected MonitorTasks[%s] to exist", planner.TaskResultExport) } - if got := updated.Status.ScheduledTasks[planner.TaskResultExport]; got != taskID.String() { - t.Errorf("ScheduledTasks[%s] = %q, want %q", planner.TaskResultExport, got, taskID.String()) + if mt.ID != taskID.String() { + t.Errorf("MonitorTasks[%s].ID = %q, want %q", planner.TaskResultExport, mt.ID, taskID.String()) } } diff --git a/internal/planner/bootstrap.go b/internal/planner/bootstrap.go index 4ee7ca2..06c9ef8 100644 --- a/internal/planner/bootstrap.go +++ b/internal/planner/bootstrap.go @@ -12,7 +12,6 @@ import ( const ( defaultSnapshotUploadCron = "0 0 * * *" - defaultResultExportCron = "*/10 * * * *" resultExportBucket = "sei-node-mvp" resultExportRegion = "eu-central-1" resultExportPrefix = "shadow-results/" @@ -126,7 +125,7 @@ func IsBootstrapComplete(plan *seiv1alpha1.TaskPlan) bool { } for _, t := range plan.Tasks { if t.Type == task.TaskTypeTeardownBootstrap { - return t.Status == seiv1alpha1.PlannedTaskComplete + return t.Status == seiv1alpha1.TaskComplete } } return true @@ -247,16 +246,20 @@ func SnapshotUploadScheduledTask(node *seiv1alpha1.SeiNode) sidecar.TaskBuilder } } -// ResultExportScheduledTask returns a result-export task builder if applicable. 
-func ResultExportScheduledTask(node *seiv1alpha1.SeiNode) sidecar.TaskBuilder { +// ResultExportMonitorTask builds a TaskRequest for result-export comparison +// mode. The sidecar compares local block results against the canonical RPC +// and completes on app-hash divergence. Returns nil when the node has no +// result-export config. +func ResultExportMonitorTask(node *seiv1alpha1.SeiNode) *sidecar.TaskRequest { if node.Spec.Replayer == nil || node.Spec.Replayer.ResultExport == nil { return nil } - cron := defaultResultExportCron - return sidecar.ResultExportTask{ - Bucket: resultExportBucket, - Prefix: resultExportPrefix + node.Spec.ChainID + "/", - Region: resultExportRegion, - Schedule: &sidecar.ScheduleConfig{Cron: &cron}, - } + re := node.Spec.Replayer.ResultExport + req := sidecar.ResultExportTask{ + Bucket: resultExportBucket, + Prefix: resultExportPrefix + node.Spec.ChainID + "/", + Region: resultExportRegion, + CanonicalRPC: re.CanonicalRPC, + }.ToTaskRequest() + return &req } diff --git a/internal/planner/executor.go b/internal/planner/executor.go index 2eada4d..4cda86d 100644 --- a/internal/planner/executor.go +++ b/internal/planner/executor.go @@ -44,7 +44,7 @@ type Executor[T client.Object] struct { // tasks are complete. 
func CurrentTask(plan *seiv1alpha1.TaskPlan) *seiv1alpha1.PlannedTask { for i := range plan.Tasks { - if plan.Tasks[i].Status != seiv1alpha1.PlannedTaskComplete { + if plan.Tasks[i].Status != seiv1alpha1.TaskComplete { return &plan.Tasks[i] } } @@ -104,7 +104,7 @@ func executePlan( status := exec.Status(ctx) - if status == task.ExecutionRunning && t.Status == seiv1alpha1.PlannedTaskPending { + if status == task.ExecutionRunning && t.Status == seiv1alpha1.TaskPending { if err := exec.Execute(ctx); err != nil { log.FromContext(ctx).Info("task submission failed, will retry", "task", t.Type, "error", err) @@ -119,7 +119,7 @@ func executePlan( case task.ExecutionComplete: patch := client.MergeFromWithOptions(obj.DeepCopyObject().(client.Object), client.MergeFromWithOptimisticLock{}) - t.Status = seiv1alpha1.PlannedTaskComplete + t.Status = seiv1alpha1.TaskComplete if err := kc.Status().Patch(ctx, obj, patch); err != nil { return ctrl.Result{}, fmt.Errorf("marking task complete: %w", err) } @@ -157,7 +157,7 @@ func retryTask( resourceName := obj.GetName() t.ID = task.DeterministicTaskID(resourceName, t.Type, t.RetryCount) - t.Status = seiv1alpha1.PlannedTaskPending + t.Status = seiv1alpha1.TaskPending t.Error = "" if err := kc.Status().Patch(ctx, obj, patch); err != nil { @@ -180,7 +180,7 @@ func failTask( log.FromContext(ctx).Error(fmt.Errorf("task failed: %s", errMsg), "task plan failed", "task", t.Type) patch := client.MergeFromWithOptions(obj.DeepCopyObject().(client.Object), client.MergeFromWithOptimisticLock{}) - t.Status = seiv1alpha1.PlannedTaskFailed + t.Status = seiv1alpha1.TaskFailed t.Error = errMsg plan.Phase = seiv1alpha1.TaskPlanFailed if err := kc.Status().Patch(ctx, obj, patch); err != nil { diff --git a/internal/planner/executor_test.go b/internal/planner/executor_test.go index 9fd8778..f4e260b 100644 --- a/internal/planner/executor_test.go +++ b/internal/planner/executor_test.go @@ -137,7 +137,7 @@ func TestExecutePlan_RetryOnFailure(t *testing.T) { { 
Type: sidecar.TaskTypeConfigureGenesis, ID: taskID, - Status: seiv1alpha1.PlannedTaskPending, + Status: seiv1alpha1.TaskPending, Params: configGenesisParams(t), MaxRetries: 5, }, @@ -166,7 +166,7 @@ func TestExecutePlan_RetryOnFailure(t *testing.T) { } tsk := &updated.Status.InitPlan.Tasks[0] - if tsk.Status != seiv1alpha1.PlannedTaskPending { + if tsk.Status != seiv1alpha1.TaskPending { t.Errorf("task status = %q, want Pending (reset for retry)", tsk.Status) } if tsk.RetryCount != 1 { @@ -205,7 +205,7 @@ func TestExecutePlan_ExhaustedRetries_FailsPlan(t *testing.T) { { Type: sidecar.TaskTypeConfigureGenesis, ID: taskID, - Status: seiv1alpha1.PlannedTaskPending, + Status: seiv1alpha1.TaskPending, Params: configGenesisParams(t), MaxRetries: 2, RetryCount: 2, @@ -233,7 +233,7 @@ func TestExecutePlan_ExhaustedRetries_FailsPlan(t *testing.T) { if updated.Status.InitPlan.Phase != seiv1alpha1.TaskPlanFailed { t.Errorf("plan phase = %q, want Failed", updated.Status.InitPlan.Phase) } - if updated.Status.InitPlan.Tasks[0].Status != seiv1alpha1.PlannedTaskFailed { + if updated.Status.InitPlan.Tasks[0].Status != seiv1alpha1.TaskFailed { t.Errorf("task status = %q, want Failed", updated.Status.InitPlan.Tasks[0].Status) } } @@ -278,7 +278,7 @@ func TestExecuteGroupPlan_CompletesSuccessfully(t *testing.T) { { Type: sidecar.TaskTypeAssembleGenesis, ID: taskID, - Status: seiv1alpha1.PlannedTaskPending, + Status: seiv1alpha1.TaskPending, Params: &apiextensionsv1.JSON{Raw: assembleParams}, }, }, @@ -317,7 +317,7 @@ func TestExecuteGroupPlan_CompletesSuccessfully(t *testing.T) { t.Fatalf("get group: %v", err) } - if updated.Status.InitPlan.Tasks[0].Status != seiv1alpha1.PlannedTaskComplete { + if updated.Status.InitPlan.Tasks[0].Status != seiv1alpha1.TaskComplete { t.Errorf("task status = %q, want Complete", updated.Status.InitPlan.Tasks[0].Status) } if result.RequeueAfter == 0 { diff --git a/internal/planner/group.go b/internal/planner/group.go index 4a67e9b..6e8dd68 100644 --- 
a/internal/planner/group.go +++ b/internal/planner/group.go @@ -69,7 +69,7 @@ func buildGroupPlannedTask(groupName, taskType string, attempt int, params any) return seiv1alpha1.PlannedTask{ Type: taskType, ID: id, - Status: seiv1alpha1.PlannedTaskPending, + Status: seiv1alpha1.TaskPending, Params: p, }, nil } diff --git a/internal/planner/group_test.go b/internal/planner/group_test.go index 2d210e5..1118450 100644 --- a/internal/planner/group_test.go +++ b/internal/planner/group_test.go @@ -54,7 +54,7 @@ func TestBuildGroupAssemblyPlan(t *testing.T) { if assembleTask.Type != TaskAssembleGenesis { t.Errorf("task[0] type = %q, want %q", assembleTask.Type, TaskAssembleGenesis) } - if assembleTask.Status != seiv1alpha1.PlannedTaskPending { + if assembleTask.Status != seiv1alpha1.TaskPending { t.Errorf("task[0] status = %q, want Pending", assembleTask.Status) } if assembleTask.ID == "" { diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 20350f5..3cc31d2 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -204,7 +204,7 @@ func buildPlannedTask(node *seiv1alpha1.SeiNode, taskType string, attempt int, p return seiv1alpha1.PlannedTask{ Type: taskType, ID: id, - Status: seiv1alpha1.PlannedTaskPending, + Status: seiv1alpha1.TaskPending, Params: p, }, nil } diff --git a/manifests/samples/seinode/pacific-1-shadow-replayer.yaml b/manifests/samples/seinode/pacific-1-shadow-replayer.yaml index 0867aea..e8ca3fd 100644 --- a/manifests/samples/seinode/pacific-1-shadow-replayer.yaml +++ b/manifests/samples/seinode/pacific-1-shadow-replayer.yaml @@ -44,4 +44,5 @@ spec: targetHeight: 198740000 bootstrapImage: "ghcr.io/sei-protocol/sei:v6.3.0" trustPeriod: "9999h0m0s" - resultExport: {} + resultExport: + canonicalRpc: "http://pacific-1-canonical-rpc.default.svc.cluster.local:26657" diff --git a/manifests/sei.io_seinodegroups.yaml b/manifests/sei.io_seinodegroups.yaml index 07ff72f..3ac6da8 100644 --- 
a/manifests/sei.io_seinodegroups.yaml +++ b/manifests/sei.io_seinodegroups.yaml @@ -715,6 +715,17 @@ spec: The sidecar queries the local RPC for block_results and uploads compressed NDJSON pages on a schedule. Useful for shadow replayers that need their execution results compared against the canonical chain. + properties: + canonicalRpc: + description: |- + CanonicalRPC is the HTTP RPC endpoint of the canonical chain node + to compare block execution results against. When set, the sidecar + runs in comparison mode and the task completes when app-hash + divergence is detected. + minLength: 1 + type: string + required: + - canonicalRpc type: object snapshot: description: Snapshot identifies the snapshot to restore diff --git a/manifests/sei.io_seinodes.yaml b/manifests/sei.io_seinodes.yaml index e67e407..c3df7eb 100644 --- a/manifests/sei.io_seinodes.yaml +++ b/manifests/sei.io_seinodes.yaml @@ -427,6 +427,17 @@ spec: The sidecar queries the local RPC for block_results and uploads compressed NDJSON pages on a schedule. Useful for shadow replayers that need their execution results compared against the canonical chain. + properties: + canonicalRpc: + description: |- + CanonicalRPC is the HTTP RPC endpoint of the canonical chain node + to compare block execution results against. When set, the sidecar + runs in comparison mode and the task completes when app-hash + divergence is detected. + minLength: 1 + type: string + required: + - canonicalRpc type: object snapshot: description: Snapshot identifies the snapshot to restore from @@ -898,6 +909,47 @@ spec: - phase - tasks type: object + monitorTasks: + additionalProperties: + description: |- + MonitorTask tracks a long-running sidecar task that the controller + actively polls for completion. Unlike ScheduledTasks (fire-and-forget), + completing a monitor task triggers a controller response (Event + Condition). + The map key in MonitorTasks serves as the task type identifier. 
+ properties: + completedAt: + description: CompletedAt is the time the task reached a terminal + state. + format: date-time + type: string + error: + description: Error is set when the task fails. + type: string + id: + description: ID is the sidecar-assigned task UUID. + type: string + status: + description: 'Status tracks lifecycle: Pending → Complete or + Failed.' + enum: + - Pending + - Complete + - Failed + type: string + submittedAt: + description: SubmittedAt is the time the task was submitted + to the sidecar. + format: date-time + type: string + required: + - id + - status + - submittedAt + type: object + description: |- + MonitorTasks tracks long-running sidecar tasks the controller polls + for completion. Keyed by task type for idempotent submission. + type: object phase: description: Phase is the high-level lifecycle state. enum: From 9a58c2fb2bba74b77af887da5a1b15e52675e57f Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 27 Mar 2026 17:06:22 -0400 Subject: [PATCH 2/2] fix: replace deprecated Requeue with RequeueAfter Use planner.ResultRequeueImmediate (RequeueAfter: 1ms) instead of the deprecated ctrl.Result{Requeue: true} to satisfy staticcheck SA1019. 
--- internal/controller/node/plan_execution.go | 8 ++++---- internal/controller/node/plan_execution_test.go | 7 ++----- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/internal/controller/node/plan_execution.go b/internal/controller/node/plan_execution.go index 7bad696..f54ca60 100644 --- a/internal/controller/node/plan_execution.go +++ b/internal/controller/node/plan_execution.go @@ -37,7 +37,7 @@ func (r *SeiNodeReconciler) reconcileRuntimeTasks(ctx context.Context, node *sei req := builder.ToTaskRequest() if err := r.ensureScheduledTask(ctx, node, sc, req); err != nil { if apierrors.IsConflict(err) { - return ctrl.Result{Requeue: true}, nil + return planner.ResultRequeueImmediate, nil } logger.Info("scheduled task submission failed, will retry", "task", req.Type, "error", err) } @@ -47,7 +47,7 @@ func (r *SeiNodeReconciler) reconcileRuntimeTasks(ctx context.Context, node *sei // the canonical chain and completes on app-hash divergence. if err := r.ensureMonitorTasks(ctx, node, sc); err != nil { if apierrors.IsConflict(err) { - return ctrl.Result{Requeue: true}, nil + return planner.ResultRequeueImmediate, nil } logger.Info("monitor task submission failed, will retry", "error", err) } @@ -56,12 +56,12 @@ func (r *SeiNodeReconciler) reconcileRuntimeTasks(ctx context.Context, node *sei requeue, err := r.pollMonitorTasks(ctx, node, sc) if err != nil { if apierrors.IsConflict(err) { - return ctrl.Result{Requeue: true}, nil + return planner.ResultRequeueImmediate, nil } logger.Info("monitor task poll failed, will retry", "error", err) } if requeue { - return ctrl.Result{Requeue: true}, nil + return planner.ResultRequeueImmediate, nil } return ctrl.Result{RequeueAfter: statusPollInterval}, nil diff --git a/internal/controller/node/plan_execution_test.go b/internal/controller/node/plan_execution_test.go index 3201158..8b37802 100644 --- a/internal/controller/node/plan_execution_test.go +++ b/internal/controller/node/plan_execution_test.go @@ -1069,11 
+1069,8 @@ func TestReconcileRunning_PollRequeue_ImmediateRequeue(t *testing.T) { if err != nil { t.Fatalf("error = %v", err) } - if !result.Requeue { - t.Error("expected immediate Requeue when poll detects terminal state") - } - if result.RequeueAfter != 0 { - t.Errorf("expected RequeueAfter=0 for immediate requeue, got %v", result.RequeueAfter) + if result.RequeueAfter != planner.ResultRequeueImmediate.RequeueAfter { + t.Errorf("expected immediate requeue (%v), got RequeueAfter=%v", planner.ResultRequeueImmediate.RequeueAfter, result.RequeueAfter) } }