Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 166 additions & 14 deletions isvctl/src/isvctl/orchestrator/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from pathlib import Path
from typing import Any

from _pytest.mark.expression import Expression, ParseError
from isvtest.core.resolution import (
ErrorReason,
ResolvedEntry,
Expand Down Expand Up @@ -301,6 +302,122 @@ def _has_explicit_pytest_selection(extra_pytest_args: list[str] | None) -> bool:
)


def _pytest_option_values(extra_pytest_args: list[str] | None, option: str) -> list[str]:
"""Return values passed to a pytest option that accepts one argument."""
if not extra_pytest_args:
return []

values: list[str] = []
option_prefix = f"{option}="
for index, arg in enumerate(extra_pytest_args):
if arg == option and index + 1 < len(extra_pytest_args):
values.append(extra_pytest_args[index + 1])
elif arg.startswith(option_prefix):
values.append(arg.removeprefix(option_prefix))
return values


def _entry_matches_keyword_expression(entry: ValidationEntry, expression: Expression) -> bool:
"""Evaluate a pytest -k expression against an isvtest validation entry."""
keywords = {
entry.name,
entry.category,
f"test_validation[{entry.name}]",
"test_validation",
*entry.labels,
}

def matcher(candidate: str, **_kwargs: str | int | bool | None) -> bool:
return any(candidate.lower() in keyword.lower() for keyword in keywords)

return expression.evaluate(matcher)


def _entry_matches_marker_expression(entry: ValidationEntry, expression: Expression) -> bool:
"""Evaluate a pytest -m expression against validation labels mirrored as marks."""
markers = set(entry.labels)
return expression.evaluate(lambda candidate, **_kwargs: candidate in markers)


def _selected_validation_indexes(
entries: list[ValidationEntry],
extra_pytest_args: list[str] | None,
) -> set[int] | None:
"""Return validation indexes selected by pytest -k/-m, or None when unknown.

The actual pytest run still receives the original arguments; this is a
conservative pre-pass used only to avoid executing unrelated provider steps.
If expression parsing fails, fall back to the historical full-step behavior.
"""
k_expressions = _pytest_option_values(extra_pytest_args, "-k")
m_expressions = _pytest_option_values(extra_pytest_args, "-m")
if not k_expressions and not m_expressions:
return None

selected = set(range(len(entries)))
try:
for expression_text in k_expressions:
expression = Expression.compile(expression_text)
selected &= {
index for index, entry in enumerate(entries) if _entry_matches_keyword_expression(entry, expression)
}
for expression_text in m_expressions:
expression = Expression.compile(expression_text)
selected &= {
index for index, entry in enumerate(entries) if _entry_matches_marker_expression(entry, expression)
}
except ParseError:
logger.warning("Could not pre-resolve pytest selection expression; running all configured steps")
return None

return selected


def _pytest_excluded_entry(entry: ValidationEntry) -> ResolvedEntry:
"""Return a terminal result for an entry excluded by pytest selection."""
return ResolvedEntry(
entry=entry,
state=State.SKIPPED,
skip_reason=SkipReason.EXCLUDED,
message="excluded by pytest -k/-m filter",
)


def _step_names_for_selected_validations(
steps: list[Any],
entries: list[ValidationEntry],
selected_indexes: set[int],
) -> set[str]:
"""Return configured step names needed by selected validations."""
selected_entries = [entry for index, entry in enumerate(entries) if index in selected_indexes]
selected_validation_names = {entry.name for entry in selected_entries}
selected_step_names = {entry.step for entry in selected_entries if entry.step}

for step in steps:
required_validations = getattr(step, "requires_available_validations", [])
if any(validation in selected_validation_names for validation in required_validations):
selected_step_names.add(step.name)

return selected_step_names


def _prune_steps_for_pytest_selection(
steps: list[Any],
entries: list[ValidationEntry],
selected_indexes: set[int] | None,
) -> list[Any]:
"""Drop provider steps that cannot feed the explicitly selected validations."""
if selected_indexes is None or len(selected_indexes) == len(entries):
return steps

selected_step_names = _step_names_for_selected_validations(steps, entries, selected_indexes)
if not selected_step_names:
logger.info("No configured steps feed the selected validations; skipping all provider steps")
return []

return [step for step in steps if step.name in selected_step_names]


def _apply_step_validation_gates(steps: list[Any], released_tests: set[str] | None) -> list[Any]:
"""Mark steps skipped when their required validations are unavailable."""
if released_tests is None:
Expand Down Expand Up @@ -446,6 +563,12 @@ def _run_steps_mode(
if released_tests is None:
logger.info(f"Including unreleased validations because {INCLUDE_UNRELEASED_ENV} is enabled")

all_validations = {}
if self.config.tests and self.config.tests.validations:
all_validations = self.config.tests.validations
validation_entries = parse_validations(all_validations)
selected_validation_indexes = _selected_validation_indexes(validation_entries, self._extra_pytest_args)

steps = _apply_step_validation_gates(steps, released_tests)

config_phases = self.config.get_phases(platform)
Expand All @@ -465,6 +588,9 @@ def _run_steps_mode(
],
)

configured_step_phases = {step.name: (step.phase or "setup").lower() for step in steps if not step.skip}
steps = _prune_steps_for_pytest_selection(steps, validation_entries, selected_validation_indexes)

