From e7fc94c8d27d15044d47346bed98d4c5f516be01 Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 13:59:11 +0200 Subject: [PATCH 01/17] bump version to 2.5.1.beta1 and update deepdiff dependency to 9.0.0 --- setup.cfg | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index fa5f61c5..3ea62343 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = IntuneCD -version = 2.5.0 +version = 2.5.1.beta1 author = Tobias Almén author_email = almenscorner@outlook.com description = Tool to backup and update configurations in Intune @@ -20,7 +20,7 @@ package_dir = packages = find: python_requires = >=3.9 install_requires = - deepdiff==8.4.2 + deepdiff==9.0.0 pyyaml>=6.0.2 msrest>=0.7.1 markdown_toclify>=0.1.7 @@ -35,4 +35,5 @@ console_scripts = IntuneCD-startbackup = IntuneCD.run_backup:start IntuneCD-startupdate = IntuneCD.run_update:start IntuneCD-startdocumentation = IntuneCD.run_documentation:start + IntuneCD-startcompare = IntuneCD.run_compare:start IntuneCD = IntuneCD.__main__:main From fb3db32b08d56fb74f3d4823c66e905a9c7ef0f4 Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:00:31 +0200 Subject: [PATCH 02/17] Add compare functionality to analyze drift between backup directories --- src/IntuneCD/__main__.py | 6 + src/IntuneCD/run_compare.py | 297 ++++++++++++++++++++++++++++++++++++ 2 files changed, 303 insertions(+) create mode 100644 src/IntuneCD/run_compare.py diff --git a/src/IntuneCD/__main__.py b/src/IntuneCD/__main__.py index ac761b43..489ac608 100644 --- a/src/IntuneCD/__main__.py +++ b/src/IntuneCD/__main__.py @@ -7,6 +7,7 @@ start as run_documentation, ) from IntuneCD.run_update import get_parser as get_update_parser, start as run_update +from IntuneCD.run_compare import get_parser as get_compare_parser, start as run_compare from importlib.metadata import version, PackageNotFoundError @@ -80,5 +81,10 @@ def main(): ) documentation_parser.set_defaults(func=run_documentation) + compare_parser = subparsers.add_parser( + "compare", parents=[get_compare_parser(include_help=False)] + ) + compare_parser.set_defaults(func=run_compare) + args = parser.parse_args() args.func(args) diff --git a/src/IntuneCD/run_compare.py b/src/IntuneCD/run_compare.py new file mode 100644 index 00000000..d962c987 --- /dev/null +++ b/src/IntuneCD/run_compare.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Offline drift/compare tool. + +Compares two local IntuneCD backup directories without making any API calls. +Useful when you want to check for drift between two environments (e.g. dev vs +prod backups) or between a backup and a known-good baseline, without needing +any write permissions. +""" + +import argparse +import json +import os +import re + +import yaml +from deepdiff import DeepDiff + +# Keys that are Intune-generated metadata and should not affect drift results. +# Mirrors the logic in IntuneCDBase.remove_keys(). +_METADATA_KEYS = { + "id", + "version", + "topicIdentifier", + "certificate", + "createdDateTime", + "lastModifiedDateTime", + "isAssigned", + "@odata.context", + "scheduledActionConfigurations@odata.context", + "scheduledActionsForRule@odata.context", + "sourceId", + "supportsScopeTags", + "companyCodes", + "isGlobalScript", + "highestAvailableVersion", + "token", + "lastSyncDateTime", + "isReadOnly", + "secretReferenceValueId", + "isEncrypted", + "modifiedDateTime", + "deployedAppCount", + "intunecd_name", + "deviceHealthScriptType", +} + + +def _strip_keys(data: dict, extra_keys: set = None) -> dict: + """Recursively remove metadata-only keys from a dict.""" + keys_to_remove = _METADATA_KEYS | (extra_keys or set()) + if isinstance(data, dict): + return { + k: _strip_keys(v, extra_keys) + for k, v in data.items() + if k not in keys_to_remove + } + if isinstance(data, list): + return [_strip_keys(item, extra_keys) for item in data] + return data + + +def _load_file(path: str) -> dict | None: + """Load a JSON or YAML backup file.""" + try: + with open(path, encoding="utf-8") as f: + if path.endswith(".yaml"): + return json.loads(json.dumps(yaml.safe_load(f))) + return json.load(f) + except Exception as e: + print(f"[WARNING] Could not load {path}: {e}") + return None + + +def _process_diffs(diff: dict) -> list: + """Convert a DeepDiff result into a simple list of change dicts.""" + result = [] + + def _setting(key: str) -> str: + match = re.search(r"\[(.+)\]", key) + return ( + match.group(1).split("[")[-1].replace("'", "").replace('"', "") + if match + else key + ) + + if "values_changed" in diff: + for key, val in diff["values_changed"].items(): + result.append( + { + "setting": _setting(key), + "source_val": str(val["new_value"])[:100], + "target_val": str(val["old_value"])[:100], + } + ) + + if "type_changes" in diff: + for key, val in diff["type_changes"].items(): + result.append( + { + "setting": _setting(key), + "source_val": str(val["new_value"])[:100], + "target_val": str(val["old_value"])[:100], + } + ) + + if "iterable_item_added" in diff: + for key in diff["iterable_item_added"]: + result.append( + { + "setting": _setting(key), + "source_val": str(list(diff["iterable_item_added"].values()))[:100], + "target_val": "", + } + ) + + if "iterable_item_removed" in diff: + for key in diff["iterable_item_removed"]: + result.append( + { + "setting": _setting(key), + "source_val": "", + "target_val": str(list(diff["iterable_item_removed"].values()))[:100], + } + ) + + return result + + +def _collect_files(root: str) -> dict[str, str]: + """Return {relative_path: absolute_path} for all JSON/YAML files under root.""" + files = {} + for dirpath, _, filenames in os.walk(root): + for filename in filenames: + if filename.endswith((".json", ".yaml")): + abs_path = os.path.join(dirpath, filename) + rel_path = os.path.relpath(abs_path, root) + files[rel_path] = abs_path + return files + + +def _config_type_from_path(rel_path: str) -> str: + """Derive a human-readable config type from the relative file path.""" + parts = rel_path.replace("\\", "/").split("/") + return parts[0] if len(parts) > 1 else "Unknown" + + +def compare(source: str, target: str, extra_keys: set = None) -> dict: + """ + Compare two backup directories. + + Args: + source: Path to the source backup (e.g. dev). + target: Path to the target backup (e.g. prod). + extra_keys: Additional keys to strip before comparing. + + Returns: + A summary dict with diff_count, changes, missing_in_target, missing_in_source. + """ + source_files = _collect_files(source) + target_files = _collect_files(target) + + source_keys = set(source_files) + target_keys = set(target_files) + + missing_in_target = sorted(source_keys - target_keys) + missing_in_source = sorted(target_keys - source_keys) + common = source_keys & target_keys + + diff_count = len(missing_in_target) + len(missing_in_source) + changes = [] + + for rel_path in sorted(common): + source_data = _load_file(source_files[rel_path]) + target_data = _load_file(target_files[rel_path]) + + if source_data is None or target_data is None: + continue + + source_clean = _strip_keys(source_data, extra_keys) + target_clean = _strip_keys(target_data, extra_keys) + + raw_diff = DeepDiff(target_clean, source_clean, ignore_order=True) + if not raw_diff: + continue + + diffs = _process_diffs(raw_diff) + if not diffs: + continue + + config_type = _config_type_from_path(rel_path) + name_source = source_data[0] if isinstance(source_data, list) else source_data + name = ( + (name_source.get("displayName") or name_source.get("name")) + if isinstance(name_source, dict) + else None + ) or os.path.splitext(os.path.basename(rel_path))[0] + + diff_count += len(diffs) + changes.append( + { + "file": rel_path, + "config_type": config_type, + "name": name, + "diffs": diffs, + } + ) + + return { + "type": "compare_summary", + "source": source, + "target": target, + "diff_count": diff_count, + "missing_in_target": missing_in_target, + "missing_in_source": missing_in_source, + "changes": changes, + } + + +def get_parser(include_help=True): + parser = argparse.ArgumentParser( + description=( + "Compare two IntuneCD backup directories for drift. " + "No API calls are made — works entirely on local files." + ), + add_help=include_help, + ) + parser.add_argument( + "-s", + "--source", + help="Path to the source backup directory (e.g. dev backup)", + required=True, + ) + parser.add_argument( + "-t", + "--target", + help="Path to the target backup directory (e.g. prod backup)", + required=True, + ) + parser.add_argument( + "-o", + "--output", + help="Path to write the JSON comparison summary. Defaults to compare_summary.json in the current directory.", + default="compare_summary.json", + ) + parser.add_argument( + "--exclude-keys", + help="Additional keys to strip before comparing, separated by space.", + nargs="+", + ) + + return parser + + +def start(args=None): + if args is None: + args = get_parser(include_help=True).parse_args() + + extra_keys = set(args.exclude_keys) if args.exclude_keys else None + + print(f"Comparing:\n source: {args.source}\n target: {args.target}\n") + + result = compare(args.source, args.target, extra_keys) + + with open(args.output, "w", encoding="utf-8") as f: + json.dump(result, f, indent=2) + + print(f"{'=' * 80}") + if result["missing_in_target"]: + print(f"In source but not in target ({len(result['missing_in_target'])}):") + for p in result["missing_in_target"]: + print(f" + {p}") + + if result["missing_in_source"]: + print(f"In target but not in source ({len(result['missing_in_source'])}):") + for p in result["missing_in_source"]: + print(f" - {p}") + + if result["changes"]: + print(f"\nConfigurations with differences ({len(result['changes'])}):") + for change in result["changes"]: + print(f"\n [{change['config_type']}] {change['name']} ({change['file']})") + for diff in change["diffs"]: + print( + f" setting: {diff['setting']}" + f" | source: {diff['source_val']}" + f" | target: {diff['target_val']}" + ) + + print(f"\nTotal diffs: {result['diff_count']}") + print(f"Summary written to: {args.output}") + + +if __name__ == "__main__": + start() From f5dfb552db8378704aa33b04a7934b49aaf37bde Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:01:02 +0200 Subject: [PATCH 03/17] change import logic in backup and update modules to fix compatibility with local and installed environments --- src/IntuneCD/backup_intune.py | 6 +++--- src/IntuneCD/update_intune.py | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/IntuneCD/backup_intune.py b/src/IntuneCD/backup_intune.py index 90b85ec9..3376dcaf 100644 --- a/src/IntuneCD/backup_intune.py +++ b/src/IntuneCD/backup_intune.py @@ -16,11 +16,11 @@ def import_backup_module(module_path: str): Dynamically imports a backup module, handling both installed and local Git repository cases. """ try: - # Try importing as an installed package - return importlib.import_module(module_path, package="IntuneCD") + # Resolve relative to the package this file is part of, so dev runs + # (src.IntuneCD.*) and installed runs (IntuneCD.*) both work. + return importlib.import_module(module_path, package=__package__) except ModuleNotFoundError: try: - # If that fails, assume we're running locally and try direct relative import return importlib.import_module(module_path) except ModuleNotFoundError as e: print(f"[ERROR] Could not import {module_path}: {e}") diff --git a/src/IntuneCD/update_intune.py b/src/IntuneCD/update_intune.py index 1fbeb9c8..6e0aa3bf 100644 --- a/src/IntuneCD/update_intune.py +++ b/src/IntuneCD/update_intune.py @@ -11,11 +11,11 @@ def import_update_module(module_path: str): Dynamically imports a update module, handling both installed and local Git repository cases. """ try: - # Try importing as an installed package - return importlib.import_module(module_path, package="IntuneCD") + # Resolve relative to the package this file is part of, so dev runs + # (src.IntuneCD.*) and installed runs (IntuneCD.*) both work. + return importlib.import_module(module_path, package=__package__) except ModuleNotFoundError: try: - # If that fails, assume we're running locally and try direct relative import return importlib.import_module(module_path) except ModuleNotFoundError as e: print(f"[ERROR] Could not import {module_path}: {e}") @@ -205,4 +205,7 @@ def update_intune( if result: diff_summary.append(result) except Exception as e: + # import traceback + print(f"[ERROR] {module_name} failed with exception: {e}") + # traceback.print_exc() From 03a51e9174232adb30836d8d0ed4a42f3e60077a Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:01:55 +0200 Subject: [PATCH 04/17] Fix app config backup crash if one app could not be found #257 --- .../backup/Intune/AppConfiguration.py | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/IntuneCD/backup/Intune/AppConfiguration.py b/src/IntuneCD/backup/Intune/AppConfiguration.py index b18a2bb8..436e87d2 100644 --- a/src/IntuneCD/backup/Intune/AppConfiguration.py +++ b/src/IntuneCD/backup/Intune/AppConfiguration.py @@ -48,17 +48,25 @@ def main(self) -> dict[str, any]: return None if self.graph_data["value"]: - item = "" for item in self.graph_data["value"]: - for app in item["targetedMobileApps"]: - app_data = self.make_graph_request( - endpoint=self.endpoint + self.APP_ENDPOINT + "/" + app + app = None + try: + for app in item["targetedMobileApps"]: + app_data = self.make_graph_request( + endpoint=self.endpoint + self.APP_ENDPOINT + "/" + app + ) + if app_data: + item.pop("targetedMobileApps") + item["targetedMobileApps"] = {} + item["targetedMobileApps"]["appName"] = app_data[ + "displayName" + ] + item["targetedMobileApps"]["type"] = app_data["@odata.type"] + except Exception as e: + self.log( + tag="error", + msg=f"Error getting app data for App Configuration {item.get('displayName', 'Unknown')} with app id {app}: {e}", ) - if app_data: - item.pop("targetedMobileApps") - item["targetedMobileApps"] = {} - item["targetedMobileApps"]["appName"] = app_data["displayName"] - item["targetedMobileApps"]["type"] = app_data["@odata.type"] if item.get("payloadJson"): item["payloadJson"] = self.decode_base64(item["payloadJson"]) item["payloadJson"] = json.loads(item["payloadJson"]) From 93cfa9646bc52e6d26675ece76f01bd0f594b1c5 Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:02:55 +0200 Subject: [PATCH 05/17] Enhance base64 validation and improve string formatting in clean_list function --- src/IntuneCD/intunecdlib/documentation_functions.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/IntuneCD/intunecdlib/documentation_functions.py b/src/IntuneCD/intunecdlib/documentation_functions.py index 64a59108..ff7b795f 100644 --- a/src/IntuneCD/intunecdlib/documentation_functions.py +++ b/src/IntuneCD/intunecdlib/documentation_functions.py @@ -181,9 +181,11 @@ def is_base64(s): decoded = base64.b64decode(s.encode()) else: decoded = base64.b64decode(s) - # If decoding succeeds and the decoded bytes match the original string, it's a valid base64-encoded string - return decoded == s.encode() - except (TypeError, binascii.Error): + # Verify the decoded bytes are valid UTF-8 + decoded.decode("utf-8") + # If decoding succeeds and re-encoding matches the original, it's a valid base64-encoded string + return base64.b64encode(decoded).decode("utf-8") == s + except (TypeError, binascii.Error, UnicodeDecodeError): # If decoding fails, it's not a valid base64-encoded string return False @@ -256,6 +258,8 @@ def dict_to_ul(val) -> str: def simple_value_to_string(key, val) -> str: if decode and is_base64(val): val = decode_base64(val) + val = val.replace("\r\n", "
").replace("\r", "
").replace("\n", "
") + return f"**{key}:**
Click to expand...{val}

