96 changes: 95 additions & 1 deletion docs/api-resources.md
@@ -249,6 +249,100 @@
The status uses Kubernetes-style conditions instead of a single phase field:
- One adapter reports `Available=False` for `observed_generation=1` → `Available` transitions to `False`
- One adapter reports `Available=False` for `observed_generation=2` → `Available` keeps its `True` status

### Aggregation logic

This section describes the aggregation logic for resource status conditions. The system consists of:

- An API that stores resource entities (clusters, nodepools)
- A sentinel that polls the API for changes and triggers messages
- Instances of "adapters" that:
  - Read the messages
  - Reconcile the state with the world
  - Report back to the API using status conditions

Each resource keeps track of its status, which is affected by the reports from adapters:

- Each resource keeps a `generation` property that is incremented on every change
- Adapters associated with a resource report their state as an array of adapter conditions
  - Three of these conditions are always mandatory: `Available`, `Applied`, `Health`
  - If any mandatory condition is missing, the report is discarded
  - Each report carries an `observed_generation` field indicating the generation associated with the report
  - `observed_time` records when the adapter work was done
  - If the reported `observed_generation` is lower than the already stored `observed_generation` for that adapter, the report is discarded
- Each resource has a list of associated "adapters" used to compute the aggregated `status.conditions`
- Each resource's `status.conditions` is an array property composed of:
  - The `Available` condition of each adapter, named `<adapter-name>Successful`
  - Two aggregated conditions, `Ready` and `Available`, computed from the adapters' `Available` conditions
  - Only the `Available` condition from each adapter is used to compute the aggregated conditions
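The relationship between the per-adapter `Available` conditions and the two aggregated conditions can be sketched as plain Go (the type and function names here are illustrative, not the actual API types; generation-alignment details are simplified relative to the full rules in this document):

```go
package main

import "fmt"

// AdapterReport is an illustrative shape for one adapter's stored report.
type AdapterReport struct {
	Adapter            string
	Available          string // "True", "False", or "Unknown"
	ObservedGeneration int64
}

// aggregate computes the two synthetic conditions from the stored reports:
// Ready requires every adapter to report Available=True at the current
// generation; Available only requires every adapter to report Available=True.
func aggregate(reports []AdapterReport, generation int64) (ready, available bool) {
	if len(reports) == 0 {
		return false, false
	}
	ready, available = true, true
	for _, r := range reports {
		if r.Available != "True" {
			return false, false
		}
		if r.ObservedGeneration != generation {
			ready = false
		}
	}
	return ready, available
}

func main() {
	reports := []AdapterReport{
		{Adapter: "dns", Available: "True", ObservedGeneration: 2},
		{Adapter: "network", Available: "True", ObservedGeneration: 1},
	}
	ready, available := aggregate(reports, 2)
	fmt.Println(ready, available) // one adapter lags the generation → false true
}
```

This shows why `Ready` is the stricter condition: a single adapter still reconciling the previous generation keeps `Ready=False` while `Available` can remain `True`.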

The whole API spec is at: <https://raw.githubusercontent.com/openshift-hyperfleet/hyperfleet-api/refs/heads/main/openapi/openapi.yaml>

The aggregation logic for a resource (cluster/nodepool) works as follows.

**Notation:**

- `X` = report's `observed_generation`
- `G` = resource's current `generation`
- `statuses[]` = all stored adapter condition reports
- `lut` = `last_update_time`
- `ltt` = `last_transition_time`
- `obs_gen` = `observed_generation`
- `obs_time` = report's `observed_time`
- `—` = no change

---

#### Discard / Reject Rules

Checked before any aggregation. A discarded or rejected report causes no state change.

| Rule | Condition | Outcome |
|---|---|---|
| `obs_gen` too high | report `observed_generation` > resource `generation` | Discarded |
| Stale adapter report | report `observed_generation` < adapter's stored `observed_generation` | Discarded |
| Missing mandatory conditions | Missing any of `Available`, `Applied`, `Health`, or value not in `{True, False, Unknown}` | Discarded |
| Available=Unknown | Report is valid but `Available=Unknown` | Discarded |
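The four rules can be sketched as a single validation function. This is an illustrative sketch (hypothetical names, checks applied in the table's order; the actual service may evaluate them in a different order):

```go
package main

import "fmt"

// Report is an illustrative incoming adapter report.
type Report struct {
	ObservedGeneration int64
	Conditions         map[string]string // condition type -> "True"/"False"/"Unknown"
}

// shouldDiscard applies the discard/reject rules in table order and returns
// the reason, or "" if the report should be processed.
func shouldDiscard(r Report, resourceGen, storedAdapterGen int64) string {
	// obs_gen too high: report is ahead of the resource's generation.
	if r.ObservedGeneration > resourceGen {
		return "observed_generation ahead of resource generation"
	}
	// Stale: older than the adapter's last stored report.
	if r.ObservedGeneration < storedAdapterGen {
		return "stale report"
	}
	// Mandatory conditions must be present with a valid status value.
	valid := map[string]bool{"True": true, "False": true, "Unknown": true}
	for _, mandatory := range []string{"Available", "Applied", "Health"} {
		v, ok := r.Conditions[mandatory]
		if !ok || !valid[v] {
			return "missing or invalid mandatory condition: " + mandatory
		}
	}
	// Available=Unknown reports are not processed.
	if r.Conditions["Available"] == "Unknown" {
		return "Available=Unknown is not processed"
	}
	return ""
}

func main() {
	r := Report{ObservedGeneration: 3, Conditions: map[string]string{
		"Available": "True", "Applied": "True", "Health": "True",
	}}
	fmt.Println(shouldDiscard(r, 2, 1)) // observed_generation ahead of resource generation
}
```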

---

#### Lifecycle Events

| Event | Condition | Target | → status | → obs_gen | → lut | → ltt |
|---|---|---|---|---|---|---|
| Creation | — | `Ready` | `False` | `1` | `now` | `now` |
| Creation | — | `Available` | `False` | `1` | `now` | `now` |
| Change (→G) | Was `Ready=True` | `Ready` | `False` | `G` | `now` | `now` |
| Change (→G) | Was `Ready=False` | `Ready` | `False` | `G` | `now` | `—` |
| Change (→G) | — | `Available` | unchanged | unchanged | `—` | `—` |

---

#### Adapter Report Aggregation Matrix

The **Ready** check and **Available** check are independent — both can apply to the same incoming report.

##### Report `Available=True` (obs_gen = X)

| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
|---|---|---|---|---|---|---|
| `Ready` | `Ready=True` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | unchanged | `min(statuses[].lut)` | `—` | `—` |
| `Ready` | `Ready=False` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | **`True`** | `min(statuses[].lut)` | `obs_time` | `—` |
| `Ready` | any | Conditions above not met | `—` | `—` | `—` | `—` |
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | **`True`** | `min(statuses[].lut)` | `obs_time` | `X` |
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | unchanged | `min(statuses[].lut)` | `—` | `X` |
| `Available` | any | Conditions above not met | `—` | `—` | `—` | `—` |

##### Report `Available=False` (obs_gen = X)

| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
|---|---|---|---|---|---|---|
| `Ready` | `Ready=False` | `X==G` | unchanged | `min(statuses[].lut)` | `—` | `—` |
| `Ready` | `Ready=True` | `X==G` | **`False`** | `obs_time` | `obs_time` | `—` |
| `Ready` | any | Conditions above not met | `—` | `—` | `—` | `—` |
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | unchanged | `min(statuses[].lut)` | `—` | `X` |
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | **`False`** | `obs_time` | `obs_time` | `X` |
| `Available` | any | Conditions above not met | `—` | `—` | `—` | `—` |
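The `Ready` rows of the `Available=False` table reduce to a small decision (illustrative names; timestamp bookkeeping is summarized by the `transitioned` flag):

```go
package main

import "fmt"

// applyReadyOnFalse implements the Ready rows for an incoming report with
// Available=False: the report only affects Ready when its observed
// generation X equals the resource generation G; a True→False transition
// is the case where last_transition_time also moves.
func applyReadyOnFalse(currentReady bool, x, g int64) (newReady, transitioned bool) {
	if x != g {
		return currentReady, false // required condition not met: no change
	}
	if currentReady {
		return false, true // True → False: status, lut, and ltt all move
	}
	return false, false // already False: only lut is refreshed
}

func main() {
	ready, transitioned := applyReadyOnFalse(true, 2, 2)
	fmt.Println(ready, transitioned) // false true
}
```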

## NodePool Management

### Endpoints
@@ -456,7 +550,7 @@
The status object contains synthesized conditions computed from adapter reports:
- All above fields plus:
- `observed_generation` - Generation this condition reflects
- `created_time` - When condition was first created (API-managed)
- `last_updated_time` - When adapter last reported (API-managed, from AdapterStatus.last_report_time)
- `last_updated_time` - When this condition was last refreshed (API-managed). For **Available**, always the evaluation time. For **Ready**: when Ready=True, the minimum of `last_report_time` across all required adapters that report Available=True at the current generation; when Ready=False, the evaluation time (so consumers can detect staleness).
**Review comment (Contributor):**

Priority: Inconsistency

The doc on line 459 says Available's last_updated_time is "always the
evaluation time", but the code in BuildSyntheticConditions preserves the
existing value when status and ObservedGeneration are unchanged (lines 359-365
of status_aggregation.go). The test
TestBuildSyntheticConditions_AvailableLastUpdatedTime_Stable confirms the
preservation behavior.

Whichever direction the fix goes for the Available behavior (per Xue Li's
comment), please update the doc to match. If preservation is kept, something
like: "For Available, the evaluation time when the status or
observed_generation changes; otherwise preserved from the previous
evaluation."

- `last_transition_time` - When status last changed (API-managed)

## Parameter Restrictions
60 changes: 31 additions & 29 deletions pkg/config/loader.go
@@ -227,41 +227,43 @@
func (l *ConfigLoader) validateConfig(config *ApplicationConfig) error {
}

// handleJSONArrayEnvVars processes environment variables containing JSON arrays
// Viper doesn't automatically parse JSON from env vars, so we handle this explicitly
// Used for: HYPERFLEET_ADAPTERS_CLUSTER_ADAPTERS and HYPERFLEET_ADAPTERS_NODEPOOL_ADAPTERS
// Viper doesn't automatically parse JSON from env vars, so we handle this explicitly.
// Each viper key is filled from the first non-empty env var in the list (canonical name first, then aliases).
func (l *ConfigLoader) handleJSONArrayEnvVars(ctx context.Context) error {
// Map of env var name -> viper key
jsonArrayMappings := map[string]string{
EnvPrefix + "_ADAPTERS_REQUIRED_CLUSTER": "adapters.required.cluster",
EnvPrefix + "_ADAPTERS_REQUIRED_NODEPOOL": "adapters.required.nodepool",
// viper key -> ordered list of env var names (first one set wins)
clusterEnvVars := []string{
EnvPrefix + "_ADAPTERS_REQUIRED_CLUSTER",
EnvPrefix + "_CLUSTER_ADAPTERS", // alias for user convenience
}
nodepoolEnvVars := []string{
EnvPrefix + "_ADAPTERS_REQUIRED_NODEPOOL",
EnvPrefix + "_NODEPOOL_ADAPTERS", // alias for user convenience
}

for envVar, viperKey := range jsonArrayMappings {
jsonValue := os.Getenv(envVar)
if jsonValue == "" {
continue
}

// Parse JSON array
var arrayValue []string
if err := json.Unmarshal([]byte(jsonValue), &arrayValue); err != nil {
return fmt.Errorf("failed to parse %s as JSON array: %w (value: %s)", envVar, err, jsonValue)
setFromEnvVars := func(viperKey string, envVars []string) error {
for _, envVar := range envVars {
jsonValue := os.Getenv(envVar)
if jsonValue == "" {
continue
}
var arrayValue []string
if err := json.Unmarshal([]byte(jsonValue), &arrayValue); err != nil {
return fmt.Errorf("failed to parse %s as JSON array: %w (value: %s)", envVar, err, jsonValue)
}
// Set() overrides Viper's auto-env CSV parsing so JSON arrays are correct.
l.viper.Set(viperKey, arrayValue)
logger.With(ctx, "env_var", envVar, "count", len(arrayValue)).Debug("Parsed JSON array from environment")
return nil
}

// Always set the parsed JSON array value to override Viper's auto-env CSV parsing.
// Viper's AutomaticEnv treats comma-separated strings as arrays, incorrectly parsing
// JSON arrays like '["a","b"]' as ["[\"a\"", "\"b\"]"] instead of ["a", "b"].
//
// We use Set() to ensure proper JSON parsing overrides Viper's CSV parsing.
// This maintains ENV > Config > Default precedence for adapters.
//
// NOTE: Adapters currently have no CLI flags (see bindFlags line 494).
// If CLI flags are added in the future, this code needs updating to check
// if the value came from a flag before calling Set().
l.viper.Set(viperKey, arrayValue)
logger.With(ctx, "env_var", envVar, "count", len(arrayValue)).Debug("Parsed JSON array from environment")
return nil
}

if err := setFromEnvVars("adapters.required.cluster", clusterEnvVars); err != nil {
return err
}
if err := setFromEnvVars("adapters.required.nodepool", nodepoolEnvVars); err != nil {
return err
}
return nil
}

4 changes: 3 additions & 1 deletion pkg/services/CLAUDE.md
@@ -22,9 +22,11 @@
func NewClusterService(dao, adapterStatusDao, config) ClusterService
## Status Aggregation

`UpdateClusterStatusFromAdapters()` in `cluster.go` synthesizes two top-level conditions:
- **Available**: True if all required adapters report `Available=True` (any generation)
- **Available**: True if all required adapters report `Available=True` at ANY generation (last-known-good semantics). `ObservedGeneration` = minimum observed generation across qualifying adapters. When False, `ObservedGeneration` = current resource generation.
- **Ready**: True if all adapters report `Available=True` AND `observed_generation` matches current generation

Ready's `LastUpdatedTime` is computed in `status_aggregation.computeReadyLastUpdated`: when Ready=False it is the minimum of `LastReportTime` across all required adapters (falls back to `now` if any required adapter has no stored status yet); when Ready=True it is the minimum of `LastReportTime` across required adapters that have Available=True at the current generation. True→False transitions override this with the triggering adapter's `observedTime`.

`ProcessAdapterStatus()` validates mandatory conditions (`Available`, `Applied`, `Health`) before persisting. Rejects `Available=Unknown` on subsequent reports (only allowed on first report).

## GenericService
110 changes: 66 additions & 44 deletions pkg/services/cluster.go
@@ -96,8 +96,12 @@
func (s *sqlClusterService) Replace(ctx context.Context, cluster *api.Cluster) (
return nil, handleUpdateError("Cluster", err)
}

// REMOVED: Event creation - no event-driven components
return cluster, nil
updatedCluster, svcErr := s.UpdateClusterStatusFromAdapters(ctx, cluster.ID)
if svcErr != nil {
return nil, svcErr
}

return updatedCluster, nil
}

func (s *sqlClusterService) Delete(ctx context.Context, id string) *errors.ServiceError {
@@ -143,9 +147,22 @@
func (s *sqlClusterService) OnDelete(ctx context.Context, id string) error {
return nil
}

// UpdateClusterStatusFromAdapters aggregates adapter statuses into cluster status
// UpdateClusterStatusFromAdapters aggregates adapter statuses into cluster status.
// Uses time.Now() as the observed time (for generation-change recomputations).
// Called from Create/Replace, so isLifecycleChange=true (Available frozen, Ready resets).
func (s *sqlClusterService) UpdateClusterStatusFromAdapters(
ctx context.Context, clusterID string,
) (*api.Cluster, *errors.ServiceError) {
return s.updateClusterStatusFromAdapters(ctx, clusterID, time.Now(), true)
}

// updateClusterStatusFromAdapters is the internal implementation.
// observedTime is the triggering adapter's observed_time (its LastReportTime) and is used
// for transition timestamps in the synthetic conditions.
// isLifecycleChange=true freezes Available and resets Ready.lut=now (Create/Replace path).
// isLifecycleChange=false uses the normal adapter-report aggregation path.
func (s *sqlClusterService) updateClusterStatusFromAdapters(
ctx context.Context, clusterID string, observedTime time.Time, isLifecycleChange bool,
) (*api.Cluster, *errors.ServiceError) {
// Get the cluster
cluster, err := s.clusterDao.Get(ctx, clusterID)
@@ -210,11 +227,14 @@

// Compute synthetic Available and Ready conditions
availableCondition, readyCondition := BuildSyntheticConditions(
ctx,
cluster.StatusConditions,
adapterStatuses,
s.adapterConfig.RequiredClusterAdapters(),
cluster.Generation,
now,
observedTime,
isLifecycleChange,
)

// Combine synthetic conditions with adapter conditions
@@ -238,13 +258,15 @@
return cluster, nil
return cluster, nil
}

// ProcessAdapterStatus handles the business logic for adapter status:
// - Validates that all mandatory conditions (Available, Applied, Health) are present
// - Rejects duplicate condition types
// - For first status report: accepts Unknown Available condition to avoid data loss
// - For subsequent reports: rejects Unknown Available condition to preserve existing valid state
// - Uses complete replacement semantics: each update replaces all conditions for this adapter
// - Returns (nil, nil) for discarded updates
// ProcessAdapterStatus handles the business logic for adapter status.
// Pre-processing rules applied in order (spec §2):
// - Stale: discards if observed_generation < existing adapter generation
// - P1: discards if observed_generation > resource generation (report ahead of resource)
// - P2: rejects if mandatory conditions (Available, Applied, Health) are missing or have invalid status
// - P3: discards if Available == Unknown (not processed per spec)
//
// Otherwise: upserts the status and triggers aggregation.
// Returns (nil, nil) for discarded/rejected updates.
func (s *sqlClusterService) ProcessAdapterStatus(
ctx context.Context, clusterID string, adapterStatus *api.AdapterStatus,
) (*api.AdapterStatus, *errors.ServiceError) {
@@ -256,65 +278,65 @@
return nil, errors.GeneralError("Failed to get adapter status: %s", findErr)
}
}
// Stale check: discard if older than the adapter's last recorded generation.
if existingStatus != nil && adapterStatus.ObservedGeneration < existingStatus.ObservedGeneration {
// Discard stale status updates (older observed_generation).
return nil, nil
}

// Parse conditions from the adapter status
// Parse conditions from the adapter status (needed for P2 and P3 before resource fetch).
var conditions []api.AdapterCondition
if len(adapterStatus.Conditions) > 0 {
if err := json.Unmarshal(adapterStatus.Conditions, &conditions); err != nil {
return nil, errors.GeneralError("Failed to unmarshal adapter status conditions: %s", err)
}
}

// Validate mandatory conditions and check for duplicates
// P2: validate mandatory conditions (presence and valid status values).
if errorType, conditionName := ValidateMandatoryConditions(conditions); errorType != "" {
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: %s condition %s",
adapterStatus.Adapter, errorType, conditionName))
return nil, nil
}

// Check Available condition for Unknown status
triggerAggregation := false
// P3: discard if Available == Unknown (spec §2, all reports).
for _, cond := range conditions {
if cond.Type != api.ConditionTypeAvailable {
continue
}

triggerAggregation = true
if cond.Status == api.AdapterConditionUnknown {
if existingStatus != nil {
// Non-first report && Available=Unknown → reject
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: subsequent Unknown Available",
adapterStatus.Adapter))
return nil, nil
}
// First report from this adapter: allow storing even with Available=Unknown
// but skip aggregation since Unknown should not affect cluster-level conditions
triggerAggregation = false
if cond.Type == api.ConditionTypeAvailable && cond.Status == api.AdapterConditionUnknown {
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: Available=Unknown reports are not processed",
adapterStatus.Adapter))
return nil, nil
}
break
}

// Upsert the adapter status (complete replacement)
upsertedStatus, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
// P1: discard if observed_generation is ahead of the current resource generation.
// Checked after P2/P3 to avoid unnecessary resource fetches for invalid/Unknown reports.
cluster, err := s.clusterDao.Get(ctx, clusterID)
if err != nil {
return nil, handleCreateError("AdapterStatus", err)
return nil, handleGetError("Cluster", "id", clusterID, err)
}
if adapterStatus.ObservedGeneration > cluster.Generation {
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf(
"Discarding adapter status update from %s: observed_generation %d > resource generation %d",
adapterStatus.Adapter, adapterStatus.ObservedGeneration, cluster.Generation))
return nil, nil
}

// Only trigger aggregation when triggerAggregation is true
if triggerAggregation {
if _, aggregateErr := s.UpdateClusterStatusFromAdapters(
ctx, clusterID,
); aggregateErr != nil {
// Log error but don't fail the request - the status will be computed on next update
ctx = logger.WithClusterID(ctx, clusterID)
logger.WithError(ctx, aggregateErr).Warn("Failed to aggregate cluster status")
}
// Upsert the adapter status (complete replacement).
upsertedStatus, upsertErr := s.adapterStatusDao.Upsert(ctx, adapterStatus)
if upsertErr != nil {
return nil, handleCreateError("AdapterStatus", upsertErr)
}

// Trigger aggregation using the adapter's observed_time for transition timestamps.
observedTime := time.Now()
if upsertedStatus.LastReportTime != nil {
observedTime = *upsertedStatus.LastReportTime
}
if _, aggregateErr := s.updateClusterStatusFromAdapters(ctx, clusterID, observedTime, false); aggregateErr != nil {
ctx = logger.WithClusterID(ctx, clusterID)
logger.WithError(ctx, aggregateErr).Warn("Failed to aggregate cluster status")
}

return upsertedStatus, nil