diff --git a/api_module_mapping.md b/api_module_mapping.md index bcfa632d..e9c51926 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -285,6 +285,8 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ |logTypes.getLogTypeSetting |v1alpha| | | |logTypes.legacySubmitParserExtension |v1alpha| | | |logTypes.list |v1alpha| | | +|logTypes.getParserAnalysisReport |v1alpha|chronicle.parser_validation.get_analysis_report |secops log-type get-analysis-report | +|logTypes.triggerGitHubChecks |v1alpha|chronicle.parser_validation.trigger_github_checks |secops log-type trigger-checks | |logTypes.logs.export |v1alpha| | | |logTypes.logs.get |v1alpha| | | |logTypes.logs.import |v1alpha|chronicle.log_ingest.ingest_log |secops log ingest | diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 9b892272..6dd5b331 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -334,6 +334,10 @@ create_watchlist as _create_watchlist, update_watchlist as _update_watchlist, ) +from secops.chronicle.parser_validation import ( + get_analysis_report as _get_analysis_report, + trigger_github_checks as _trigger_github_checks, +) from secops.exceptions import SecOpsError @@ -761,6 +765,44 @@ def update_watchlist( update_mask, ) + def get_analysis_report(self, name: str) -> dict[str, Any]: + """Get a parser analysis report. + Args: + name: The full resource name of the analysis report. + Returns: + Dictionary containing the analysis report. + Raises: + APIError: If the API request fails. + """ + return _get_analysis_report(self, name) + + def trigger_github_checks( + self, + associated_pr: str, + log_type: str, + customer_id: str, + ) -> dict[str, Any]: + """Trigger GitHub checks for a parser. + + Args: + associated_pr: The PR string (e.g., "owner/repo/pull/123"). + log_type: The string name of the LogType enum. + customer_id: The customer UUID string. + + Returns: + Dictionary containing the response details. + + Raises: + SecOpsError: If gRPC modules or client stub are not available. + APIError: If the gRPC API request fails. + """ + return _trigger_github_checks( + self, + associated_pr=associated_pr, + log_type=log_type, + customer_id=customer_id, + ) + def get_stats( self, query: str, @@ -4643,3 +4685,6 @@ def update_rule_deployment( archived=archived, run_frequency=run_frequency, ) + +# Parser Validation methods + diff --git a/src/secops/chronicle/parser_validation.py b/src/secops/chronicle/parser_validation.py new file mode 100644 index 00000000..f10fb83f --- /dev/null +++ b/src/secops/chronicle/parser_validation.py @@ -0,0 +1,116 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Chronicle parser validation functionality.""" + +from typing import TYPE_CHECKING, Any + +from secops.exceptions import APIError, SecOpsError + +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient + + +def trigger_github_checks( + client: "ChronicleClient", + associated_pr: str, + log_type: str, + customer_id: str, + timeout: int = 60, +) -> dict[str, Any]: + """Trigger GitHub checks for a parser. + + Args: + client: ChronicleClient instance + associated_pr: The PR string (e.g., "owner/repo/pull/123"). + log_type: The string name of the LogType enum. + customer_id: The customer UUID string. + timeout: Optional RPC timeout in seconds (default: 60). + + Returns: + Dictionary containing the response details. + + Raises: + SecOpsError: If input is invalid. + APIError: If the API request fails. + """ + instance_id = client.instance_id + if customer_id and customer_id != client.customer_id: + # Dev and staging use 'us' as the location + region = "us" if client.region in ["dev", "staging"] else client.region + instance_id = ( + f"projects/{client.project_id}/locations/" + f"{region}/instances/{customer_id}" + ) + + # The backend expects the resource name to be in the format: + # projects/*/locations/*/instances/*/logTypes/*/parsers/ + base_url = client.base_url(version="v1alpha") + + # First get the list of parsers for this log_type to find a valid + # parser UUID + parsers_url = f"{base_url}/{instance_id}/logTypes/{log_type}/parsers" + parsers_resp = client.session.get(parsers_url, timeout=timeout) + if not parsers_resp.ok: + raise APIError( + f"Failed to fetch parsers for log type {log_type}: " + f"{parsers_resp.text}" + ) + + parsers_data = parsers_resp.json() + if not parsers_data.get("parsers"): + raise SecOpsError(f"No parsers found for log type: {log_type}") + + # Use the first parser's name (which includes the UUID) + parser_name = parsers_data["parsers"][0]["name"] + + url = f"{base_url}/{parser_name}:runAnalysis" + payload = { + "report_type": "GITHUB_PARSER_VALIDATION", + "pull_request": associated_pr, + } + + response = client.session.post(url, json=payload, timeout=timeout) + + if not response.ok: + raise APIError(f"API call failed: {response.text}") + + return response.json() + + +def get_analysis_report( + client: "ChronicleClient", + name: str, + timeout: int = 60, +) -> dict[str, Any]: + """Get a parser analysis report. + Args: + client: ChronicleClient instance + name: The full resource name of the analysis report. + timeout: Optional timeout in seconds (default: 60). + Returns: + Dictionary containing the analysis report. + Raises: + APIError: If the API request fails. + """ + # The name includes 'projects/...', so we just append it to base_url + base_url = client.base_url(version="v1alpha") + url = f"{base_url}/{name}" + + response = client.session.get(url, timeout=timeout) + + if not response.ok: + raise APIError(f"API call failed: {response.text}") + + return response.json() diff --git a/src/secops/cli/cli_client.py b/src/secops/cli/cli_client.py index 4c483656..8f2f5326 100644 --- a/src/secops/cli/cli_client.py +++ b/src/secops/cli/cli_client.py @@ -26,6 +26,7 @@ from secops.cli.commands.investigation import setup_investigation_command from secops.cli.commands.iocs import setup_iocs_command from secops.cli.commands.log import setup_log_command +from secops.cli.commands.log_type import setup_log_type_commands from secops.cli.commands.log_processing import ( setup_log_processing_command, ) @@ -168,6 +169,7 @@ def build_parser() -> argparse.ArgumentParser: setup_investigation_command(subparsers) setup_iocs_command(subparsers) setup_log_command(subparsers) + setup_log_type_commands(subparsers) setup_log_processing_command(subparsers) setup_parser_command(subparsers) setup_parser_extension_command(subparsers) diff --git a/src/secops/cli/commands/log_type.py b/src/secops/cli/commands/log_type.py new file mode 100644 index 00000000..d15dc301 --- /dev/null +++ b/src/secops/cli/commands/log_type.py @@ -0,0 +1,111 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""CLI for ParserValidationToolingService under Log Type command group""" + +import sys + +from secops.cli.utils.formatters import output_formatter +from secops.exceptions import APIError, SecOpsError + + +def setup_log_type_commands(subparsers): + """Set up the log_type service commands for Parser Validation.""" + log_type_parser = subparsers.add_parser( + "log-type", help="Log Type related operations (including Parser Validation)" + ) + + log_type_subparsers = log_type_parser.add_subparsers( + title="Log Type Commands", + dest="log_type_command", + help="Log Type sub-command to execute" + ) + + if sys.version_info >= (3, 7): + log_type_subparsers.required = True + + log_type_parser.set_defaults( + func=lambda args, chronicle: log_type_parser.print_help() + ) + + # --- trigger-checks command --- + trigger_parser = log_type_subparsers.add_parser( + "trigger-checks", help="Trigger GitHub checks for a parser" + ) + trigger_parser.add_argument( + "--associated-pr", + "--associated_pr", + required=True, + help='The PR string (e.g., "owner/repo/pull/123").' + ) + trigger_parser.add_argument( + "--log-type", + "--log_type", + required=True, + help='The string name of the LogType enum (e.g., "BRO_DNS").' + ) + trigger_parser.add_argument( + "--customer-id", + "--customer_id", + required=True, + help="The customer UUID string." + ) + trigger_parser.set_defaults(func=handle_trigger_checks_command) + + # --- get-analysis-report command --- + get_report_parser = log_type_subparsers.add_parser( + "get-analysis-report", help="Get a parser analysis report" + ) + get_report_parser.add_argument( + "--name", + required=True, + help="The full resource name of the analysis report." + ) + get_report_parser.set_defaults(func=handle_get_analysis_report_command) + + +def handle_trigger_checks_command(args, chronicle): + """Handle trigger checks command.""" + try: + result = chronicle.trigger_github_checks( + associated_pr=args.associated_pr, + log_type=args.log_type, + customer_id=args.customer_id, + ) + output_formatter(result, args.output) + except APIError as e: + print(f"API error: {e}", file=sys.stderr) + sys.exit(1) + except SecOpsError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error triggering GitHub checks: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_get_analysis_report_command(args, chronicle): + """Handle get analysis report command.""" + try: + result = chronicle.get_analysis_report(name=args.name) + output_formatter(result, args.output) + except APIError as e: + print(f"API error: {e}", file=sys.stderr) + sys.exit(1) + except SecOpsError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error fetching analysis report: {e}", file=sys.stderr) + sys.exit(1) diff --git a/tests/chronicle/test_client_parser_validation.py b/tests/chronicle/test_client_parser_validation.py new file mode 100644 index 00000000..49904bda --- /dev/null +++ b/tests/chronicle/test_client_parser_validation.py @@ -0,0 +1,61 @@ +"""Test parser validation methods on ChronicleClient.""" + +from unittest.mock import MagicMock +import pytest + +from secops.chronicle.client import ChronicleClient + + +@pytest.fixture +def mock_client(): + """Create a mock ChronicleClient.""" + client = ChronicleClient( + project_id="test-project", + customer_id="test-customer", + auth=MagicMock(), + ) + # Mock the parser validation service stub + client.parser_validation_service_stub = MagicMock() + return client + + +def test_trigger_github_checks(mock_client, monkeypatch): + """Test ChronicleClient.trigger_github_checks.""" + # Mock the underlying implementation to avoid gRPC dependency in tests + mock_impl = MagicMock(return_value={"message": "Success", "details": "Started"}) + monkeypatch.setattr( + "secops.chronicle.client._trigger_github_checks", mock_impl + ) + + result = mock_client.trigger_github_checks( + associated_pr="owner/repo/pull/123", + log_type="BRO_DNS", + customer_id="test-customer", + ) + + assert result == {"message": "Success", "details": "Started"} + mock_impl.assert_called_once_with( + mock_client, + associated_pr="owner/repo/pull/123", + log_type="BRO_DNS", + customer_id="test-customer", + ) + + +def test_get_analysis_report(mock_client, monkeypatch): + """Test ChronicleClient.get_analysis_report.""" + # Mock the underlying implementation + mock_impl = MagicMock(return_value={"reportId": "123"}) + monkeypatch.setattr( + "secops.chronicle.client._get_analysis_report", mock_impl + ) + + result = mock_client.get_analysis_report( + name="projects/test/locations/us/instances/test/logTypes/DEF/parsers/XYZ/analysisReports/123" + ) + + assert result == {"reportId": "123"} + mock_impl.assert_called_once_with( + mock_client, + "projects/test/locations/us/instances/test/logTypes/DEF/parsers/XYZ/analysisReports/123", + ) diff --git a/tests/cli/test_log_type.py b/tests/cli/test_log_type.py new file mode 100644 index 00000000..a75696b2 --- /dev/null +++ b/tests/cli/test_log_type.py @@ -0,0 +1,95 @@ +"""Unit tests for Log Type CLI commands.""" + +from unittest.mock import MagicMock +from argparse import Namespace +import pytest + +from secops.cli.commands.log_type import ( + handle_trigger_checks_command, + handle_get_analysis_report_command, +) +from secops.exceptions import APIError, SecOpsError + + +def test_handle_trigger_checks_command_success(): + """Test successful trigger_checks command execution.""" + args = Namespace( + associated_pr="owner/repo/pull/123", + log_type="BRO_DNS", + customer_id="1234-5678-uuid", + output="json", + ) + mock_chronicle = MagicMock() + mock_chronicle.trigger_github_checks.return_value = { + "message": "Success", + "details": "Details", + } + + try: + handle_trigger_checks_command(args, mock_chronicle) + except SystemExit: + pytest.fail("Command exited unexpectedly") + + mock_chronicle.trigger_github_checks.assert_called_once_with( + associated_pr="owner/repo/pull/123", + log_type="BRO_DNS", + customer_id="1234-5678-uuid", + ) + + +def test_handle_trigger_checks_command_api_error(capsys): + """Test trigger_checks command with APIError.""" + args = Namespace( + associated_pr="owner/repo/pull/123", + log_type="BRO_DNS", + customer_id="1234-5678-uuid", + output="json", + ) + mock_chronicle = MagicMock() + mock_chronicle.trigger_github_checks.side_effect = APIError("API fault") + + with pytest.raises(SystemExit) as exc: + handle_trigger_checks_command(args, mock_chronicle) + + assert exc.value.code == 1 + err = capsys.readouterr().err + assert "API error: API fault" in err + + +def test_handle_get_analysis_report_command_success(): + """Test successful get_analysis_report command execution.""" + args = Namespace( + name="projects/test/locations/us/instances/abc/logTypes/DEF/parsers/XYZ/analysisReports/123", + output="json", + ) + mock_chronicle = MagicMock() + mock_chronicle.get_analysis_report.return_value = { + "reportId": "123", + "status": "COMPLETED", + } + + try: + handle_get_analysis_report_command(args, mock_chronicle) + except SystemExit: + pytest.fail("Command exited unexpectedly") + + mock_chronicle.get_analysis_report.assert_called_once_with( + name="projects/test/locations/us/instances/abc/logTypes/DEF/parsers/XYZ/analysisReports/123" + ) + + +def test_handle_get_analysis_report_command_secops_error(capsys): + """Test get_analysis_report command with SecOpsError.""" + args = Namespace( + name="projects/test/locations/us/instances/abc/logTypes/DEF/parsers/XYZ/analysisReports/123", + output="json", + ) + mock_chronicle = MagicMock() + mock_chronicle.get_analysis_report.side_effect = SecOpsError("Invalid input") + + with pytest.raises(SystemExit) as exc: + handle_get_analysis_report_command(args, mock_chronicle) + + assert exc.value.code == 1 + err = capsys.readouterr().err + assert "Error: Invalid input" in err diff --git a/tests/cli/test_log_type_integration.py b/tests/cli/test_log_type_integration.py new file mode 100644 index 00000000..e794be1d --- /dev/null +++ b/tests/cli/test_log_type_integration.py @@ -0,0 +1,94 @@ +"""Integration tests for Log Type CLI commands.""" + +import json +import subprocess +import pytest + + +@pytest.mark.integration +def test_cli_log_type_lifecycle(cli_env, common_args): + """Test the complete log-type lifecycle commands.""" + + print("\nTesting log-type trigger-checks command") + + # We need a stable test fixture for the associated_pr. Since PRs are ephemeral, + # we will trigger a check for a dummy PR and expect either a successful trigger + # or a specific graceful failure (like 404 PR not found) to prove the CLI routing works. + trigger_cmd = ( + ["secops"] + + common_args + + [ + "--region", + "staging", + "--project-id", + "1234567890", + "--customer-id", + "123-abc-456-def", + "log-type", + "trigger-checks", + "--associated-pr", + "google/secops-wrapper/pull/1", + "--log-type", + "DUMMY_LOGTYPE", + "--customer-id", + "xyz-123-abc-456-def", + ] + ) + + result = subprocess.run(trigger_cmd, env=cli_env, capture_output=True, text=True) + + # Note: Depending on the backend environment, triggering a check on a fake PR/CustomerID + # might actually return a 400/404 APIError rather than a 0 exit code. + # We assert that the CLI executed and returned *something* from the server, + # even if it's an API error about the fake customer ID. + if result.returncode == 0: + try: + output = json.loads(result.stdout) + assert isinstance(output, dict) + print("Successfully triggered checks (or received valid JSON response)") + except json.JSONDecodeError: + pytest.fail(f"Could not decode JSON from successful exit: {result.stdout}") + else: + # If the backend rejects the fake data, we prove the CLI correctly caught the APIError + assert "API error" in result.stderr or "Error" in result.stderr + print(f"Server gracefully rejected the dummy trigger data: {result.stderr.strip()}") + + print("\nTesting log-type get-analysis-report command") + + # We supply a dummy resource name. The backend will likely 404, proving the routing works. + dummy_report_name = ( + "projects/140410331797/locations/us/instances/ebdc4bb9-878b-11e7-8455-10604b7cb5c1/logTypes/BRO_DNS/" + "parsers/xyz/analysisReports/123" + ) + + get_cmd = ( + ["secops"] + + common_args + + [ + "--project-id", + "140410331797", + "--customer-id", + "ebdc4bb9-878b-11e7-8455-10604b7cb5c1", + "log-type", + "get-analysis-report", + "--name", + dummy_report_name + ] + ) + + get_result = subprocess.run(get_cmd, env=cli_env, capture_output=True, text=True) + + if get_result.returncode == 0: + try: + output = json.loads(get_result.stdout) + assert isinstance(output, dict) + print("Successfully retrieved report") + except json.JSONDecodeError: + pytest.fail(f"Could not decode JSON: {get_result.stdout}") + else: + # We expect a 404 or similar API error since the report name is fake + assert "API error" in get_result.stderr or "Error" in get_result.stderr + print(f"Server gracefully rejected dummy report name: {get_result.stderr.strip()}") + +if __name__ == "__main__": + pytest.main(["-v", __file__, "-m", "integration"])