" if isinstance(val, str): val = val.replace("\\", "\\\\") @@ -279,6 +283,8 @@ def list_string(item_list) -> str: def string(s) -> str: if decode and is_base64(s): s = decode_base64(s) + s = s.replace("\r\n", "
").replace("\r", "
").replace("\n", "
") + return f"
Click to expand...{s}
" if len(s) > 200: string = f"
Click to expand...{s}
" From 601051c10d3988c5b5fccd1ae97ad3aee2056624 Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:22:17 +0200 Subject: [PATCH 06/17] Update pytest and pytest-cov versions in test requirements --- test-requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test-requirements.txt b/test-requirements.txt index b3426f3f..9bb9a489 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,6 +3,6 @@ testfixtures>=7.0 deepdiff>=5.6.0 requests msal -pytest==6.2.5 -pytest-cov==3.0.0 +pytest>=8.3 +pytest-cov>=5.0 pytablewriter From 753891b5d9ec4678e4ad47a115b2f53c8ce859a7 Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:24:48 +0200 Subject: [PATCH 07/17] Fix compliance updates stopping on object is not iterable #256 --- .../backup/Intune/DeviceCompliance.py | 6 ++-- src/IntuneCD/update/Intune/Compliance.py | 20 ++++++++----- .../update/Intune/DeviceCompliance.py | 30 +++++++++++-------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/IntuneCD/backup/Intune/DeviceCompliance.py b/src/IntuneCD/backup/Intune/DeviceCompliance.py index 76b40be4..d10c9b4b 100644 --- a/src/IntuneCD/backup/Intune/DeviceCompliance.py +++ b/src/IntuneCD/backup/Intune/DeviceCompliance.py @@ -167,9 +167,9 @@ def main(self) -> dict[str, any]: for action in item["scheduledActionsForRule"]: self.remove_keys(action) self._get_notification_template(action) - for config in item["scheduledActionsForRule"][0][ - "scheduledActionConfigurations" - ]: + for config in item["scheduledActionsForRule"][0].get( + "scheduledActionConfigurations", [] + ): self.remove_keys(config) try: diff --git a/src/IntuneCD/update/Intune/Compliance.py b/src/IntuneCD/update/Intune/Compliance.py index d462bb6e..0508ccbf 100644 --- a/src/IntuneCD/update/Intune/Compliance.py +++ b/src/IntuneCD/update/Intune/Compliance.py @@ -260,9 +260,9 @@ def _get_notification_template_id(self, rule: dict[str, any]) -> dict[str, any]: 0 ]["id"] else: - action[ - "notificationTemplateId" - ] = "00000000-0000-0000-0000-000000000000" + action["notificationTemplateId"] = ( + "00000000-0000-0000-0000-000000000000" + ) action.pop("notificationTemplateName") @@ -283,7 +283,12 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue # reset params self.params = None self.create_request = None @@ -295,6 +300,7 @@ def main(self) -> dict[str, any]: self.create_request = None if "technologies" not in repo_data: continue + self.match_id = self._match_id_from_filename(filename) self.match_info = { "name": repo_data.get("name"), "technologies": repo_data.get("technologies"), @@ -323,9 +329,9 @@ def main(self) -> dict[str, any]: repo_data = self._remove_compliance_keys(repo_data) for item in intune_data["value"]: - for action in item["scheduledActionsForRule"][0][ - "scheduledActionConfigurations" - ]: + for action in item["scheduledActionsForRule"][0].get( + "scheduledActionConfigurations", [] + ): self.remove_keys(action) try: diff --git a/src/IntuneCD/update/Intune/DeviceCompliance.py b/src/IntuneCD/update/Intune/DeviceCompliance.py index 62931044..b99928f7 100644 --- a/src/IntuneCD/update/Intune/DeviceCompliance.py +++ b/src/IntuneCD/update/Intune/DeviceCompliance.py @@ -46,9 +46,9 @@ def _set_compliance_script_id(self, data: dict) -> dict[str, any]: }, ) if compliance_script_id.get("value"): - data["deviceCompliancePolicyScript"][ - "deviceComplianceScriptId" - ] = compliance_script_id["value"][0]["id"] + data["deviceCompliancePolicyScript"]["deviceComplianceScriptId"] = ( + compliance_script_id["value"][0]["id"] + ) return data @@ -77,8 +77,8 @@ def _scheduledActionsForRule_diff_check(self, repo_data: dict) -> None: repo_data (dict): The data to check the scheduled actions for the rule for """ for intune_action, repo_action in zip( - self.downstream_object.get("scheduledActionsForRule"), - repo_data["scheduledActionsForRule"], + self.downstream_object.get("scheduledActionsForRule", []), + repo_data.get("scheduledActionsForRule", []), ): action_diff = self.get_diffs(repo_action, intune_action, None) @@ -89,7 +89,7 @@ def _scheduledActionsForRule_diff_check(self, repo_data: dict) -> None: "ruleName": "PasswordRequired", "scheduledActionConfigurations": repo_data[ "scheduledActionsForRule" - ][0]["scheduledActionConfigurations"], + ][0].get("scheduledActionConfigurations", []), } ] } @@ -114,7 +114,7 @@ def _get_notification_template_id(self, rule: dict[str, any]) -> dict[str, any]: Returns: dict[str, any]: The notification template """ - for action in rule["scheduledActionConfigurations"]: + for action in rule.get("scheduledActionConfigurations", []): if action.get("notificationTemplateName"): notification_template = self.make_graph_request( self.endpoint @@ -128,9 +128,9 @@ def _get_notification_template_id(self, rule: dict[str, any]) -> dict[str, any]: 0 ]["id"] else: - action[ - "notificationTemplateId" - ] = "00000000-0000-0000-0000-000000000000" + action["notificationTemplateId"] = ( + "00000000-0000-0000-0000-000000000000" + ) action.pop("notificationTemplateName") @@ -151,13 +151,19 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.config_type = "Compliance Policy" self.notify = True repo_data = self.load_repo_data(filename) if repo_data: if repo_data.get("platforms") == "linux": continue + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), "@odata.type": repo_data.get("@odata.type"), @@ -169,7 +175,7 @@ def main(self) -> dict[str, any]: if repo_data is False: continue - for rule in repo_data.get("scheduledActionsForRule"): + for rule in repo_data.get("scheduledActionsForRule") or []: self._get_notification_template_id(rule) for item in intune_data.get( From 0d137916af4add4ea86a32c57d91ad7f031cea8f Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:25:03 +0200 Subject: [PATCH 08/17] Update expected data format in document_configs test and increase limit parameter --- tests/test_documentation_functions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_documentation_functions.py b/tests/test_documentation_functions.py index f907109f..8c26e1f4 100644 --- a/tests/test_documentation_functions.py +++ b/tests/test_documentation_functions.py @@ -125,13 +125,13 @@ def test_document_configs(self): '{"@odata.type":"test","test":"test","name":"test","description":"test","testvals":"1,2","testbool":false,"testlist":["test"],"testlistdict":[{"test":{"test":{"test":["1"],"testb64":"dW5pY29ybg=="}}}],"testdict2":{"test":{"test":{"test":["1"]}}},"testdictlist":{"test":["a","b","c"]},"assignments":[{"intent":"Include","target":{"@odata.type":"#test","groupName":"test-group","deviceAndAppManagementAssignmentFilterId":"test-filter","deviceAndAppManagementAssignmentFilterType":"test"}}]}', encoding="utf-8", ) - self.expected_data = "##test###testDescription:test####Assignments|intent|target|filtertype|filtername||-------|----------|-----------|-----------||Include|test-group|test|test-filter|####Configuration|setting|value||------------|-------------------------------------------------------------------------------------||Odatatype|test||Test|test||Name|test||Testvals|1,2||Testbool|False||Testlist|test
||Testlistdict|**test:**
    **test:**
    • 1
    **testb64:**dW5pY29ybg==

||Testdict2|**test:**
    **test:**
      **test:**
      • 1
||Testdictlist|**test:**
  • a
  • b
  • c
|" + self.expected_data = "##test###testDescription:test####Assignments|intent|target|filtertype|filtername||-------|----------|-----------|-----------||Include|test-group|test|test-filter|####Configuration|setting|value||------------|----------------------------------------------------------------------------------------------------------------------------------------||Odatatype|test||Test|test||Name|test||Testvals|1,2||Testbool|False||Testlist|test
||Testlistdict|**test:**
    **test:**
    • 1
    **testb64:**
    Clicktoexpand...unicorn


||Testdict2|**test:**
    **test:**
      **test:**
      • 1
||Testdictlist|**test:**
  • a
  • b
  • c
|" document_configs( f"{self.directory.path}/config", f"{self.directory.path}/test.md", "test", - 100, + 10000, split=False, cleanup=True, decode=True, From 8caa5ff94b1dd960dda95dbdd503f40e8c3212e8 Mon Sep 17 00:00:00 2001 From: Tobias Date: Wed, 13 May 2026 14:25:42 +0200 Subject: [PATCH 09/17] Add match_id extraction and duplicate filename handling across update modules #255 --- src/IntuneCD/intunecdlib/BaseUpdateModule.py | 61 ++++++++++++++++++- .../update/Intune/AppConfiguration.py | 8 ++- src/IntuneCD/update/Intune/AppProtection.py | 8 ++- .../update/Intune/AppleEnrollmentProfile.py | 8 ++- .../update/Intune/ComplianceScripts.py | 8 ++- .../update/Intune/ConditionalAccess.py | 8 ++- .../update/Intune/CustomAttributes.py | 8 ++- .../update/Intune/DeviceCategories.py | 8 ++- .../update/Intune/DeviceConfigurations.py | 8 ++- .../update/Intune/EnrollmentConfigurations.py | 8 ++- .../update/Intune/EnrollmentStatusPage.py | 8 ++- src/IntuneCD/update/Intune/Filters.py | 8 ++- .../Intune/GroupPolicyConfigurations.py | 8 ++- .../update/Intune/ManagementIntents.py | 10 ++- .../update/Intune/NotificationTemplate.py | 8 ++- .../update/Intune/PowerShellScripts.py | 8 ++- .../update/Intune/ProactiveRemediation.py | 8 ++- .../update/Intune/ReusableSettings.py | 8 ++- src/IntuneCD/update/Intune/Roles.py | 8 ++- src/IntuneCD/update/Intune/ScopeTags.py | 8 ++- src/IntuneCD/update/Intune/SettingsCatalog.py | 8 ++- src/IntuneCD/update/Intune/ShellScripts.py | 8 ++- .../update/Intune/WindowsDriverUpdates.py | 8 ++- .../update/Intune/WindowsEnrollmentProfile.py | 8 ++- .../update/Intune/WindowsFeatureUpdates.py | 8 ++- .../update/Intune/WindowsQualityUpdates.py | 8 ++- 26 files changed, 235 insertions(+), 28 deletions(-) diff --git a/src/IntuneCD/intunecdlib/BaseUpdateModule.py b/src/IntuneCD/intunecdlib/BaseUpdateModule.py index af7c564f..30818729 100644 --- a/src/IntuneCD/intunecdlib/BaseUpdateModule.py +++ b/src/IntuneCD/intunecdlib/BaseUpdateModule.py @@ -67,6 +67,7 @@ def __init__( self.azure_update = False self.config_type = None self.match_info = None + self.match_id = None self.config_endpoint = None self.downstream_assignments = None self.create_request = None @@ -399,16 +400,72 @@ def create_downstream_data( repo_assignments, [], self.assignment_key, self.create_request["id"] ) - def get_match_data(self, intune_data: dict, match_info: dict) -> tuple: + _FILENAME_ID_RE = re.compile( + r"__(?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})(?=\.[^.]+$)" + ) + + def _match_id_from_filename(self, filename: str) -> str: + """Extracts an `__` suffix from a backup filename, if present.""" + if not filename: + return None + match = self._FILENAME_ID_RE.search(filename) + return match.group("id") if match else None + + def _duplicate_filenames_to_skip(self, filenames: list) -> set: + """Detects filenames that would resolve to the same tenant object. + + Groups files by their name with any `__` suffix stripped. If a + group has more than one file: + - If any file in the group has an ID suffix, files without one are + skipped (the ID-suffixed entry is the authoritative copy). + - If no file in the group has an ID, all but the first are skipped. + """ + groups: dict = {} + for f in filenames: + base = self._FILENAME_ID_RE.sub("", f) + groups.setdefault(base, []).append(f) + + skip: set = set() + for base, fnames in groups.items(): + if len(fnames) <= 1: + continue + with_id = [f for f in fnames if self._FILENAME_ID_RE.search(f)] + without_id = [f for f in fnames if not self._FILENAME_ID_RE.search(f)] + if with_id: + for f in without_id: + skip.add(f) + self.log( + tag="warning", + msg=f"Skipping {f}: duplicate of ID-suffixed file(s) {with_id}", + ) + else: + for f in without_id[1:]: + skip.add(f) + self.log( + tag="error", + msg=f"Skipping {f}: duplicate of {without_id[0]} with no ID to disambiguate", + ) + return skip + + def get_match_data( + self, intune_data: dict, match_info: dict, match_id: str = None + ) -> tuple: """Gets the matching data Args: intune_data (dict): The intune data match_info (dict): The match info from the repository + match_id (str, optional): If provided, prefer an item with this id + before falling back to match_info matching. Returns: tuple: The matching data """ + if match_id: + for item in intune_data: + if item.get("id") == match_id: + intune_data.remove(item) + return dict(item), item["id"] config_match_count = len(match_info) intune_item = None intune_id = None @@ -626,7 +683,7 @@ def process_update( self.downstream_id = downstream_data.get("id", "") else: self.downstream_object, self.downstream_id = self.get_match_data( - downstream_data, self.match_info + downstream_data, self.match_info, self.match_id ) if self.downstream_object: diff --git a/src/IntuneCD/update/Intune/AppConfiguration.py b/src/IntuneCD/update/Intune/AppConfiguration.py index 24ae2215..2f3030e6 100644 --- a/src/IntuneCD/update/Intune/AppConfiguration.py +++ b/src/IntuneCD/update/Intune/AppConfiguration.py @@ -84,9 +84,15 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), "@odata.type": repo_data.get("@odata.type"), diff --git a/src/IntuneCD/update/Intune/AppProtection.py b/src/IntuneCD/update/Intune/AppProtection.py index da11d721..e71cd948 100644 --- a/src/IntuneCD/update/Intune/AppProtection.py +++ b/src/IntuneCD/update/Intune/AppProtection.py @@ -101,10 +101,16 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.assignment_endpoint = "/deviceAppManagement/" repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = self._get_match_info(repo_data) self.name = repo_data.get("displayName") diff_data = self.create_diff_data(self.name, self.config_type) diff --git a/src/IntuneCD/update/Intune/AppleEnrollmentProfile.py b/src/IntuneCD/update/Intune/AppleEnrollmentProfile.py index efbcd514..f4cd7f85 100644 --- a/src/IntuneCD/update/Intune/AppleEnrollmentProfile.py +++ b/src/IntuneCD/update/Intune/AppleEnrollmentProfile.py @@ -76,11 +76,17 @@ def main(self) -> dict[str, any]: if value is not None ] - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.downstream_id = None repo_data = self.load_repo_data(filename) if repo_data: repo_data.pop("isDefault", None) + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/ComplianceScripts.py b/src/IntuneCD/update/Intune/ComplianceScripts.py index 55a0fe46..1c05df3a 100644 --- a/src/IntuneCD/update/Intune/ComplianceScripts.py +++ b/src/IntuneCD/update/Intune/ComplianceScripts.py @@ -68,13 +68,19 @@ def main(self) -> dict[str, any]: return None # Get details for each script to populate script content intune_data["value"] = self._get_script_details(intune_data) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.notify = True repo_data = self.load_repo_data(filename) if repo_data: # Skip if policy contains settingDefinitionId as it is not a device compliance script if repo_data.get("settingDefinitionId"): continue + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/ConditionalAccess.py b/src/IntuneCD/update/Intune/ConditionalAccess.py index bd95bd20..94b42d2c 100644 --- a/src/IntuneCD/update/Intune/ConditionalAccess.py +++ b/src/IntuneCD/update/Intune/ConditionalAccess.py @@ -43,7 +43,12 @@ def main(self) -> dict[str, any]: self.log(tag="error", msg=f"Error getting {self.config_type} data: {e}") return None - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: self.get_pop_keys( @@ -55,6 +60,7 @@ def main(self) -> dict[str, any]: ], "pop", ) + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/CustomAttributes.py b/src/IntuneCD/update/Intune/CustomAttributes.py index 0a094794..32c01b05 100644 --- a/src/IntuneCD/update/Intune/CustomAttributes.py +++ b/src/IntuneCD/update/Intune/CustomAttributes.py @@ -103,11 +103,17 @@ def main(self) -> dict[str, any]: "", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.notify = True script_data = None repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/DeviceCategories.py b/src/IntuneCD/update/Intune/DeviceCategories.py index 8e3cc026..9aeccbcd 100644 --- a/src/IntuneCD/update/Intune/DeviceCategories.py +++ b/src/IntuneCD/update/Intune/DeviceCategories.py @@ -35,9 +35,15 @@ def main(self) -> dict[str, any]: self.log(tag="error", msg=f"Error getting {self.config_type} data: {e}") return None - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/DeviceConfigurations.py b/src/IntuneCD/update/Intune/DeviceConfigurations.py index 21ca3e08..eb592286 100644 --- a/src/IntuneCD/update/Intune/DeviceConfigurations.py +++ b/src/IntuneCD/update/Intune/DeviceConfigurations.py @@ -137,7 +137,12 @@ def main(self) -> dict[str, any]: "#microsoft.graph.macOSCustomConfiguration", ] - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.notify = True self.config_type = "Device Configuration" @@ -145,6 +150,7 @@ def main(self) -> dict[str, any]: if repo_data: if "@odata.type" not in repo_data: continue + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), "@odata.type": repo_data.get("@odata.type"), diff --git a/src/IntuneCD/update/Intune/EnrollmentConfigurations.py b/src/IntuneCD/update/Intune/EnrollmentConfigurations.py index d9ad6b27..249be02a 100644 --- a/src/IntuneCD/update/Intune/EnrollmentConfigurations.py +++ b/src/IntuneCD/update/Intune/EnrollmentConfigurations.py @@ -109,10 +109,16 @@ def main(self) -> dict[str, any]: != "#microsoft.graph.windows10EnrollmentCompletionPageConfiguration" ] - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.downstream_id = None repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) if ( repo_data["@odata.type"] == "#microsoft.graph.deviceEnrollmentPlatformRestrictionConfiguration" diff --git a/src/IntuneCD/update/Intune/EnrollmentStatusPage.py b/src/IntuneCD/update/Intune/EnrollmentStatusPage.py index f947f93d..55782320 100644 --- a/src/IntuneCD/update/Intune/EnrollmentStatusPage.py +++ b/src/IntuneCD/update/Intune/EnrollmentStatusPage.py @@ -78,9 +78,15 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), "@odata.type": repo_data.get("@odata.type"), diff --git a/src/IntuneCD/update/Intune/Filters.py b/src/IntuneCD/update/Intune/Filters.py index aef8677d..2ae8e3f8 100644 --- a/src/IntuneCD/update/Intune/Filters.py +++ b/src/IntuneCD/update/Intune/Filters.py @@ -36,9 +36,15 @@ def main(self) -> dict[str, any]: self.log(tag="error", msg=f"Error getting {self.config_type} data: {e}") return None - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/GroupPolicyConfigurations.py b/src/IntuneCD/update/Intune/GroupPolicyConfigurations.py index 80ebd24d..69261e7d 100644 --- a/src/IntuneCD/update/Intune/GroupPolicyConfigurations.py +++ b/src/IntuneCD/update/Intune/GroupPolicyConfigurations.py @@ -426,12 +426,18 @@ def main(self) -> dict[str, any]: intune_profiles.append(profile) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: self.create_request = None self.config_type = "Group Policy Configuration" self.notify = True + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), "policyConfigurationIngestionType": repo_data.get( diff --git a/src/IntuneCD/update/Intune/ManagementIntents.py b/src/IntuneCD/update/Intune/ManagementIntents.py index ccd836ed..0081e30a 100644 --- a/src/IntuneCD/update/Intune/ManagementIntents.py +++ b/src/IntuneCD/update/Intune/ManagementIntents.py @@ -129,7 +129,12 @@ def main(self) -> dict[str, any]: # Set glob pattern pattern = self.path + "*/*" - for filename in glob.glob(pattern, recursive=True): + filenames = glob.glob(pattern, recursive=True) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.notify = True repo_data = self.load_repo_data(filename) if repo_data: @@ -142,6 +147,7 @@ def main(self) -> dict[str, any]: ) continue + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), "templateId": repo_data.get("templateId"), @@ -152,7 +158,7 @@ def main(self) -> dict[str, any]: self.diff_data["name"] = self.name intune_intent, intune_id = self.get_match_data( - intents["value"], self.match_info + intents["value"], self.match_info, self.match_id ) if intune_intent: diff --git a/src/IntuneCD/update/Intune/NotificationTemplate.py b/src/IntuneCD/update/Intune/NotificationTemplate.py index 7ba9e4a4..81e232b3 100644 --- a/src/IntuneCD/update/Intune/NotificationTemplate.py +++ b/src/IntuneCD/update/Intune/NotificationTemplate.py @@ -141,13 +141,19 @@ def main(self) -> dict[str, any]: if val["displayName"] != "EnrollmentNotificationInternalMEO" ] - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue # Reset the paramters self.create_request = None self.config_type = "Notification Template" repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/PowerShellScripts.py b/src/IntuneCD/update/Intune/PowerShellScripts.py index 623b26b5..3539fb4a 100644 --- a/src/IntuneCD/update/Intune/PowerShellScripts.py +++ b/src/IntuneCD/update/Intune/PowerShellScripts.py @@ -97,11 +97,17 @@ def main(self) -> dict[str, any]: "", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.notify = True script_data = None repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/ProactiveRemediation.py b/src/IntuneCD/update/Intune/ProactiveRemediation.py index f0b3e3aa..fa8edcc7 100644 --- a/src/IntuneCD/update/Intune/ProactiveRemediation.py +++ b/src/IntuneCD/update/Intune/ProactiveRemediation.py @@ -103,7 +103,12 @@ def main(self) -> dict[str, any]: "", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.config_type = "Proactive Remediation" self.notify = True self.exclude_paths = [ @@ -116,6 +121,7 @@ def main(self) -> dict[str, any]: repo_data = self.load_repo_data(filename) if repo_data: repo_data.pop("deviceHealthScriptType", None) + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/ReusableSettings.py b/src/IntuneCD/update/Intune/ReusableSettings.py index ceb8bb19..185ba54e 100644 --- a/src/IntuneCD/update/Intune/ReusableSettings.py +++ b/src/IntuneCD/update/Intune/ReusableSettings.py @@ -63,13 +63,19 @@ def main(self) -> dict[str, any]: self.log(tag="error", msg=f"Error getting {self.config_type} data: {e}") return None - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.notify = True repo_data = self.load_repo_data(filename) if repo_data: # If policy does not contain settingDefinitionId skip as it is not a reusable setting if not repo_data.get("settingDefinitionId"): continue + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/Roles.py b/src/IntuneCD/update/Intune/Roles.py index 5f47f649..ac574c19 100644 --- a/src/IntuneCD/update/Intune/Roles.py +++ b/src/IntuneCD/update/Intune/Roles.py @@ -44,9 +44,15 @@ def main(self) -> dict[str, any]: self.log(tag="error", msg=f"Error getting {self.config_type} data: {e}") return None - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/ScopeTags.py b/src/IntuneCD/update/Intune/ScopeTags.py index 6805c8c0..19a87a74 100644 --- a/src/IntuneCD/update/Intune/ScopeTags.py +++ b/src/IntuneCD/update/Intune/ScopeTags.py @@ -47,9 +47,15 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/SettingsCatalog.py b/src/IntuneCD/update/Intune/SettingsCatalog.py index 37cdfe69..3a3195eb 100644 --- a/src/IntuneCD/update/Intune/SettingsCatalog.py +++ b/src/IntuneCD/update/Intune/SettingsCatalog.py @@ -60,7 +60,12 @@ def main(self) -> dict[str, any]: if settings: profile["settings"] = settings - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: if ( @@ -72,6 +77,7 @@ def main(self) -> dict[str, any]: msg=f'Skipping "{repo_data["name"]}", Endpoint detection and response is currently not supported...', ) continue + self.match_id = self._match_id_from_filename(filename) self.match_info = { "name": repo_data.get("name"), "technologies": repo_data.get("technologies"), diff --git a/src/IntuneCD/update/Intune/ShellScripts.py b/src/IntuneCD/update/Intune/ShellScripts.py index c36b08fe..9e2f142f 100644 --- a/src/IntuneCD/update/Intune/ShellScripts.py +++ b/src/IntuneCD/update/Intune/ShellScripts.py @@ -97,11 +97,17 @@ def main(self) -> dict[str, any]: "", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue self.notify = True script_data = None repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/WindowsDriverUpdates.py b/src/IntuneCD/update/Intune/WindowsDriverUpdates.py index 82811b8d..376f32da 100644 --- a/src/IntuneCD/update/Intune/WindowsDriverUpdates.py +++ b/src/IntuneCD/update/Intune/WindowsDriverUpdates.py @@ -49,9 +49,15 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/WindowsEnrollmentProfile.py b/src/IntuneCD/update/Intune/WindowsEnrollmentProfile.py index ff4bed86..24f6d98f 100644 --- a/src/IntuneCD/update/Intune/WindowsEnrollmentProfile.py +++ b/src/IntuneCD/update/Intune/WindowsEnrollmentProfile.py @@ -51,9 +51,15 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/WindowsFeatureUpdates.py b/src/IntuneCD/update/Intune/WindowsFeatureUpdates.py index bd6dd50b..fa5cdfae 100644 --- a/src/IntuneCD/update/Intune/WindowsFeatureUpdates.py +++ b/src/IntuneCD/update/Intune/WindowsFeatureUpdates.py @@ -48,9 +48,15 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } diff --git a/src/IntuneCD/update/Intune/WindowsQualityUpdates.py b/src/IntuneCD/update/Intune/WindowsQualityUpdates.py index 135e92e8..16602c82 100644 --- a/src/IntuneCD/update/Intune/WindowsQualityUpdates.py +++ b/src/IntuneCD/update/Intune/WindowsQualityUpdates.py @@ -47,9 +47,15 @@ def main(self) -> dict[str, any]: "/assignments", ) - for filename in os.listdir(self.path): + filenames = os.listdir(self.path) + skip = self._duplicate_filenames_to_skip(filenames) + + for filename in filenames: + if filename in skip: + continue repo_data = self.load_repo_data(filename) if repo_data: + self.match_id = self._match_id_from_filename(filename) self.match_info = { "displayName": repo_data.get("displayName"), } From d88289417c61d083c620f7251f6d3d32e0e8a97a Mon Sep 17 00:00:00 2001 From: akmhatey-ai <260399619+akmhatey-ai@users.noreply.github.com> Date: Sat, 16 May 2026 15:01:25 +0200 Subject: [PATCH 10/17] Sanitize control characters in backup filenames --- src/IntuneCD/intunecdlib/BaseBackupModule.py | 1 + tests/test_base_backup_module.py | 12 ++++++++++++ 2 files changed, 13 insertions(+) create mode 100644 tests/test_base_backup_module.py diff --git a/src/IntuneCD/intunecdlib/BaseBackupModule.py b/src/IntuneCD/intunecdlib/BaseBackupModule.py index 9c41588c..67f8075a 100644 --- a/src/IntuneCD/intunecdlib/BaseBackupModule.py +++ b/src/IntuneCD/intunecdlib/BaseBackupModule.py @@ -85,6 +85,7 @@ def _prepare_file_name(self, filename: str) -> str: filename = str(filename) for character in remove_characters: filename = filename.replace(character, "_") + filename = re.sub(r"[\x00-\x1f]+", "_", filename) return filename diff --git a/tests/test_base_backup_module.py b/tests/test_base_backup_module.py new file mode 100644 index 00000000..459bdf91 --- /dev/null +++ b/tests/test_base_backup_module.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +from src.IntuneCD.intunecdlib.BaseBackupModule import BaseBackupModule + + +def test_prepare_file_name_replaces_linebreaks(): + module = BaseBackupModule() + + prepared = module._prepare_file_name("Application_win32_6_17_2_2\r\n") + + assert prepared == "Application_win32_6_17_2_2_" + assert "\r" not in prepared + assert "\n" not in prepared From 2410da2fcc04da2e39643af02adf27576a3fd989 Mon Sep 17 00:00:00 2001 From: akmhatey-ai <260399619+akmhatey-ai@users.noreply.github.com> Date: Sat, 16 May 2026 15:49:28 +0200 Subject: [PATCH 11/17] Address filename sanitizer review --- src/IntuneCD/intunecdlib/BaseBackupModule.py | 8 ++++---- tests/test_base_backup_module.py | 8 ++++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/IntuneCD/intunecdlib/BaseBackupModule.py b/src/IntuneCD/intunecdlib/BaseBackupModule.py index 67f8075a..718cc099 100644 --- a/src/IntuneCD/intunecdlib/BaseBackupModule.py +++ b/src/IntuneCD/intunecdlib/BaseBackupModule.py @@ -9,6 +9,8 @@ class BaseBackupModule(BaseGraphModule): """Base class for backup modules.""" + REMOVE_CHARACTERS = '/\\:*?<>"|' + def __init__( self, path: str = None, @@ -80,12 +82,10 @@ def _prepare_file_name(self, filename: str) -> str: str: The prepared filename """ - remove_characters = '/\\:*?<>"|' if not isinstance(filename, str): filename = str(filename) - for character in remove_characters: - filename = filename.replace(character, "_") - filename = re.sub(r"[\x00-\x1f]+", "_", filename) + filename = re.sub(r"[\x00-\x1f\x7f]+", "_", filename) + filename = re.sub(rf"[{re.escape(self.REMOVE_CHARACTERS)}]", "_", filename) return filename diff --git a/tests/test_base_backup_module.py b/tests/test_base_backup_module.py index 459bdf91..0f9e0fc3 100644 --- a/tests/test_base_backup_module.py +++ b/tests/test_base_backup_module.py @@ -10,3 +10,11 @@ def test_prepare_file_name_replaces_linebreaks(): assert prepared == "Application_win32_6_17_2_2_" assert "\r" not in prepared assert "\n" not in prepared + + +def test_prepare_file_name_replaces_windows_reserved_characters(): + module = BaseBackupModule() + + prepared = module._prepare_file_name('Application/\\:*?<>"|Name') + + assert prepared == "Application_________Name" From c3a683a46008f5515109cf10739dc69b7add0e0a Mon Sep 17 00:00:00 2001 From: akmhatey-ai <260399619+akmhatey-ai@users.noreply.github.com> Date: Sat, 16 May 2026 15:53:48 +0200 Subject: [PATCH 12/17] Remove zero-width filename characters --- src/IntuneCD/intunecdlib/BaseBackupModule.py | 4 +++- tests/test_base_backup_module.py | 10 +++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/IntuneCD/intunecdlib/BaseBackupModule.py b/src/IntuneCD/intunecdlib/BaseBackupModule.py index 718cc099..8b4c29dc 100644 --- a/src/IntuneCD/intunecdlib/BaseBackupModule.py +++ b/src/IntuneCD/intunecdlib/BaseBackupModule.py @@ -10,6 +10,7 @@ class BaseBackupModule(BaseGraphModule): """Base class for backup modules.""" REMOVE_CHARACTERS = '/\\:*?<>"|' + ZERO_WIDTH_CHARACTERS = "\u200b\u200c\u200d\u2060\ufeff" def __init__( self, @@ -84,7 +85,8 @@ def _prepare_file_name(self, filename: str) -> str: if not isinstance(filename, str): filename = str(filename) - filename = re.sub(r"[\x00-\x1f\x7f]+", "_", filename) + filename = re.sub(r"[\x00-\x1f\x7f]+", "", filename) + filename = re.sub(rf"[{re.escape(self.ZERO_WIDTH_CHARACTERS)}]+", "", filename) filename = re.sub(rf"[{re.escape(self.REMOVE_CHARACTERS)}]", "_", filename) return filename diff --git a/tests/test_base_backup_module.py b/tests/test_base_backup_module.py index 0f9e0fc3..c34fa256 100644 --- a/tests/test_base_backup_module.py +++ b/tests/test_base_backup_module.py @@ -7,7 +7,7 @@ def test_prepare_file_name_replaces_linebreaks(): prepared = module._prepare_file_name("Application_win32_6_17_2_2\r\n") - assert prepared == "Application_win32_6_17_2_2_" + assert prepared == "Application_win32_6_17_2_2" assert "\r" not in prepared assert "\n" not in prepared @@ -18,3 +18,11 @@ def test_prepare_file_name_replaces_windows_reserved_characters(): prepared = module._prepare_file_name('Application/\\:*?<>"|Name') assert prepared == "Application_________Name" + + +def test_prepare_file_name_removes_zero_width_characters(): + module = BaseBackupModule() + + prepared = module._prepare_file_name("Application\u200bName\ufeff") + + assert prepared == "ApplicationName" From ee0f267f2e69dea4e5810d4687f9e5d6c21ffbff Mon Sep 17 00:00:00 2001 From: David Haioum Date: Sat, 23 May 2026 10:38:28 +0800 Subject: [PATCH 13/17] fix: match proactive remediation scripts by policy --- .../update/Intune/ProactiveRemediation.py | 21 ++-- .../Intune/test_ProactiveRemediation.py | 101 ++++++++++++++++++ 2 files changed, 114 insertions(+), 8 deletions(-) create mode 100644 tests/Update/Intune/test_ProactiveRemediation.py diff --git a/src/IntuneCD/update/Intune/ProactiveRemediation.py b/src/IntuneCD/update/Intune/ProactiveRemediation.py index f0b3e3aa..d7e9ddcd 100644 --- a/src/IntuneCD/update/Intune/ProactiveRemediation.py +++ b/src/IntuneCD/update/Intune/ProactiveRemediation.py @@ -29,20 +29,25 @@ def __init__(self, *args, **kwargs): self.assignment_key = "deviceHealthScriptAssignments" def _get_script_data(self, filename: str) -> tuple[str, str]: - fname_id = filename.split("__") detection_script_name = "" remediation_script_name = "" - if len(fname_id) > 1: - fname_id = fname_id[1].replace(".json", "").replace(".yaml", "") - else: - fname_id = "" - # Get all remediation scripts and detection scripts files script_files = os.listdir(self.script_data_path) - # Filter out files that matches the id - script_files = [f for f in script_files if fname_id in f] + # Prefer the filename prefix used when backups do not append ids. + filename_prefix = os.path.splitext(filename)[0] + prefix_matches = [ + f for f in script_files if f.startswith(f"{filename_prefix}_") + ] + if prefix_matches: + script_files = prefix_matches + else: + filename_parts = filename_prefix.rsplit("__", 1) + if len(filename_parts) > 1: + script_files = [f for f in script_files if filename_parts[1] in f] + else: + script_files = [] # Set detection and remediation script name and path for f in script_files: diff --git a/tests/Update/Intune/test_ProactiveRemediation.py b/tests/Update/Intune/test_ProactiveRemediation.py new file mode 100644 index 00000000..9665125b --- /dev/null +++ b/tests/Update/Intune/test_ProactiveRemediation.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +import os +import tempfile +import unittest +from unittest.mock import patch + +from src.IntuneCD.update.Intune.ProactiveRemediation import ( + ProactiveRemediationUpdateModule, +) + + +class TestProactiveRemediationUpdateModule(unittest.TestCase): + """Tests for the ProactiveRemediationUpdateModule class.""" + + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + self.module = ProactiveRemediationUpdateModule(path=self.temp_dir.name) + os.makedirs(self.module.script_data_path) + + def tearDown(self): + self.temp_dir.cleanup() + + def _write_script(self, filename: str, content: str) -> None: + with open( + f"{self.module.script_data_path}{filename}", "w", encoding="utf-8" + ) as f: + f.write(content) + + def test_get_script_data_matches_filename_prefix_without_append_id(self): + """Test that scripts are matched by policy name when filenames have no id.""" + script_files = [ + "Policy A_detectionScript.ps1", + "Policy A_remediationScript.ps1", + "Policy A Extra_detectionScript.ps1", + "Policy A Extra_remediationScript.ps1", + "Policy B_detectionScript.ps1", + "Policy B_remediationScript.ps1", + ] + for script_file in script_files: + self._write_script(script_file, script_file) + + with patch( + "src.IntuneCD.update.Intune.ProactiveRemediation.os.listdir", + return_value=script_files, + ): + detection_script, remediation_script = self.module._get_script_data( + "Policy A.yaml" + ) + + self.assertEqual(detection_script, "Policy A_detectionScript.ps1") + self.assertEqual(remediation_script, "Policy A_remediationScript.ps1") + + def test_get_script_data_matches_id_with_append_id(self): + """Test that scripts are matched by id when filenames have appended ids.""" + script_files = [ + "Policy A_detectionScript__policy-a-id.ps1", + "Policy A_remediationScript__policy-a-id.ps1", + "Policy B_detectionScript__policy-b-id.ps1", + "Policy B_remediationScript__policy-b-id.ps1", + ] + for script_file in script_files: + self._write_script(script_file, script_file) + + with patch( + "src.IntuneCD.update.Intune.ProactiveRemediation.os.listdir", + return_value=script_files, + ): + detection_script, remediation_script = self.module._get_script_data( + "Policy A__policy-a-id.yaml" + ) + + self.assertEqual(detection_script, "Policy A_detectionScript__policy-a-id.ps1") + self.assertEqual( + remediation_script, "Policy A_remediationScript__policy-a-id.ps1" + ) + + def test_get_script_data_handles_double_underscore_without_append_id(self): + """Test that double underscores in policy names are not treated as ids.""" + script_files = [ + "Policy__A_detectionScript.ps1", + "Policy__A_remediationScript.ps1", + "Policy B_detectionScript__A.ps1", + "Policy B_remediationScript__A.ps1", + ] + for script_file in script_files: + self._write_script(script_file, script_file) + + with patch( + "src.IntuneCD.update.Intune.ProactiveRemediation.os.listdir", + return_value=script_files, + ): + detection_script, remediation_script = self.module._get_script_data( + "Policy__A.yaml" + ) + + self.assertEqual(detection_script, "Policy__A_detectionScript.ps1") + self.assertEqual(remediation_script, "Policy__A_remediationScript.ps1") + + +if __name__ == "__main__": + unittest.main() From d4ecc285edf789977850fb7443d477da37a265c9 Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 29 May 2026 13:54:50 +0200 Subject: [PATCH 14/17] formatting --- .../intunecdlib/documentation_functions.py | 119 +++++++++++------- 1 file changed, 77 insertions(+), 42 deletions(-) diff --git a/src/IntuneCD/intunecdlib/documentation_functions.py b/src/IntuneCD/intunecdlib/documentation_functions.py index ff7b795f..3f270edf 100644 --- a/src/IntuneCD/intunecdlib/documentation_functions.py +++ b/src/IntuneCD/intunecdlib/documentation_functions.py @@ -53,12 +53,12 @@ def escape_markdown(text): :return: The escaped text """ # Regex to match http/https links - link_pattern = re.compile(r'(https?://[^\s\)\]\}]+)') + link_pattern = re.compile(r"(https?://[^\s\)\]\}]+)") parts = [] last_end = 0 for match in link_pattern.finditer(text): # Escape markdown in text before the link - before = text[last_end:match.start()] + before = text[last_end : match.start()] escaped = re.sub(r"([\_*\[\]()\{\}`>\#\+\-=|\.!])", r"\\\1", before) parts.append(escaped) # Add the link unescaped @@ -68,7 +68,7 @@ def escape_markdown(text): after = text[last_end:] escaped_after = re.sub(r"([\_*\[\]()\{\}`>\#\+\-=|\.!])", r"\\\1", after) parts.append(escaped_after) - return ''.join(parts) + return "".join(parts) def sanitize_text(text): @@ -77,9 +77,9 @@ def sanitize_text(text): :param text: The text to be sanitized :return: The sanitized text """ - text = re.sub(r'[ \t]+', ' ', text) - text = re.sub(r'[\r\n]+', '\n', text) - text = re.sub(r'[^\x20-\x7E\n]', '', text) + text = re.sub(r"[ \t]+", " ", text) + text = re.sub(r"[\r\n]+", "\n", text) + text = re.sub(r"[^\x20-\x7E\n]", "", text) return text.strip() @@ -89,7 +89,7 @@ def convert_newlines_to_br(text): :param text: The input text :return: Text with newlines replaced by
""" - return text.replace('\n', '
') + return text.replace("\n", "
") def assignment_table(data): @@ -144,16 +144,14 @@ def write_assignment_table(data, headers): [ intent, target, - assignment["target"][ - "deviceAndAppManagementAssignmentFilterType" - ], - assignment["target"][ - "deviceAndAppManagementAssignmentFilterId" - ], + assignment["target"]["deviceAndAppManagementAssignmentFilterType"], + assignment["target"]["deviceAndAppManagementAssignmentFilterId"], ] ) - assignment_list.sort(key=lambda x: x[0], reverse=True) # Sort by the 'Intent' column in reverse order + assignment_list.sort( + key=lambda x: x[0], reverse=True + ) # Sort by the 'Intent' column in reverse order table = write_assignment_table(assignment_list, headers) return table @@ -258,7 +256,9 @@ def dict_to_ul(val) -> str: def simple_value_to_string(key, val) -> str: if decode and is_base64(val): val = decode_base64(val) - val = val.replace("\r\n", "
").replace("\r", "
").replace("\n", "
") + val = ( + val.replace("\r\n", "
").replace("\r", "
").replace("\n", "
") + ) return f"**{key}:**
Click to expand...{val}

" if isinstance(val, str): @@ -654,8 +654,8 @@ def escape_backslash_for_md(value): :return: The processed string with backslashes properly escaped for Markdown """ escapable = r"_*\[\](){}#`>+-=|.!" - value = re.sub(rf'(?".join([f'[{url}]({url})' for i, url in enumerate(info_urls)]) + links = "
".join([f"[{url}]({url})" for i, url in enumerate(info_urls)]) description = f"{description}
InfoUrls:
{links}" if description else links - description = f"
Click to expand...{description}
" if description else "" + description = ( + f"
Click to expand...{description}
" + if description + else "" + ) if "simpleSettingValue" in setting_instance: value = setting_instance["simpleSettingValue"].get("value", "") - formatted_value = escape_backslash_for_md(rf"{value}") if value != "" else "Not configured" + formatted_value = ( + escape_backslash_for_md(rf"{value}") if value != "" else "Not configured" + ) return [[display_name, formatted_value, description]] elif "simpleSettingCollectionValue" in setting_instance: @@ -707,9 +713,15 @@ def escape_backslash_for_md(value): if value and "options" in definition: for option in definition["options"]: if option.get("value") == value or option.get("itemId") == value: - option_display_name = option.get("displayName") or option.get("name") + option_display_name = option.get("displayName") or option.get( + "name" + ) break - formatted_value = option_display_name if option_display_name else (value if value else "Not configured") + formatted_value = ( + option_display_name + if option_display_name + else (value if value else "Not configured") + ) rows = [] rows.append([display_name, formatted_value, description]) for child in children: @@ -794,32 +806,48 @@ def document_settings_catalog( config_table_list = [] for setting in repo_data.get("settings", []): - rows = extract_setting(setting.get("settingInstance", {}), settings_lookup) + rows = extract_setting( + setting.get("settingInstance", {}), settings_lookup + ) for row in rows: setting_name = row[0] value = row[1] description = row[2] - setting_definition_id = setting.get("settingInstance", {}).get("settingDefinitionId", "") + setting_definition_id = setting.get("settingInstance", {}).get( + "settingDefinitionId", "" + ) definition = settings_lookup.get(setting_definition_id, {}) category_id = definition.get("categoryId", "") - category_name = categories_lookup.get(category_id, {}).get("displayName", "") - root_category_id = categories_lookup.get(category_id, {}).get("rootCategoryId", "") - root_category_name = categories_lookup.get(root_category_id, {}).get("displayName", "") - - if max_length and isinstance(value, str) and len(value) > max_length: + category_name = categories_lookup.get(category_id, {}).get( + "displayName", "" + ) + root_category_id = categories_lookup.get(category_id, {}).get( + "rootCategoryId", "" + ) + root_category_name = categories_lookup.get( + root_category_id, {} + ).get("displayName", "") + + if ( + max_length + and isinstance(value, str) + and len(value) > max_length + ): value = "Value too long to display" - config_table_list.append({ - "setting_name": setting_name, - "value": value, - "description": description, - "category_name": category_name, - "root_category_name": root_category_name - }) + config_table_list.append( + { + "setting_name": setting_name, + "value": value, + "description": description, + "category_name": category_name, + "root_category_name": root_category_name, + } + ) # Sort by category_name, then root_category_name config_table_list_sorted = sorted( config_table_list, - key=lambda x: (x["root_category_name"], x["category_name"]) + key=lambda x: (x["root_category_name"], x["category_name"]), ) # Group items by root_category_name and category_name @@ -827,14 +855,17 @@ def document_settings_catalog( for item in config_table_list_sorted: grouped[item["root_category_name"]][item["category_name"]].append(item) - # Output file logic - config_name = repo_data.get("name", os.path.splitext(os.path.basename(filename))[0]) + config_name = repo_data.get( + "name", os.path.splitext(os.path.basename(filename))[0] + ) safe_config_name = re.sub(r'[<>:"/\\|?*]', "_", config_name) if split_per_config: if not os.path.exists(f"{configpath}/docs"): os.makedirs(f"{configpath}/docs") - config_outpath = os.path.join(f"{configpath}/docs", f"{safe_config_name}.md") + config_outpath = os.path.join( + f"{configpath}/docs", f"{safe_config_name}.md" + ) md_file(config_outpath) target_md = config_outpath top_header = f"# {config_name}" @@ -866,8 +897,12 @@ def document_settings_catalog( else: table_data.append([f"**{root_cat}** > **{cat}**", "", ""]) for i in items: - table_data.append([i["setting_name"], i["value"], i["description"]]) - table_md = write_table(table_data, headers=["Setting", "Value", "Description"]) + table_data.append( + [i["setting_name"], i["value"], i["description"]] + ) + table_md = write_table( + table_data, headers=["Setting", "Value", "Description"] + ) md.write(str(table_md) + "\n") except Exception as e: From 493127477fe7edde4f8b952a6d458b29fd38a65b Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 29 May 2026 13:55:32 +0200 Subject: [PATCH 15/17] Add color output options and HTML report generation to run_compare - Implemented ANSI color codes for terminal output based on user preference. - Added `--no-color` argument to disable colored output. - Introduced `--html` argument to generate a self-contained HTML report alongside the JSON output. - Enhanced output formatting for better readability, including colored indicators for source and target differences. --- src/IntuneCD/intunecdlib/compare_html.py | 1029 ++++++++++++++++++++++ src/IntuneCD/run_compare.py | 115 ++- 2 files changed, 1129 insertions(+), 15 deletions(-) create mode 100644 src/IntuneCD/intunecdlib/compare_html.py diff --git a/src/IntuneCD/intunecdlib/compare_html.py b/src/IntuneCD/intunecdlib/compare_html.py new file mode 100644 index 00000000..5d2d2619 --- /dev/null +++ b/src/IntuneCD/intunecdlib/compare_html.py @@ -0,0 +1,1029 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +"""HTML renderer for run_compare drift summaries. + +Produces a self-contained dark-themed HTML report with inline char-diff +highlighting, a sticky controls bar, filterable sections, copy-diff buttons, +and a few keyboard shortcuts. +""" + +import html +import json +import re +from collections import defaultdict +from datetime import datetime, timezone + + +_STYLE = """ +:root { + --canvas: #0c0c0d; + --surface-1: #131316; + --surface-2: #1a1a1f; + --hairline: #26262d; + --hairline-hi: #3a3a45; + + --text: #ece9e0; + --text-dim: #8b8880; + --text-faint: #5a5752; + + --amber: #f2a23a; + --amber-soft: rgba(242,162,58,0.10); + --lime: #a3d959; + --lime-soft: rgba(163,217,89,0.08); + --vermilion: #f15a3e; + --vermilion-soft:rgba(241,90,62,0.08); +} + +* { box-sizing: border-box; } +*::selection { background: var(--amber); color: var(--canvas); } + +html, body { + margin: 0; + padding: 0; + background: var(--canvas); + color: var(--text); + font-family: "JetBrains Mono", ui-monospace, "SF Mono", Menlo, Consolas, monospace; + font-size: 13.5px; + line-height: 1.55; + font-variant-numeric: tabular-nums; + font-feature-settings: "ss01", "ss02", "zero"; + -webkit-font-smoothing: antialiased; + text-rendering: optimizeLegibility; +} + +body::before { + content: ""; + position: fixed; + inset: 0; + pointer-events: none; + z-index: 100; + opacity: 0.05; + mix-blend-mode: overlay; + background-image: url("data:image/svg+xml;utf8,"); +} + +.container { + max-width: 1180px; + margin: 0 auto; + padding: 56px 40px 80px; + position: relative; +} + +header { + display: grid; + grid-template-columns: 1fr auto; + align-items: end; + gap: 32px; + padding-bottom: 28px; + border-bottom: 1px solid var(--hairline); + margin-bottom: 36px; + animation: fade-up 700ms ease-out backwards; +} + +.wordmark { + display: flex; + align-items: center; + gap: 10px; + font-size: 10.5px; + letter-spacing: 0.22em; + text-transform: uppercase; + color: var(--text-dim); + margin-bottom: 14px; +} +.wordmark .pill { + display: inline-block; + padding: 2px 9px; + border: 1px solid var(--hairline-hi); + border-radius: 1px; + color: var(--text); + font-weight: 500; + letter-spacing: 0.18em; +} +.wordmark .div { color: var(--text-faint); } + +.title { + font-family: inherit; + font-weight: 500; + font-size: 38px; + line-height: 1.18; + letter-spacing: -0.02em; + color: var(--text); + margin: 0; + max-width: 26ch; +} +.title em { + color: var(--amber); + font-style: normal; + font-weight: 700; + letter-spacing: -0.025em; +} + +.run-meta { + font-size: 11px; + color: var(--text-dim); + text-align: right; + line-height: 2.0; + letter-spacing: 0.04em; + font-variant-numeric: tabular-nums; +} +.run-meta div { display: flex; align-items: baseline; justify-content: flex-end; gap: 10px; } +.run-meta .key { color: var(--text-faint); font-size: 10px; letter-spacing: 0.2em; } +.run-meta .val { color: var(--text); word-break: break-all; } +.run-meta .val .dim { color: var(--text-dim); } + +.instrument { + display: grid; + grid-template-columns: auto 1fr; + gap: 48px; + align-items: end; + padding: 28px 0 40px; + border-bottom: 1px solid var(--hairline); + margin-bottom: 36px; + animation: fade-up 700ms ease-out 80ms backwards; +} + +.hero-stat .num { + font-family: inherit; + font-weight: 700; + font-size: 132px; + line-height: 0.9; + color: var(--amber); + letter-spacing: -0.06em; + font-variant-numeric: tabular-nums; +} +.hero-stat.ok .num { color: var(--lime); } + +.hero-stat .label { + font-size: 10.5px; + letter-spacing: 0.22em; + text-transform: uppercase; + color: var(--text-dim); + margin-top: 10px; +} + +.stat-cluster { + display: grid; + grid-template-columns: repeat(3, 1fr); + border-left: 1px solid var(--hairline); + align-self: end; +} +.stat { + padding: 4px 24px; + border-right: 1px solid var(--hairline); +} +.stat:last-child { border-right: none; } +.stat .n { + font-size: 30px; + font-weight: 500; + line-height: 1.05; + color: var(--text); + letter-spacing: -0.01em; +} +.stat.add .n { color: var(--lime); } +.stat.rm .n { color: var(--vermilion); } +.stat .l { + font-size: 10px; + letter-spacing: 0.18em; + text-transform: uppercase; + color: var(--text-dim); + margin-top: 6px; +} + +.sticky-sentinel { height: 1px; margin-bottom: -1px; } + +.controls-wrap { + position: sticky; + top: 0; + z-index: 40; + margin: 0 -40px 28px; + padding: 14px 40px; + border-bottom: 1px solid transparent; + transition: border-color 240ms ease, background 240ms ease; + animation: fade-up 700ms ease-out 160ms backwards; +} +.controls-wrap.pinned { + background: rgba(12,12,13,0.78); + -webkit-backdrop-filter: blur(14px) saturate(140%); + backdrop-filter: blur(14px) saturate(140%); + border-bottom-color: var(--hairline); +} + +.controls { + display: flex; + gap: 8px; + align-items: stretch; +} +.controls .search { flex: 1 1 0; min-width: 0; } +.controls .drift-chip, +.controls .btn { flex: 0 0 auto; } + +.drift-chip { + display: none; + align-items: center; + gap: 10px; + padding: 9px 14px; + border: 1px solid var(--amber); + background: var(--amber-soft); + border-radius: 1px; + font-size: 10px; + font-weight: 700; + letter-spacing: 0.18em; + text-transform: uppercase; + color: var(--amber); + white-space: nowrap; +} +.controls-wrap.pinned .drift-chip { display: inline-flex; } +.drift-chip .dot { + width: 6px; height: 6px; + border-radius: 50%; + background: var(--amber); + box-shadow: 0 0 8px rgba(242,162,58,0.55); +} +.drift-chip .n { + color: var(--text); + font-size: 13px; + letter-spacing: 0; + font-weight: 700; +} + +.search { position: relative; } +.search input { + width: 100%; + background: var(--surface-1); + border: 1px solid var(--hairline); + border-radius: 1px; + padding: 12px 44px 12px 40px; + color: var(--text); + font-family: inherit; + font-size: 13px; + letter-spacing: 0.01em; + transition: border-color 160ms, background 160ms; +} +.search input::placeholder { color: var(--text-faint); } +.search input:focus { + outline: none; + border-color: var(--amber); + background: var(--surface-2); +} +.search::before { + content: "▸"; + position: absolute; + left: 18px; + top: 50%; + transform: translateY(-50%); + color: var(--amber); + font-size: 11px; +} +.kbd-hint { + position: absolute; + right: 12px; + top: 50%; + transform: translateY(-50%); + display: inline-flex; + align-items: center; + justify-content: center; + min-width: 18px; + height: 18px; + padding: 0 5px; + font-family: inherit; + font-size: 10px; + font-weight: 700; + color: var(--text-faint); + background: var(--surface-2); + border: 1px solid var(--hairline); + border-radius: 2px; + pointer-events: none; + letter-spacing: 0; + transition: opacity 160ms; +} +.search input:focus ~ .kbd-hint { opacity: 0; } + +.no-results { + display: none; + padding: 36px 24px; + text-align: center; + color: var(--text-dim); + font-size: 12px; + letter-spacing: 0.04em; + border: 1px dashed var(--hairline); + border-radius: 1px; + margin-bottom: 14px; +} +.no-results.visible { display: block; } +.no-results .q { color: var(--amber); font-weight: 600; } +.no-results .btn-link { + background: transparent; + border: none; + color: var(--text); + cursor: pointer; + font-family: inherit; + font-size: 11px; + letter-spacing: 0.18em; + text-transform: uppercase; + margin-left: 16px; + padding: 0; + text-decoration: underline; + text-underline-offset: 4px; + text-decoration-color: var(--text-faint); +} +.no-results .btn-link:hover { text-decoration-color: var(--text); } + +.btn { + display: inline-flex; + align-items: center; + justify-content: center; + background: transparent; + border: 1px solid var(--hairline); + border-radius: 1px; + color: var(--text-dim); + padding: 0 18px; + min-height: 40px; + font-family: inherit; + font-size: 10.5px; + letter-spacing: 0.18em; + text-transform: uppercase; + cursor: pointer; + transition: color 140ms, border-color 140ms; +} +.btn:hover { color: var(--text); border-color: var(--text); } + +details.section { + background: var(--surface-1); + border: 1px solid var(--hairline); + border-radius: 1px; + margin-bottom: 14px; + overflow: hidden; + animation: fade-up 700ms ease-out backwards; +} +details.section[data-kind="add"] { animation-delay: 220ms; } +details.section[data-kind="rm"] { animation-delay: 260ms; } +details.section[data-kind="mod"] { animation-delay: 300ms; } + +details.section > summary { + padding: 16px 20px; + cursor: pointer; + user-select: none; + display: flex; + align-items: center; + gap: 16px; + list-style: none; +} +details.section > summary::-webkit-details-marker { display: none; } +details.section > summary::after { + content: "▾"; + margin-left: 4px; + color: var(--text-faint); + font-size: 10px; + transition: transform 200ms; +} +details.section[open] > summary::after { transform: rotate(180deg); } + +.sig { + font-size: 10px; + font-weight: 700; + letter-spacing: 0.22em; + text-transform: uppercase; + padding: 4px 10px; + border: 1px solid; + border-radius: 1px; +} +.sig.add { color: var(--lime); border-color: var(--lime); } +.sig.rm { color: var(--vermilion); border-color: var(--vermilion); } +.sig.mod { color: var(--amber); border-color: var(--amber); } + +.section-title { + font-size: 14px; + font-weight: 500; + color: var(--text); + letter-spacing: 0.005em; +} +.section-count { + font-family: inherit; + font-weight: 500; + font-size: 18px; + color: var(--text-dim); + line-height: 1; + margin-left: auto; + letter-spacing: 0.02em; + font-variant-numeric: tabular-nums; +} + +.section-body { + padding: 6px 20px 20px; + border-top: 1px solid var(--hairline); +} + +.path-list { list-style: none; margin: 0; padding: 0; } +.path-list li { + display: grid; + grid-template-columns: 22px 1fr; + align-items: baseline; + gap: 8px; + padding: 12px 8px; + border-bottom: 1px dotted var(--hairline); + font-size: 12.5px; + transition: background 140ms, padding-left 220ms cubic-bezier(.2,.7,.3,1); +} +.path-list li:last-child { border-bottom: none; } +.path-list li:hover { background: var(--surface-2); padding-left: 14px; } +.path-list li .prefix { + font-weight: 700; + font-size: 15px; + line-height: 1; +} +.path-list li.add .prefix { color: var(--lime); } +.path-list li.rm .prefix { color: var(--vermilion); } +.path-list li .path { + color: var(--text); + word-break: break-all; +} +.path-list li .path .dim { color: var(--text-faint); } + +.config-item { + padding: 22px 4px; + border-bottom: 1px dotted var(--hairline); +} +.config-item:last-child { border-bottom: none; } + +.config-head { + display: grid; + grid-template-columns: 1fr auto; + gap: 6px 16px; + align-items: baseline; + margin-bottom: 16px; +} +.config-name { + font-family: inherit; + font-weight: 600; + font-size: 18px; + color: var(--text); + line-height: 1.2; + letter-spacing: -0.005em; +} +.config-file { + grid-column: 1 / -1; + color: var(--text-faint); + font-size: 11px; + word-break: break-all; + letter-spacing: 0.005em; +} +.config-file .dim { color: var(--text-faint); opacity: 0.7; } +.config-badge { + font-size: 10px; + letter-spacing: 0.18em; + text-transform: uppercase; + color: var(--amber); + padding: 4px 10px; + border: 1px solid var(--amber); + border-radius: 1px; + white-space: nowrap; + align-self: start; +} + +.diff-list { + display: flex; + flex-direction: column; + gap: 1px; + background: var(--hairline); + border: 1px solid var(--hairline); + border-radius: 1px; + overflow: hidden; +} +.diff { + display: grid; + grid-template-columns: 2px 1fr auto; + background: var(--canvas); + padding: 16px 18px; + gap: 0 18px; + align-items: start; + transition: background 140ms; +} +.diff:hover { background: var(--surface-2); } +.diff .bar { + width: 2px; + background: var(--amber); + align-self: stretch; + min-height: 60px; +} +.diff .body { + display: flex; + flex-direction: column; + gap: 10px; + min-width: 0; +} +.diff .setting { + font-size: 13.5px; + font-weight: 500; + color: var(--text); + word-break: break-word; +} +.diff .pair { + display: grid; + grid-template-columns: 36px 1fr; + gap: 0 12px; + font-size: 12.5px; + align-items: start; +} +.diff .pair .tag { + font-size: 9.5px; + letter-spacing: 0.22em; + font-weight: 700; + padding-top: 3px; + text-transform: uppercase; +} +.diff .pair .val { + word-break: break-word; + white-space: pre-wrap; + padding-left: 12px; + border-left: 1px solid; + color: var(--text); +} +.diff .pair.src .tag { color: var(--lime); } +.diff .pair.src .val { border-left-color: var(--lime); } +.diff .pair.tgt .tag { color: var(--vermilion); } +.diff .pair.tgt .val { border-left-color: var(--vermilion); } +.diff .pair .val.empty { color: var(--text-faint); font-style: italic; } + +/* Inline character-diff highlighting (computed client-side) */ +.diff .pair .val mark { + background: transparent; + color: inherit; + padding: 0 1px; + border-radius: 1px; +} +.diff .pair.src .val mark.del { + background: rgba(241,90,62,0.22); + color: var(--text); + box-shadow: inset 0 -1px 0 rgba(241,90,62,0.6); +} +.diff .pair.tgt .val mark.ins { + background: rgba(163,217,89,0.22); + color: var(--text); + box-shadow: inset 0 -1px 0 rgba(163,217,89,0.6); +} + +.copy-btn { + background: transparent; + border: 1px solid var(--hairline-hi); + color: var(--text-dim); + border-radius: 1px; + padding: 4px 12px; + font-family: inherit; + font-size: 10px; + letter-spacing: 0.2em; + text-transform: uppercase; + cursor: pointer; + align-self: start; + transition: all 140ms; +} +.copy-btn:hover { color: var(--text); border-color: var(--text); } +.copy-btn.ok { color: var(--lime); border-color: var(--lime); } + +.empty-state { + text-align: center; + padding: 96px 0 80px; + color: var(--text-dim); +} +.empty-state .glyph { + font-family: inherit; + font-weight: 500; + font-size: 64px; + line-height: 1; + color: var(--lime); + letter-spacing: -0.04em; + margin-bottom: 18px; +} +.empty-state .msg { + font-size: 11px; + letter-spacing: 0.22em; + text-transform: uppercase; +} + +footer { + margin-top: 56px; + padding-top: 20px; + border-top: 1px solid var(--hairline); + display: flex; + justify-content: space-between; + gap: 16px; + flex-wrap: wrap; + font-size: 10px; + letter-spacing: 0.2em; + text-transform: uppercase; + color: var(--text-faint); +} + +.hidden { display: none !important; } + +@keyframes fade-up { + from { opacity: 0; transform: translateY(10px); } + to { opacity: 1; transform: translateY(0); } +} + +@media (max-width: 820px) { + .container { padding: 32px 20px 56px; } + .title { font-size: 28px; } + .hero-stat .num { font-size: 92px; } + .instrument { grid-template-columns: 1fr; gap: 24px; } + .stat-cluster { border-left: none; border-top: 1px solid var(--hairline); padding-top: 16px; } + .stat { padding: 8px 16px; } + header { grid-template-columns: 1fr; } + .run-meta { text-align: left; } + .run-meta div { justify-content: flex-start; } +} +""" + + +_SCRIPT = """ +(function() { + const $ = id => document.getElementById(id); + const filter = $('filter'); + const expandAll = $('expand-all'); + const collapseAll = $('collapse-all'); + const noResults = $('no-results'); + const noResultsQ = $('no-results-q'); + const clearFilter = $('clear-filter'); + const ctrlWrap = $('controls-wrap'); + const sentinel = $('sticky-sentinel'); + + /* ── Inline character-diff between SRC and TGT values ───────── */ + function escHtml(s) { + return s.replace(/[&<>"]/g, c => ({'&':'&','<':'<','>':'>','"':'"'}[c])); + } + + // Prefix/suffix-anchored diff. Cleaner than LCS for config values: + // incidental shared characters don't fragment the highlight into orphans. + // A common prefix or suffix is only counted if it's at least 3 chars. + function diffChars(a, b) { + let p = 0; + const maxP = Math.min(a.length, b.length); + while (p < maxP && a.charCodeAt(p) === b.charCodeAt(p)) p++; + let s = 0; + const maxS = Math.min(a.length - p, b.length - p); + while (s < maxS && a.charCodeAt(a.length - 1 - s) === b.charCodeAt(b.length - 1 - s)) s++; + if (p < 3) p = 0; + if (s < 3) s = 0; + const ops = []; + if (p) ops.push({op:'eq', text:a.slice(0, p)}); + const aMid = a.slice(p, a.length - s); + const bMid = b.slice(p, b.length - s); + if (aMid) ops.push({op:'del', text:aMid}); + if (bMid) ops.push({op:'ins', text:bMid}); + if (s) ops.push({op:'eq', text:a.slice(a.length - s)}); + return ops; + } + + function renderSide(ops, side) { + let out = ''; + for (const o of ops) { + if (o.op === 'eq') out += escHtml(o.text); + else if (side === 'src' && o.op === 'del') out += '' + escHtml(o.text) + ''; + else if (side === 'tgt' && o.op === 'ins') out += '' + escHtml(o.text) + ''; + } + return out; + } + + document.querySelectorAll('.diff').forEach(diff => { + const src = diff.querySelector('.pair.src .val'); + const tgt = diff.querySelector('.pair.tgt .val'); + if (!src || !tgt) return; + if (src.classList.contains('empty') || tgt.classList.contains('empty')) return; + const sText = src.textContent, tText = tgt.textContent; + if (!sText || !tText || sText === tText) return; + const ops = diffChars(sText, tText); + // No anchoring shared context — colored tags already convey the change. + if (!ops.some(o => o.op === 'eq')) return; + src.innerHTML = renderSide(ops, 'src'); + tgt.innerHTML = renderSide(ops, 'tgt'); + }); + + /* ── Filter ──────────────────────────────────────────────────── */ + function applyFilter() { + const raw = filter.value.trim(); + const q = raw.toLowerCase(); + document.querySelectorAll('[data-searchable]').forEach(el => { + const text = el.getAttribute('data-searchable'); + el.classList.toggle('hidden', q && !text.includes(q)); + }); + document.querySelectorAll('details[data-group]').forEach(d => { + const visible = d.querySelectorAll('.config-item:not(.hidden), .path-list li:not(.hidden)').length; + d.classList.toggle('hidden', q && visible === 0); + if (q && visible > 0) d.open = true; + }); + const anyVisible = !!document.querySelector('details.section:not(.hidden)'); + if (q && !anyVisible) { + noResultsQ.textContent = raw; + noResults.classList.add('visible'); + } else { + noResults.classList.remove('visible'); + } + } + + if (filter) filter.addEventListener('input', applyFilter); + if (clearFilter) clearFilter.addEventListener('click', () => { + filter.value = ''; + applyFilter(); + filter.focus(); + }); + + if (expandAll) expandAll.addEventListener('click', () => document.querySelectorAll('details').forEach(d => d.open = true)); + if (collapseAll) collapseAll.addEventListener('click', () => document.querySelectorAll('details').forEach(d => d.open = false)); + + /* ── Copy buttons ───────────────────────────────────────────── */ + document.addEventListener('click', e => { + const btn = e.target.closest('.copy-btn'); + if (!btn) return; + const payload = btn.getAttribute('data-copy'); + navigator.clipboard.writeText(payload).then(() => { + const orig = btn.textContent; + btn.textContent = 'Copied'; + btn.classList.add('ok'); + setTimeout(() => { btn.textContent = orig; btn.classList.remove('ok'); }, 1200); + }); + }); + + /* ── Sticky bar pinned state ────────────────────────────────── */ + if (sentinel && ctrlWrap && 'IntersectionObserver' in window) { + new IntersectionObserver(([entry]) => { + ctrlWrap.classList.toggle('pinned', !entry.isIntersecting); + }, { threshold: [0] }).observe(sentinel); + } + + /* ── Keyboard shortcuts ─────────────────────────────────────── */ + document.addEventListener('keydown', e => { + if (e.target === filter) { + if (e.key === 'Escape') { filter.value = ''; applyFilter(); filter.blur(); } + return; + } + if (e.metaKey || e.ctrlKey || e.altKey) return; + if (e.target.tagName === 'INPUT' || e.target.tagName === 'TEXTAREA') return; + if (e.key === '/') { + e.preventDefault(); + if (filter) { filter.focus(); filter.select(); } + } + }); +})(); +""" + + +_PATH_SUFFIX_RE = re.compile(r"(__[A-Fa-f0-9-]{8,}\.json)$") + + +def _esc(value) -> str: + """HTML-escape a value, coercing to string.""" + return html.escape("" if value is None else str(value), quote=True) + + +def _searchable(*parts) -> str: + """Build a lowercased search-blob attribute value.""" + return html.escape(" ".join(str(p) for p in parts if p).lower(), quote=True) + + +def _dim_path(path: str) -> str: + """HTML-escape a path; dim any trailing `__.json` suffix.""" + if not path: + return "" + m = _PATH_SUFFIX_RE.search(path) + if not m: + return _esc(path) + return f'{_esc(path[: m.start()])}{_esc(m.group(1))}' + + +def _render_path_section(title: str, items: list, kind: str) -> str: + """Render an Added or Removed section listing missing config files.""" + if not items: + return "" + label = "Added" if kind == "add" else "Removed" + prefix = "+" if kind == "add" else "−" + rows = "\n".join( + f'
  • ' + f'{prefix}' + f'{_dim_path(p)}' + f"
  • " + for p in items + ) + return ( + f'
    ' + f" " + f' {label}' + f' {_esc(title)}' + f' {len(items)}' + f" " + f'
    ' + f'
      {rows}
    ' + f"
    " + f"
    " + ) + + +def _render_diff(diff: dict) -> str: + """Render a single setting-level diff row.""" + setting = diff.get("setting", "") + src = diff.get("source_val", "") + tgt = diff.get("target_val", "") + src_str = "" if src is None else str(src) + tgt_str = "" if tgt is None else str(tgt) + copy_payload = json.dumps( + {"setting": setting, "source": src_str, "target": tgt_str}, + ensure_ascii=False, + ) + src_val = ( + f'{_esc(src_str)}' + if src_str + else '' + ) + tgt_val = ( + f'{_esc(tgt_str)}' + if tgt_str + else '' + ) + return ( + f'
    ' + f'
    ' + f'
    ' + f'
    {_esc(setting)}
    ' + f'
    SRC{src_val}
    ' + f'
    TGT{tgt_val}
    ' + f"
    " + f' ' + f"
    " + ) + + +def _render_config_item(change: dict) -> str: + """Render one modified config (header + diff list).""" + name = change.get("name", "") + file_ = change.get("file", "") + diffs = change.get("diffs", []) + n = len(diffs) + search_terms = [name, file_, change.get("config_type", "")] + for d in diffs: + search_terms.extend( + [d.get("setting", ""), d.get("source_val", ""), d.get("target_val", "")] + ) + diff_html = "\n".join(_render_diff(d) for d in diffs) + return ( + f'
    ' + f'
    ' + f'
    {_esc(name)}
    ' + f'
    {n} drift{"s" if n != 1 else ""}
    ' + f'
    {_dim_path(file_)}
    ' + f"
    " + f'
    {diff_html}
    ' + f"
    " + ) + + +def _render_modified_sections(changes: list) -> str: + """Render Modified sections grouped by config_type.""" + if not changes: + return "" + grouped: dict = defaultdict(list) + for ch in changes: + grouped[ch.get("config_type", "Unknown")].append(ch) + + out = [] + for config_type in sorted(grouped): + items = grouped[config_type] + items_html = "\n".join(_render_config_item(ch) for ch in items) + out.append( + f'
    ' + f" " + f' Modified' + f' {_esc(config_type)}' + f' {len(items)}' + f" " + f'
    {items_html}
    ' + f"
    " + ) + return "\n".join(out) + + +def _headline(configs_touched: int) -> str: + """Compose the editorial headline for the report.""" + if configs_touched == 0: + return "All systems aligned." + if configs_touched == 1: + return 'Drift detected across 1 configuration.' + return f'Drift detected across {configs_touched} configurations.' + + +def render_html(result: dict) -> str: + """Render a compare-summary dict to a self-contained HTML document.""" + diff_count = result.get("diff_count", 0) + changes = result.get("changes", []) or [] + missing_in_target = result.get("missing_in_target", []) or [] + missing_in_source = result.get("missing_in_source", []) or [] + source = result.get("source", "") + target = result.get("target", "") + + now = datetime.now(timezone.utc) + generated_date = now.strftime("%Y-%m-%d") + generated_time = now.strftime("%H:%M:%S") + + configs_touched = len(missing_in_target) + len(missing_in_source) + len(changes) + headline = _headline(configs_touched) + + sections = [ + _render_path_section("In source, not in target", missing_in_target, "add"), + _render_path_section("In target, not in source", missing_in_source, "rm"), + _render_modified_sections(changes), + ] + body = "\n".join(s for s in sections if s) + if not body: + body = ( + '
    ' + '
    ' + '
    No differences found
    ' + "
    " + ) + + hero_class = " ok" if diff_count == 0 else "" + + return f""" + + + + +IntuneCD ⁄ Drift Inspection + + + + + + +
    + +
    +
    +
    + IntuneCD + + Drift Inspection +
    +

    {headline}

    +
    +
    +
    SRC{_esc(source)}
    +
    TGT{_esc(target)}
    +
    RUN{generated_date} {generated_time} UTC
    +
    +
    + +
    +
    +
    {diff_count}
    +
    Drifts observed
    +
    +
    +
    +
    {len(changes)}
    +
    Configs modified
    +
    +
    +
    {len(missing_in_target)}
    +
    Only in source
    +
    +
    +
    {len(missing_in_source)}
    +
    Only in target
    +
    +
    +
    + + +
    +
    + + + + +
    +
    + +
    + No matches for “”. + +
    + + {body} + +
    + IntuneCD ⁄ run_compare + Drift Inspection +
    +
    + + + +""" diff --git a/src/IntuneCD/run_compare.py b/src/IntuneCD/run_compare.py index d962c987..cee24f99 100644 --- a/src/IntuneCD/run_compare.py +++ b/src/IntuneCD/run_compare.py @@ -14,10 +14,44 @@ import json import os import re +import sys import yaml from deepdiff import DeepDiff + +def _color_enabled() -> bool: + if os.environ.get("NO_COLOR"): + return False + if os.environ.get("FORCE_COLOR"): + return True + return sys.stdout.isatty() + + +class _C: + """ANSI color codes. All empty strings when color is disabled.""" + + RESET = "" + BOLD = "" + DIM = "" + RED = "" + GREEN = "" + YELLOW = "" + CYAN = "" + MAGENTA = "" + + @classmethod + def enable(cls): + cls.RESET = "\033[0m" + cls.BOLD = "\033[1m" + cls.DIM = "\033[2m" + cls.RED = "\033[31m" + cls.GREEN = "\033[32m" + cls.YELLOW = "\033[33m" + cls.CYAN = "\033[36m" + cls.MAGENTA = "\033[35m" + + # Keys that are Intune-generated metadata and should not affect drift results. # Mirrors the logic in IntuneCDBase.remove_keys(). _METADATA_KEYS = { @@ -122,7 +156,9 @@ def _setting(key: str) -> str: { "setting": _setting(key), "source_val": "", - "target_val": str(list(diff["iterable_item_removed"].values()))[:100], + "target_val": str(list(diff["iterable_item_removed"].values()))[ + :100 + ], } ) @@ -132,7 +168,9 @@ def _setting(key: str) -> str: def _collect_files(root: str) -> dict[str, str]: """Return {relative_path: absolute_path} for all JSON/YAML files under root.""" files = {} - for dirpath, _, filenames in os.walk(root): + skip_dirs = {"__archive__"} + for dirpath, dirnames, filenames in os.walk(root): + dirnames[:] = [d for d in dirnames if d not in skip_dirs] for filename in filenames: if filename.endswith((".json", ".yaml")): abs_path = os.path.join(dirpath, filename) @@ -250,6 +288,19 @@ def get_parser(include_help=True): help="Additional keys to strip before comparing, separated by space.", nargs="+", ) + parser.add_argument( + "--no-color", + help="Disable colored output. Color is also disabled when stdout is not a TTY or NO_COLOR is set.", + action="store_true", + ) + parser.add_argument( + "--html", + help=( + "Also write a self-contained HTML report alongside the JSON output. " + "The HTML file uses the same path as -o with a .html extension." + ), + action="store_true", + ) return parser @@ -260,37 +311,71 @@ def start(args=None): extra_keys = set(args.exclude_keys) if args.exclude_keys else None - print(f"Comparing:\n source: {args.source}\n target: {args.target}\n") + use_color = False if getattr(args, "no_color", False) else _color_enabled() + if use_color: + _C.enable() + + print( + f"Comparing:\n {_C.BOLD}source:{_C.RESET} {args.source}\n" + f" {_C.BOLD}target:{_C.RESET} {args.target}\n" + ) result = compare(args.source, args.target, extra_keys) with open(args.output, "w", encoding="utf-8") as f: json.dump(result, f, indent=2) - print(f"{'=' * 80}") + html_path = None + if getattr(args, "html", False): + from .intunecdlib.compare_html import render_html + + base, _ = os.path.splitext(args.output) + html_path = f"{base}.html" + with open(html_path, "w", encoding="utf-8") as f: + _ = f.write(render_html(result)) + + print(f"{_C.DIM}{'=' * 80}{_C.RESET}") if result["missing_in_target"]: - print(f"In source but not in target ({len(result['missing_in_target'])}):") + print( + f"{_C.BOLD}In source but not in target " + f"({len(result['missing_in_target'])}):{_C.RESET}" + ) for p in result["missing_in_target"]: - print(f" + {p}") + print(f" {_C.GREEN}+ {p}{_C.RESET}") if result["missing_in_source"]: - print(f"In target but not in source ({len(result['missing_in_source'])}):") + print( + f"{_C.BOLD}In target but not in source " + f"({len(result['missing_in_source'])}):{_C.RESET}" + ) for p in result["missing_in_source"]: - print(f" - {p}") + print(f" {_C.RED}- {p}{_C.RESET}") if result["changes"]: - print(f"\nConfigurations with differences ({len(result['changes'])}):") + print( + f"\n{_C.BOLD}Configurations with differences " + f"({len(result['changes'])}):{_C.RESET}" + ) for change in result["changes"]: - print(f"\n [{change['config_type']}] {change['name']} ({change['file']})") + print( + f"\n {_C.CYAN}[{change['config_type']}]{_C.RESET} " + f"{_C.BOLD}{change['name']}{_C.RESET} " + f"{_C.DIM}({change['file']}){_C.RESET}" + ) for diff in change["diffs"]: print( - f" setting: {diff['setting']}" - f" | source: {diff['source_val']}" - f" | target: {diff['target_val']}" + f" {_C.DIM}setting:{_C.RESET} {_C.YELLOW}{diff['setting']}{_C.RESET}" + f" {_C.DIM}|{_C.RESET} {_C.DIM}source:{_C.RESET} {_C.GREEN}{diff['source_val']}{_C.RESET}" + f" {_C.DIM}|{_C.RESET} {_C.DIM}target:{_C.RESET} {_C.RED}{diff['target_val']}{_C.RESET}" ) - print(f"\nTotal diffs: {result['diff_count']}") - print(f"Summary written to: {args.output}") + total_color = _C.GREEN if result["diff_count"] == 0 else _C.YELLOW + print( + f"\n{_C.BOLD}Total diffs:{_C.RESET} {total_color}{result['diff_count']}{_C.RESET}" + ) + print(f"{_C.BOLD}Summary written to:{_C.RESET} {args.output}") + if html_path: + print(f"{_C.BOLD}HTML report written to:{_C.RESET} {html_path}") if __name__ == "__main__": From b53e903f5480fcd204aedc92fcaf571c121780e9 Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 29 May 2026 13:58:45 +0200 Subject: [PATCH 16/17] bump version --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 3ea62343..686d2657 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = IntuneCD -version = 2.5.1.beta1 +version = 2.5.1.beta2 author = Tobias Almén author_email = almenscorner@outlook.com description = Tool to backup and update configurations in Intune From 5af8ad0071a2344264d23affe25a0c5379ebafb2 Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 29 May 2026 16:26:38 +0200 Subject: [PATCH 17/17] bump version to 2.6.0 --- setup.cfg | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index 686d2657..aca93585 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = IntuneCD -version = 2.5.1.beta2 +version = 2.6.0 author = Tobias Almén author_email = almenscorner@outlook.com description = Tool to backup and update configurations in Intune @@ -20,7 +20,7 @@ package_dir = packages = find: python_requires = >=3.9 install_requires = - deepdiff==9.0.0 + deepdiff==9.1.0 pyyaml>=6.0.2 msrest>=0.7.1 markdown_toclify>=0.1.7