diff --git a/isvctl/src/isvctl/orchestrator/loop.py b/isvctl/src/isvctl/orchestrator/loop.py index 8bd5a2a4..e0749e71 100644 --- a/isvctl/src/isvctl/orchestrator/loop.py +++ b/isvctl/src/isvctl/orchestrator/loop.py @@ -29,6 +29,7 @@ from pathlib import Path from typing import Any +from _pytest.mark.expression import Expression, ParseError from isvtest.core.resolution import ( ErrorReason, ResolvedEntry, @@ -301,6 +302,122 @@ def _has_explicit_pytest_selection(extra_pytest_args: list[str] | None) -> bool: ) +def _pytest_option_values(extra_pytest_args: list[str] | None, option: str) -> list[str]: + """Return values passed to a pytest option that accepts one argument.""" + if not extra_pytest_args: + return [] + + values: list[str] = [] + option_prefix = f"{option}=" + for index, arg in enumerate(extra_pytest_args): + if arg == option and index + 1 < len(extra_pytest_args): + values.append(extra_pytest_args[index + 1]) + elif arg.startswith(option_prefix): + values.append(arg.removeprefix(option_prefix)) + return values + + +def _entry_matches_keyword_expression(entry: ValidationEntry, expression: Expression) -> bool: + """Evaluate a pytest -k expression against an isvtest validation entry.""" + keywords = { + entry.name, + entry.category, + f"test_validation[{entry.name}]", + "test_validation", + *entry.labels, + } + + def matcher(candidate: str, **_kwargs: str | int | bool | None) -> bool: + return any(candidate.lower() in keyword.lower() for keyword in keywords) + + return expression.evaluate(matcher) + + +def _entry_matches_marker_expression(entry: ValidationEntry, expression: Expression) -> bool: + """Evaluate a pytest -m expression against validation labels mirrored as marks.""" + markers = set(entry.labels) + return expression.evaluate(lambda candidate, **_kwargs: candidate in markers) + + +def _selected_validation_indexes( + entries: list[ValidationEntry], + extra_pytest_args: list[str] | None, +) -> set[int] | None: + """Return validation indexes selected by pytest -k/-m, or None when unknown. + + The actual pytest run still receives the original arguments; this is a + conservative pre-pass used only to avoid executing unrelated provider steps. + If expression parsing fails, fall back to the historical full-step behavior. + """ + k_expressions = _pytest_option_values(extra_pytest_args, "-k") + m_expressions = _pytest_option_values(extra_pytest_args, "-m") + if not k_expressions and not m_expressions: + return None + + selected = set(range(len(entries))) + try: + for expression_text in k_expressions: + expression = Expression.compile(expression_text) + selected &= { + index for index, entry in enumerate(entries) if _entry_matches_keyword_expression(entry, expression) + } + for expression_text in m_expressions: + expression = Expression.compile(expression_text) + selected &= { + index for index, entry in enumerate(entries) if _entry_matches_marker_expression(entry, expression) + } + except ParseError: + logger.warning("Could not pre-resolve pytest selection expression; running all configured steps") + return None + + return selected + + +def _pytest_excluded_entry(entry: ValidationEntry) -> ResolvedEntry: + """Return a terminal result for an entry excluded by pytest selection.""" + return ResolvedEntry( + entry=entry, + state=State.SKIPPED, + skip_reason=SkipReason.EXCLUDED, + message="excluded by pytest -k/-m filter", + ) + + +def _step_names_for_selected_validations( + steps: list[Any], + entries: list[ValidationEntry], + selected_indexes: set[int], +) -> set[str]: + """Return configured step names needed by selected validations.""" + selected_entries = [entry for index, entry in enumerate(entries) if index in selected_indexes] + selected_validation_names = {entry.name for entry in selected_entries} + selected_step_names = {entry.step for entry in selected_entries if entry.step} + + for step in steps: + required_validations = getattr(step, "requires_available_validations", []) + if any(validation in selected_validation_names for validation in required_validations): + selected_step_names.add(step.name) + + return selected_step_names + + +def _prune_steps_for_pytest_selection( + steps: list[Any], + entries: list[ValidationEntry], + selected_indexes: set[int] | None, +) -> list[Any]: + """Drop provider steps that cannot feed the explicitly selected validations.""" + if selected_indexes is None or len(selected_indexes) == len(entries): + return steps + + selected_step_names = _step_names_for_selected_validations(steps, entries, selected_indexes) + if not selected_step_names: + logger.info("No configured steps feed the selected validations; skipping all provider steps") + return [] + + return [step for step in steps if step.name in selected_step_names] + + def _apply_step_validation_gates(steps: list[Any], released_tests: set[str] | None) -> list[Any]: """Mark steps skipped when their required validations are unavailable.""" if released_tests is None: @@ -446,6 +563,12 @@ def _run_steps_mode( if released_tests is None: logger.info(f"Including unreleased validations because {INCLUDE_UNRELEASED_ENV} is enabled") + all_validations = {} + if self.config.tests and self.config.tests.validations: + all_validations = self.config.tests.validations + validation_entries = parse_validations(all_validations) + selected_validation_indexes = _selected_validation_indexes(validation_entries, self._extra_pytest_args) + steps = _apply_step_validation_gates(steps, released_tests) config_phases = self.config.get_phases(platform) @@ -465,6 +588,9 @@ def _run_steps_mode( ], ) + configured_step_phases = {step.name: (step.phase or "setup").lower() for step in steps if not step.skip} + steps = _prune_steps_for_pytest_selection(steps, validation_entries, selected_validation_indexes) + steps_by_phase: dict[str, list] = {phase: [] for phase in config_phases} for step in steps: step_phase = (step.phase or "setup").lower() @@ -479,10 +605,6 @@ def _run_steps_mode( step_phase = (step.phase or "setup").lower() self.context.set_step_phase(step.name, step_phase) - all_validations = {} - if self.config.tests and self.config.tests.validations: - all_validations = self.config.tests.validations - validation_entries = parse_validations(all_validations) resolved_validations_by_index: dict[int, ResolvedEntry] = {} exclude_labels: list[str] = [] @@ -562,22 +684,52 @@ def _run_steps_mode( if junit_tmpdir: phase_junitxml = str(Path(junit_tmpdir) / f"junit-{phase_name}.xml") - step_phases_snapshot = self.context.get_all_step_phases() + step_phases_snapshot = ( + configured_step_phases + if selected_validation_indexes is not None + else self.context.get_all_step_phases() + ) phase_entry_indexes = [ index for index, entry in enumerate(validation_entries) if index not in resolved_validations_by_index and get_entry_phase(entry, step_phases_snapshot) == phase_name ] - phase_entries = [validation_entries[index] for index in phase_entry_indexes] - resolved_phase_entries = self._resolve_validation_entries( - phase_entries, - requested_phase_names if Phase.ALL not in requested_phases else set(config_phases), - set(self._include_labels), - set(resolution_exclude_labels), - set(exclude_tests), - released_tests, - ) + if selected_validation_indexes is None: + phase_entries = [validation_entries[index] for index in phase_entry_indexes] + resolved_phase_entries = self._resolve_validation_entries( + phase_entries, + requested_phase_names if Phase.ALL not in requested_phases else set(config_phases), + set(self._include_labels), + set(resolution_exclude_labels), + set(exclude_tests), + released_tests, + ) + else: + selected_phase_pairs = [ + (index, validation_entries[index]) + for index in phase_entry_indexes + if index in selected_validation_indexes + ] + selected_phase_entries = [entry for _, entry in selected_phase_pairs] + resolved_selected_entries = self._resolve_validation_entries( + selected_phase_entries, + requested_phase_names if Phase.ALL not in requested_phases else set(config_phases), + set(self._include_labels), + set(resolution_exclude_labels), + set(exclude_tests), + released_tests, + ) + resolved_by_index = { + index: resolved + for (index, _entry), resolved in zip( + selected_phase_pairs, resolved_selected_entries, strict=True + ) + } + for index in phase_entry_indexes: + if index not in resolved_by_index: + resolved_by_index[index] = _pytest_excluded_entry(validation_entries[index]) + resolved_phase_entries = [resolved_by_index[index] for index in phase_entry_indexes] ready_entries = [entry for entry in resolved_phase_entries if entry.is_ready] terminal_before_pytest = [entry for entry in resolved_phase_entries if not entry.is_ready] diff --git a/isvctl/tests/test_orchestrator_loop.py b/isvctl/tests/test_orchestrator_loop.py index c5969b16..5c9150b7 100644 --- a/isvctl/tests/test_orchestrator_loop.py +++ b/isvctl/tests/test_orchestrator_loop.py @@ -240,6 +240,80 @@ def test_validation_gate_allows_step_when_unreleased_checks_are_included( assert result.inventory is not None assert "unreleased_setup" in result.inventory + def test_pytest_k_selection_prunes_unrelated_steps( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """A focused -k run should execute only steps needed by the selected validation.""" + monkeypatch.setattr("isvctl.orchestrator.loop.load_released_test_filter", lambda: None) + selected_probe = _write_script( + tmp_path, + "selected_probe.sh", + '#!/bin/bash\necho \'{"success": true, "platform": "kubernetes", "field": "present"}\'\n', + ) + setup_script = _write_script( + tmp_path, + "selected_setup.sh", + '#!/bin/bash\necho \'{"success": true, "platform": "kubernetes"}\'\n', + ) + teardown_script = _write_script( + tmp_path, + "selected_teardown.sh", + '#!/bin/bash\necho \'{"success": true, "platform": "kubernetes"}\'\n', + ) + + config = RunConfig( + commands={ + "kubernetes": PlatformCommands( + phases=["setup", "test", "teardown"], + steps=[ + StepConfig( + name="selected_setup", + command=setup_script, + phase="setup", + requires_available_validations=["FieldExistsCheck"], + ), + StepConfig(name="selected_probe", command=selected_probe, phase="test"), + StepConfig(name="unrelated_probe", command="false", phase="test"), + StepConfig( + name="selected_teardown", + command=teardown_script, + phase="teardown", + requires_available_validations=["FieldExistsCheck"], + ), + StepConfig(name="unrelated_teardown", command="false", phase="teardown"), + ], + ) + }, + tests=ValidationConfig( + platform="kubernetes", + validations={ + "selected_checks": { + "step": "selected_probe", + "checks": {"FieldExistsCheck": {"field": "field"}}, + }, + "unrelated_checks": { + "step": "unrelated_probe", + "checks": {"StepSuccessCheck": {}}, + }, + }, + ), + ) + + result = Orchestrator(config).run(extra_pytest_args=["-k", "FieldExistsCheck"]) + + assert result.success + steps_by_phase = { + phase.phase: [step["name"] for step in phase.details["steps"]] for phase in result.phases if phase.details + } + assert steps_by_phase[Phase.SETUP] == ["selected_setup"] + assert steps_by_phase[Phase.TEST] == ["selected_probe"] + assert steps_by_phase[Phase.TEARDOWN] == ["selected_teardown"] + validations = result.phases[1].details["validations"] + assert [validation["name"] for validation in validations] == ["FieldExistsCheck", "StepSuccessCheck"] + assert validations[1]["skip_reason"] == "test_excluded" + def test_run_setup_phase_command_failure(self) -> None: """Test setup phase with command failure.""" config = RunConfig(