From d89ed7aafc688eab627146fdb8d606045d0e6caa Mon Sep 17 00:00:00 2001 From: "m.shvets" Date: Fri, 23 Jan 2026 13:36:34 +0300 Subject: [PATCH 1/4] Refactor: Enhance network policy update process with dedicated methods for field, netmask, and group updates --- .../policies/network/use_cases.py | 61 +++++++++++++------ interface | 2 +- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/app/ldap_protocol/policies/network/use_cases.py b/app/ldap_protocol/policies/network/use_cases.py index cde4294d6..5395ae21e 100644 --- a/app/ldap_protocol/policies/network/use_cases.py +++ b/app/ldap_protocol/policies/network/use_cases.py @@ -148,38 +148,61 @@ async def update( ) -> NetworkPolicyDTO: """Update network policy.""" policy = await self._network_policy_gateway.get_with_for_update(dto.id) + + await self._apply_field_updates(policy, dto) + await self._apply_netmask_updates(policy, dto) + await self._apply_group_updates(policy, dto) + + await self._validate_policy_uniqueness(policy) + await self._session.commit() + + return _convert_model_to_dto(policy) + + async def _apply_field_updates( + self, + policy: NetworkPolicy, + dto: NetworkPolicyUpdateDTO, + ) -> None: + """Apply regular field updates.""" for field in dto.fields_to_update: value = getattr(dto, field) if value is not None: setattr(policy, field, value) + async def _apply_netmask_updates( + self, + policy: NetworkPolicy, + dto: NetworkPolicyUpdateDTO, + ) -> None: + """Apply netmask updates.""" if dto.netmasks and dto.raw: policy.netmasks = dto.netmasks policy.raw = dto.raw - if ( - dto.groups is not None - and len(dto.groups) > 0 - and len(dto.groups) != 0 - ): - policy.groups = await self._network_policy_gateway.get_groups( - dto.groups, + async def _apply_group_updates( + self, + policy: NetworkPolicy, + dto: NetworkPolicyUpdateDTO, + ) -> None: + """Apply group updates.""" + if dto.groups is not None: + policy.groups = ( + await self._network_policy_gateway.get_groups(dto.groups) + if dto.groups + else [] ) - if ( - dto.mfa_groups is not None - and len(dto.mfa_groups) > 0 - and len(dto.mfa_groups) != 0 - ): - policy.mfa_groups = await self._network_policy_gateway.get_groups( - dto.mfa_groups, + if dto.mfa_groups is not None: + policy.mfa_groups = ( + await self._network_policy_gateway.get_groups(dto.mfa_groups) + if dto.mfa_groups + else [] ) + + async def _validate_policy_uniqueness(self, policy: NetworkPolicy) -> None: + """Validate policy uniqueness.""" if await self._network_policy_gateway.check_policy_exists(policy): - raise NetworkPolicyAlreadyExistsError( - "Entry already exists", - ) - await self._session.commit() - return _convert_model_to_dto(policy) + raise NetworkPolicyAlreadyExistsError("Entry already exists") async def swap_priorities(self, id1: int, id2: int) -> SwapPrioritiesDTO: """Swap priorities for network policies.""" diff --git a/interface b/interface index 95ed5e191..f31962020 160000 --- a/interface +++ b/interface @@ -1 +1 @@ -Subproject commit 95ed5e191cdafa07b1dfac96a1659926679ead97 +Subproject commit f31962020a6689e6a4c61fb3349db5b5c7895f92 From 2b8b9238c04bdc8380e0c40193a938f98c500be4 Mon Sep 17 00:00:00 2001 From: "m.shvets" Date: Mon, 26 Jan 2026 08:56:10 +0300 Subject: [PATCH 2/4] Add: Introduce network policy use case fixture and tests for creating and updating policies with empty groups --- tests/conftest.py | 9 +++ .../policies/test_network/test_use_case.py | 72 +++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 tests/test_ldap/policies/test_network/test_use_case.py diff --git a/tests/conftest.py b/tests/conftest.py index c9ba0f8ff..4f95140f8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1064,6 +1064,15 @@ async def network_policy_gateway( yield await container.get(NetworkPolicyGateway) +@pytest_asyncio.fixture(scope="function") +async def network_policy_use_case( + container: AsyncContainer, +) -> AsyncIterator[NetworkPolicyUseCase]: + """Get network policy gateway.""" + async with container(scope=Scope.REQUEST) as container: + yield await container.get(NetworkPolicyUseCase) + + @pytest_asyncio.fixture(scope="function") async def network_policy_validator( container: AsyncContainer, diff --git a/tests/test_ldap/policies/test_network/test_use_case.py b/tests/test_ldap/policies/test_network/test_use_case.py new file mode 100644 index 000000000..eb9398e13 --- /dev/null +++ b/tests/test_ldap/policies/test_network/test_use_case.py @@ -0,0 +1,72 @@ +"""Test network policy use case with empty groups. + +Copyright (c) 2025 MultiFactor +License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE +""" + +from ipaddress import IPv4Network + +import pytest + +from enums import MFAFlags +from ldap_protocol.policies.network import NetworkPolicyUseCase +from ldap_protocol.policies.network.dto import ( + NetworkPolicyDTO, + NetworkPolicyUpdateDTO, +) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("setup_session") +async def test_create_policy( + network_policy_use_case: NetworkPolicyUseCase, +) -> None: + """Test creating policy with empty groups and mfa_groups.""" + dto = NetworkPolicyDTO[None]( + id=None, + name="Test Empty Groups", + netmasks=[IPv4Network("192.168.1.0/24")], + raw=["192.168.1.0/24"], + priority=2, + mfa_status=MFAFlags.DISABLED, + groups=[], + mfa_groups=[], + ) + + result = await network_policy_use_case.create(dto) + poicy = await network_policy_use_case.get(result.id) + assert poicy.groups == [] + assert poicy.mfa_groups == [] + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("setup_session") +async def test_update_policy_to_empty_groups( + network_policy_use_case: NetworkPolicyUseCase, +) -> None: + """Test updating policy from groups to empty.""" + dto = NetworkPolicyDTO[None]( + id=None, + name="Test Update Groups", + netmasks=[IPv4Network("172.16.0.0/12")], + raw=["172.16.0.0/12"], + priority=3, + mfa_status=MFAFlags.DISABLED, + groups=["cn=domain admins,cn=Groups,dc=md,dc=test"], + mfa_groups=["cn=domain admins,cn=Groups,dc=md,dc=test"], + ) + + created = await network_policy_use_case.create(dto) + assert created.groups + assert created.mfa_groups + + update_dto = NetworkPolicyUpdateDTO( + id=created.id, + groups=[], + mfa_groups=[], + ) + + updated = await network_policy_use_case.update(update_dto) + + assert updated.groups == [] + assert updated.mfa_groups == [] From cc67291dbf0bd1673b27f634b31b98bebf4cd91a Mon Sep 17 00:00:00 2001 From: "m.shvets" Date: Mon, 26 Jan 2026 09:00:15 +0300 Subject: [PATCH 3/4] Remove unused fixture from network policy test case --- tests/test_ldap/policies/test_network/test_use_case.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_ldap/policies/test_network/test_use_case.py b/tests/test_ldap/policies/test_network/test_use_case.py index eb9398e13..d9714f881 100644 --- a/tests/test_ldap/policies/test_network/test_use_case.py +++ b/tests/test_ldap/policies/test_network/test_use_case.py @@ -17,7 +17,6 @@ @pytest.mark.asyncio -@pytest.mark.usefixtures("setup_session") async def test_create_policy( network_policy_use_case: NetworkPolicyUseCase, ) -> None: From 053adf491aa25a74a4368728e2f5b127cbfe0cdc Mon Sep 17 00:00:00 2001 From: "m.shvets" Date: Mon, 26 Jan 2026 14:44:22 +0300 Subject: [PATCH 4/4] Enhance NetworkPolicyUseCase: Replace uniqueness validation method with direct check for existing policies before committing changes --- app/ldap_protocol/policies/network/use_cases.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/app/ldap_protocol/policies/network/use_cases.py b/app/ldap_protocol/policies/network/use_cases.py index 5395ae21e..38dbc11cc 100644 --- a/app/ldap_protocol/policies/network/use_cases.py +++ b/app/ldap_protocol/policies/network/use_cases.py @@ -153,7 +153,9 @@ async def update( await self._apply_netmask_updates(policy, dto) await self._apply_group_updates(policy, dto) - await self._validate_policy_uniqueness(policy) + if await self._network_policy_gateway.check_policy_exists(policy): + raise NetworkPolicyAlreadyExistsError("Entry already exists") + await self._session.commit() return _convert_model_to_dto(policy) @@ -199,11 +201,6 @@ async def _apply_group_updates( else [] ) - async def _validate_policy_uniqueness(self, policy: NetworkPolicy) -> None: - """Validate policy uniqueness.""" - if await self._network_policy_gateway.check_policy_exists(policy): - raise NetworkPolicyAlreadyExistsError("Entry already exists") - async def swap_priorities(self, id1: int, id2: int) -> SwapPrioritiesDTO: """Swap priorities for network policies.""" policy1 = await self._network_policy_gateway.get(id1)