steps_by_phase: dict[str, list] = {phase: [] for phase in config_phases}
for step in steps:
step_phase = (step.phase or "setup").lower()
Expand All @@ -479,10 +605,6 @@ def _run_steps_mode(
step_phase = (step.phase or "setup").lower()
self.context.set_step_phase(step.name, step_phase)

all_validations = {}
if self.config.tests and self.config.tests.validations:
all_validations = self.config.tests.validations
validation_entries = parse_validations(all_validations)
resolved_validations_by_index: dict[int, ResolvedEntry] = {}

exclude_labels: list[str] = []
Expand Down Expand Up @@ -562,22 +684,52 @@ def _run_steps_mode(
if junit_tmpdir:
phase_junitxml = str(Path(junit_tmpdir) / f"junit-{phase_name}.xml")

step_phases_snapshot = self.context.get_all_step_phases()
step_phases_snapshot = (
configured_step_phases
if selected_validation_indexes is not None
else self.context.get_all_step_phases()
)
phase_entry_indexes = [
index
for index, entry in enumerate(validation_entries)
if index not in resolved_validations_by_index
and get_entry_phase(entry, step_phases_snapshot) == phase_name
]
phase_entries = [validation_entries[index] for index in phase_entry_indexes]
resolved_phase_entries = self._resolve_validation_entries(
phase_entries,
requested_phase_names if Phase.ALL not in requested_phases else set(config_phases),
set(self._include_labels),
set(resolution_exclude_labels),
set(exclude_tests),
released_tests,
)
if selected_validation_indexes is None:
phase_entries = [validation_entries[index] for index in phase_entry_indexes]
resolved_phase_entries = self._resolve_validation_entries(
phase_entries,
requested_phase_names if Phase.ALL not in requested_phases else set(config_phases),
set(self._include_labels),
set(resolution_exclude_labels),
set(exclude_tests),
released_tests,
)
else:
selected_phase_pairs = [
(index, validation_entries[index])
for index in phase_entry_indexes
if index in selected_validation_indexes
]
selected_phase_entries = [entry for _, entry in selected_phase_pairs]
resolved_selected_entries = self._resolve_validation_entries(
selected_phase_entries,
requested_phase_names if Phase.ALL not in requested_phases else set(config_phases),
set(self._include_labels),
set(resolution_exclude_labels),
set(exclude_tests),
released_tests,
)
resolved_by_index = {
index: resolved
for (index, _entry), resolved in zip(
selected_phase_pairs, resolved_selected_entries, strict=True
)
}
for index in phase_entry_indexes:
if index not in resolved_by_index:
resolved_by_index[index] = _pytest_excluded_entry(validation_entries[index])
resolved_phase_entries = [resolved_by_index[index] for index in phase_entry_indexes]
ready_entries = [entry for entry in resolved_phase_entries if entry.is_ready]
terminal_before_pytest = [entry for entry in resolved_phase_entries if not entry.is_ready]

Expand Down
74 changes: 74 additions & 0 deletions isvctl/tests/test_orchestrator_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,80 @@ def test_validation_gate_allows_step_when_unreleased_checks_are_included(
assert result.inventory is not None
assert "unreleased_setup" in result.inventory

def test_pytest_k_selection_prunes_unrelated_steps(
self,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""A focused -k run should execute only steps needed by the selected validation."""
monkeypatch.setattr("isvctl.orchestrator.loop.load_released_test_filter", lambda: None)
selected_probe = _write_script(
tmp_path,
"selected_probe.sh",
'#!/bin/bash\necho \'{"success": true, "platform": "kubernetes", "field": "present"}\'\n',
)
setup_script = _write_script(
tmp_path,
"selected_setup.sh",
'#!/bin/bash\necho \'{"success": true, "platform": "kubernetes"}\'\n',
)
teardown_script = _write_script(
tmp_path,
"selected_teardown.sh",
'#!/bin/bash\necho \'{"success": true, "platform": "kubernetes"}\'\n',
)

config = RunConfig(
commands={
"kubernetes": PlatformCommands(
phases=["setup", "test", "teardown"],
steps=[
StepConfig(
name="selected_setup",
command=setup_script,
phase="setup",
requires_available_validations=["FieldExistsCheck"],
),
StepConfig(name="selected_probe", command=selected_probe, phase="test"),
StepConfig(name="unrelated_probe", command="false", phase="test"),
StepConfig(
name="selected_teardown",
command=teardown_script,
phase="teardown",
requires_available_validations=["FieldExistsCheck"],
),
StepConfig(name="unrelated_teardown", command="false", phase="teardown"),
],
)
},
tests=ValidationConfig(
platform="kubernetes",
validations={
"selected_checks": {
"step": "selected_probe",
"checks": {"FieldExistsCheck": {"field": "field"}},
},
"unrelated_checks": {
"step": "unrelated_probe",
"checks": {"StepSuccessCheck": {}},
},
},
),
)

result = Orchestrator(config).run(extra_pytest_args=["-k", "FieldExistsCheck"])

assert result.success
steps_by_phase = {
phase.phase: [step["name"] for step in phase.details["steps"]] for phase in result.phases if phase.details
}
assert steps_by_phase[Phase.SETUP] == ["selected_setup"]
assert steps_by_phase[Phase.TEST] == ["selected_probe"]
assert steps_by_phase[Phase.TEARDOWN] == ["selected_teardown"]
validations = result.phases[1].details["validations"]
assert [validation["name"] for validation in validations] == ["FieldExistsCheck", "StepSuccessCheck"]
assert validations[1]["skip_reason"] == "test_excluded"

def test_run_setup_phase_command_failure(self) -> None:
"""Test setup phase with command failure."""
config = RunConfig(
Expand Down