diff --git a/README.md b/README.md index 4826dd6..7a70187 100644 --- a/README.md +++ b/README.md @@ -24,11 +24,15 @@ A comprehensive Telegram bot for managing group members with profile verificatio - **New user probation**: New members restricted from sending links/forwarded messages for 3 days (configurable) - **Contact card blocking**: Prevents all non-admin members from sharing contact cards/phone numbers (delete + restrict) - **Anti-spam enforcement**: Tracks violations and restricts spammers after threshold +- **Trusted users**: Admin-managed trusted list to bypass anti-spam + duplicate-spam checks ### Admin Tools - **/verify command**: Whitelist users with hidden profile pictures (DM only) - **/unverify command**: Remove users from verification whitelist (DM only) - **Inline verification**: Forward messages to bot for quick verify/unverify buttons +- **/trust command**: Add trusted users (DM only, supports user ID or forwarded message) +- **/untrust command**: Remove trusted users from trusted list (DM only) +- **/trusted command**: List all trusted users (DM only) - **Automatic clearance**: Sends notification when verified users' warnings are cleared ## Requirements diff --git a/src/bot/constants.py b/src/bot/constants.py index 2940f93..7cbf2ff 100644 --- a/src/bot/constants.py +++ b/src/bot/constants.py @@ -179,6 +179,28 @@ def format_hours_display(hours: int) -> str: ADMIN_WARN_SENT_MESSAGE = "✅ Peringatan telah dikirim ke {user_mention} di grup." +TRUST_USER_ID_REQUIRED_MESSAGE = ( + "❌ Penggunaan: /trust USER_ID atau /untrust USER_ID, atau forward pesan user ke bot." +) + +TRUST_USER_ID_INVALID_MESSAGE = "❌ User ID harus berupa angka." + +TRUST_ADDED_MESSAGE = ( + "✅ User `{user_id}` ditambahkan ke trusted list.\n" + "• Probation dibersihkan di {probation_clear_count} grup\n" + "• Unrestrict dicoba di {unrestrict_count} grup" +) + +TRUST_ALREADY_EXISTS_MESSAGE = "ℹ️ User `{user_id}` sudah ada di trusted list." + +TRUST_REMOVED_MESSAGE = "✅ User `{user_id}` dihapus dari trusted list." + +TRUST_USER_NOT_FOUND_MESSAGE = "ℹ️ User `{user_id}` tidak ada di trusted list." + +TRUST_LIST_EMPTY_MESSAGE = "ℹ️ Trusted list masih kosong." + +TRUST_LIST_HEADER = "📋 Trusted Users:\n{trusted_lines}" + # Anti-spam probation warning for new users NEW_USER_SPAM_WARNING = ( "⚠️ {user_mention} baru bergabung dan sedang dalam masa percobaan.\n" diff --git a/src/bot/database/models.py b/src/bot/database/models.py index 70a6ced..c296396 100644 --- a/src/bot/database/models.py +++ b/src/bot/database/models.py @@ -72,6 +72,35 @@ class PhotoVerificationWhitelist(SQLModel, table=True): notes: str | None = Field(default=None) +class TrustedUser(SQLModel, table=True): + """ + Trusted users who bypass anti-spam and duplicate spam enforcement. + + `group_id=0` is used as a global trust scope. Future per-group trust can + use a real Telegram group ID without changing the schema. + + Attributes: + id: Primary key (auto-generated). + user_id: Telegram user ID. + group_id: Scope identifier (0 = global). + trusted_by_admin_id: Telegram user ID of admin granting trust. + trusted_at: Timestamp when trust was granted. + notes: Optional admin notes. + """ + + __tablename__ = "trusted_users" + __table_args__ = ( + UniqueConstraint('user_id', 'group_id', name='uix_trusted_user_group'), + ) + + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field(index=True) + group_id: int = Field(default=0, index=True) + trusted_by_admin_id: int + trusted_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) + notes: str | None = Field(default=None) + + class PendingCaptchaValidation(SQLModel, table=True): """ Tracks users who need to complete captcha verification. diff --git a/src/bot/database/service.py b/src/bot/database/service.py index 75f9e74..17a1692 100644 --- a/src/bot/database/service.py +++ b/src/bot/database/service.py @@ -17,6 +17,7 @@ NewUserProbation, PendingCaptchaValidation, PhotoVerificationWhitelist, + TrustedUser, UserWarning, ) @@ -347,6 +348,134 @@ def remove_photo_verification_whitelist(self, user_id: int) -> None: session.commit() logger.info(f"Removed from photo whitelist: user_id={user_id}") + def add_trusted_user( + self, + user_id: int, + trusted_by_admin_id: int, + group_id: int = 0, + notes: str | None = None, + ) -> TrustedUser: + """ + Add a user to trusted list. + + Args: + user_id: Telegram user ID. + trusted_by_admin_id: Telegram user ID of admin granting trust. + group_id: Trust scope ID (0 means global). + notes: Optional admin notes. + + Returns: + TrustedUser: Created trusted record. + + Raises: + ValueError: If user is already trusted in the scope. + """ + with Session(self._engine) as session: + statement = select(TrustedUser).where( + TrustedUser.user_id == user_id, + TrustedUser.group_id == group_id, + ) + existing = session.exec(statement).first() + + if existing: + raise ValueError(f"User {user_id} is already trusted for scope {group_id}") + + record = TrustedUser( + user_id=user_id, + group_id=group_id, + trusted_by_admin_id=trusted_by_admin_id, + notes=notes, + ) + session.add(record) + session.commit() + session.refresh(record) + logger.info( + f"Added trusted user: user_id={user_id}, admin_id={trusted_by_admin_id}, scope={group_id}" + ) + return record + + def remove_trusted_user(self, user_id: int, group_id: int = 0) -> None: + """ + Remove a user from trusted list. + + Args: + user_id: Telegram user ID. + group_id: Trust scope ID (0 means global). + + Raises: + ValueError: If user is not trusted in the scope. + """ + with Session(self._engine) as session: + statement = select(TrustedUser).where( + TrustedUser.user_id == user_id, + TrustedUser.group_id == group_id, + ) + record = session.exec(statement).first() + + if not record: + raise ValueError(f"User {user_id} is not trusted for scope {group_id}") + + session.delete(record) + session.commit() + logger.info(f"Removed trusted user: user_id={user_id}, scope={group_id}") + + def is_user_trusted(self, user_id: int, group_id: int | None = None) -> bool: + """ + Check whether a user is trusted. + + Args: + user_id: Telegram user ID. + group_id: Optional group scope. For v1, only global trust is checked. + + Returns: + bool: True if user is trusted. + """ + del group_id # Reserved for future per-group trust behavior. + + with Session(self._engine) as session: + statement = select(TrustedUser).where( + TrustedUser.user_id == user_id, + TrustedUser.group_id == 0, + ) + record = session.exec(statement).first() + return record is not None + + def get_trusted_user_ids(self, group_id: int | None = None) -> set[int]: + """ + Get trusted user IDs. + + Args: + group_id: Optional group scope. For v1, only global records are returned. + + Returns: + set[int]: Trusted user IDs. + """ + del group_id # Reserved for future per-group trust behavior. + + with Session(self._engine) as session: + statement = select(TrustedUser.user_id).where(TrustedUser.group_id == 0) + return set(session.exec(statement).all()) + + def get_trusted_users(self, group_id: int | None = None) -> list[TrustedUser]: + """ + Get trusted user records with metadata. + + Args: + group_id: Optional group scope. For v1, only global records are returned. + + Returns: + list[TrustedUser]: Trusted user records. + """ + del group_id # Reserved for future per-group trust behavior. + + with Session(self._engine) as session: + statement = ( + select(TrustedUser) + .where(TrustedUser.group_id == 0) + .order_by(TrustedUser.trusted_at.desc()) + ) + return list(session.exec(statement).all()) + def get_warnings_past_time_threshold( self, threshold: timedelta ) -> list[UserWarning]: diff --git a/src/bot/handlers/anti_spam.py b/src/bot/handlers/anti_spam.py index a537ef7..bad836d 100644 --- a/src/bot/handlers/anti_spam.py +++ b/src/bot/handlers/anti_spam.py @@ -28,7 +28,7 @@ ) from bot.database.service import get_database from bot.group_config import get_group_config_for_update -from bot.services.telegram_utils import get_user_mention +from bot.services.telegram_utils import get_user_mention, is_user_admin_or_trusted logger = logging.getLogger(__name__) @@ -305,8 +305,7 @@ async def handle_contact_spam( if user.is_bot: return - admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, []) - if user.id in admin_ids: + if is_user_admin_or_trusted(context, group_config.group_id, user.id): return msg = update.message @@ -394,8 +393,7 @@ async def handle_inline_keyboard_spam( if user.is_bot: return - admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, []) - if user.id in admin_ids: + if is_user_admin_or_trusted(context, group_config.group_id, user.id): return msg = update.message @@ -488,6 +486,9 @@ async def handle_new_user_spam( if user.is_bot: return + if is_user_admin_or_trusted(context, group_config.group_id, user.id): + return + db = get_database() record = db.get_new_user_probation(user.id, group_config.group_id) diff --git a/src/bot/handlers/check.py b/src/bot/handlers/check.py index 6da57f0..33a9e7d 100644 --- a/src/bot/handlers/check.py +++ b/src/bot/handlers/check.py @@ -60,15 +60,20 @@ async def _build_check_response( db = get_database() is_whitelisted = db.is_user_photo_whitelisted(user_id) + is_trusted = db.is_user_trusted(user_id) is True if result.is_complete: action_prompt = ADMIN_CHECK_ACTION_COMPLETE + buttons: list[InlineKeyboardButton] = [] if is_whitelisted: - keyboard = InlineKeyboardMarkup([ - [InlineKeyboardButton("❌ Unverify User", callback_data=f"unverify:{user_id}")] - ]) - else: - keyboard = None + buttons.append( + InlineKeyboardButton("❌ Unverify User", callback_data=f"unverify:{user_id}") + ) + if is_trusted: + buttons.append( + InlineKeyboardButton("🤝 Untrust User", callback_data=f"untrust:{user_id}") + ) + keyboard = InlineKeyboardMarkup([buttons]) if buttons else None else: action_prompt = ADMIN_CHECK_ACTION_INCOMPLETE # Store missing items in callback data (photo,username format) @@ -77,10 +82,18 @@ async def _build_check_response( missing_code += "p" if not result.has_username: missing_code += "u" + + trust_button = ( + InlineKeyboardButton("🤝 Untrust User", callback_data=f"untrust:{user_id}") + if is_trusted + else InlineKeyboardButton("🤝 Trust User", callback_data=f"trust:{user_id}") + ) + keyboard = InlineKeyboardMarkup([ [ InlineKeyboardButton("⚠️ Warn User", callback_data=f"warn:{user_id}:{missing_code}"), InlineKeyboardButton("✅ Verify User", callback_data=f"verify:{user_id}"), + trust_button, ] ]) diff --git a/src/bot/handlers/duplicate_spam.py b/src/bot/handlers/duplicate_spam.py index 391aa7f..e6844eb 100644 --- a/src/bot/handlers/duplicate_spam.py +++ b/src/bot/handlers/duplicate_spam.py @@ -28,7 +28,7 @@ RESTRICTED_PERMISSIONS, ) from bot.group_config import GroupConfig, get_group_config_for_update -from bot.services.telegram_utils import get_user_mention +from bot.services.telegram_utils import get_user_mention, is_user_admin_or_trusted logger = logging.getLogger(__name__) @@ -117,8 +117,7 @@ async def handle_duplicate_spam( if user.is_bot: return - admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, []) - if user.id in admin_ids: + if is_user_admin_or_trusted(context, group_config.group_id, user.id): return text = update.message.text or update.message.caption diff --git a/src/bot/handlers/trust.py b/src/bot/handlers/trust.py new file mode 100644 index 0000000..63ae7eb --- /dev/null +++ b/src/bot/handlers/trust.py @@ -0,0 +1,305 @@ +"""Trusted-user command handlers for anti-spam bypass management.""" + +import logging +from datetime import UTC + +from telegram import Update +from telegram.ext import ContextTypes + +from bot.constants import ( + TRUST_ADDED_MESSAGE, + TRUST_ALREADY_EXISTS_MESSAGE, + TRUST_LIST_EMPTY_MESSAGE, + TRUST_LIST_HEADER, + TRUST_REMOVED_MESSAGE, + TRUST_USER_ID_INVALID_MESSAGE, + TRUST_USER_ID_REQUIRED_MESSAGE, + TRUST_USER_NOT_FOUND_MESSAGE, +) +from bot.database.service import DatabaseService, get_database +from bot.group_config import GroupRegistry, get_group_registry +from bot.services.telegram_utils import extract_forwarded_user, unrestrict_user + +logger = logging.getLogger(__name__) + + +def _add_trusted_cache(context: ContextTypes.DEFAULT_TYPE, user_id: int) -> None: + trusted_ids = set(context.bot_data.get("trusted_user_ids", [])) + trusted_ids.add(user_id) + context.bot_data["trusted_user_ids"] = list(trusted_ids) + + +def _remove_trusted_cache(context: ContextTypes.DEFAULT_TYPE, user_id: int) -> None: + trusted_ids = set(context.bot_data.get("trusted_user_ids", [])) + trusted_ids.discard(user_id) + context.bot_data["trusted_user_ids"] = list(trusted_ids) + + +def _resolve_target_user_id(update: Update, args: list[str]) -> int: + if args: + try: + return int(args[0]) + except ValueError as exc: + raise ValueError(TRUST_USER_ID_INVALID_MESSAGE) from exc + + if update.message: + forwarded = extract_forwarded_user(update.message) + if forwarded: + return forwarded[0] + + raise ValueError(TRUST_USER_ID_REQUIRED_MESSAGE) + + +async def trust_user( + bot: object, + db: DatabaseService, + registry: GroupRegistry, + target_user_id: int, + admin_user_id: int, +) -> tuple[int, int]: + """Add trusted user and apply cleanup side effects. + + Returns: + tuple[int, int]: (probation_clear_count, unrestrict_attempt_count) + """ + db.add_trusted_user( + user_id=target_user_id, + trusted_by_admin_id=admin_user_id, + ) + + cleared_probation = 0 + unrestricted_groups = 0 + for group_config in registry.all_groups(): + try: + if db.get_new_user_probation(target_user_id, group_config.group_id): + db.clear_new_user_probation(target_user_id, group_config.group_id) + cleared_probation += 1 + + await unrestrict_user(bot, group_config.group_id, target_user_id) + unrestricted_groups += 1 + except Exception: + logger.warning( + f"Trust side effects failed for user {target_user_id} in group {group_config.group_id}", + exc_info=True, + ) + continue + + return cleared_probation, unrestricted_groups + + +async def untrust_user( + db: DatabaseService, + target_user_id: int, +) -> None: + """Remove trusted user entry.""" + db.remove_trusted_user(user_id=target_user_id) + + +async def handle_trust_command( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """Handle /trust command in bot DM.""" + if not update.message or not update.message.from_user: + return + + if update.effective_chat and update.effective_chat.type != "private": + await update.message.reply_text( + "❌ Perintah ini hanya bisa digunakan di chat pribadi dengan bot." + ) + return + + admin_user_id = update.message.from_user.id + admin_ids = context.bot_data.get("admin_ids", []) + if admin_user_id not in admin_ids: + await update.message.reply_text("❌ Kamu tidak memiliki izin untuk menggunakan perintah ini.") + return + + try: + target_user_id = _resolve_target_user_id(update, context.args) + except ValueError as e: + await update.message.reply_text(str(e)) + return + + db = get_database() + registry = get_group_registry() + + try: + cleared_count, unrestricted_count = await trust_user( + context.bot, db, registry, target_user_id, admin_user_id + ) + _add_trusted_cache(context, target_user_id) + await update.message.reply_text( + TRUST_ADDED_MESSAGE.format( + user_id=target_user_id, + probation_clear_count=cleared_count, + unrestrict_count=unrestricted_count, + ) + ) + except ValueError: + await update.message.reply_text( + TRUST_ALREADY_EXISTS_MESSAGE.format(user_id=target_user_id) + ) + + +async def handle_untrust_command( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """Handle /untrust command in bot DM.""" + if not update.message or not update.message.from_user: + return + + if update.effective_chat and update.effective_chat.type != "private": + await update.message.reply_text( + "❌ Perintah ini hanya bisa digunakan di chat pribadi dengan bot." + ) + return + + admin_user_id = update.message.from_user.id + admin_ids = context.bot_data.get("admin_ids", []) + if admin_user_id not in admin_ids: + await update.message.reply_text("❌ Kamu tidak memiliki izin untuk menggunakan perintah ini.") + return + + try: + target_user_id = _resolve_target_user_id(update, context.args) + except ValueError as e: + await update.message.reply_text(str(e)) + return + + db = get_database() + + try: + await untrust_user(db, target_user_id) + _remove_trusted_cache(context, target_user_id) + await update.message.reply_text( + TRUST_REMOVED_MESSAGE.format(user_id=target_user_id) + ) + except ValueError: + await update.message.reply_text( + TRUST_USER_NOT_FOUND_MESSAGE.format(user_id=target_user_id) + ) + + +async def handle_trusted_list_command( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """Handle /trusted command in bot DM.""" + if not update.message or not update.message.from_user: + return + + if update.effective_chat and update.effective_chat.type != "private": + await update.message.reply_text( + "❌ Perintah ini hanya bisa digunakan di chat pribadi dengan bot." + ) + return + + admin_user_id = update.message.from_user.id + admin_ids = context.bot_data.get("admin_ids", []) + if admin_user_id not in admin_ids: + await update.message.reply_text("❌ Kamu tidak memiliki izin untuk menggunakan perintah ini.") + return + + db = get_database() + trusted_users = db.get_trusted_users() + + if not trusted_users: + await update.message.reply_text(TRUST_LIST_EMPTY_MESSAGE) + return + + trusted_lines = [] + for record in trusted_users: + trusted_at = record.trusted_at + if trusted_at.tzinfo is None: + trusted_at = trusted_at.replace(tzinfo=UTC) + trusted_at_display = trusted_at.astimezone(UTC).strftime("%Y-%m-%d %H:%M UTC") + trusted_lines.append( + "• `{user_id}` — oleh admin `{admin_id}` pada `{trusted_at}`".format( + user_id=record.user_id, + admin_id=record.trusted_by_admin_id, + trusted_at=trusted_at_display, + ) + ) + + await update.message.reply_text( + TRUST_LIST_HEADER.format(trusted_lines="\n".join(trusted_lines)), + parse_mode="Markdown", + ) + + +async def handle_trust_callback( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """Handle trust callback button.""" + query = update.callback_query + if not query or not query.from_user or not query.data: + return + + await query.answer() + + admin_user_id = query.from_user.id + admin_ids = context.bot_data.get("admin_ids", []) + if admin_user_id not in admin_ids: + await query.edit_message_text("❌ Kamu tidak memiliki izin untuk menggunakan perintah ini.") + return + + try: + target_user_id = int(query.data.split(":")[1]) + except (IndexError, ValueError): + await query.edit_message_text("❌ Data callback tidak valid.") + return + + db = get_database() + registry = get_group_registry() + + try: + cleared_count, unrestricted_count = await trust_user( + context.bot, db, registry, target_user_id, admin_user_id + ) + _add_trusted_cache(context, target_user_id) + await query.edit_message_text( + TRUST_ADDED_MESSAGE.format( + user_id=target_user_id, + probation_clear_count=cleared_count, + unrestrict_count=unrestricted_count, + ) + ) + except ValueError: + await query.edit_message_text( + TRUST_ALREADY_EXISTS_MESSAGE.format(user_id=target_user_id) + ) + + +async def handle_untrust_callback( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """Handle untrust callback button.""" + query = update.callback_query + if not query or not query.from_user or not query.data: + return + + await query.answer() + + admin_user_id = query.from_user.id + admin_ids = context.bot_data.get("admin_ids", []) + if admin_user_id not in admin_ids: + await query.edit_message_text("❌ Kamu tidak memiliki izin untuk menggunakan perintah ini.") + return + + try: + target_user_id = int(query.data.split(":")[1]) + except (IndexError, ValueError): + await query.edit_message_text("❌ Data callback tidak valid.") + return + + db = get_database() + + try: + await untrust_user(db, target_user_id) + _remove_trusted_cache(context, target_user_id) + await query.edit_message_text( + TRUST_REMOVED_MESSAGE.format(user_id=target_user_id) + ) + except ValueError: + await query.edit_message_text( + TRUST_USER_NOT_FOUND_MESSAGE.format(user_id=target_user_id) + ) diff --git a/src/bot/main.py b/src/bot/main.py index f59f59b..4babba5 100644 --- a/src/bot/main.py +++ b/src/bot/main.py @@ -15,7 +15,7 @@ from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters from bot.config import get_settings -from bot.database.service import init_database +from bot.database.service import get_database, init_database from bot.group_config import get_group_registry, init_group_registry from bot.handlers import captcha from bot.handlers.anti_spam import handle_contact_spam, handle_inline_keyboard_spam, handle_new_user_spam @@ -34,6 +34,13 @@ handle_check_forwarded_message, handle_warn_callback, ) +from bot.handlers.trust import ( + handle_trust_callback, + handle_trust_command, + handle_trusted_list_command, + handle_untrust_callback, + handle_untrust_command, +) from bot.services.scheduler import auto_restrict_expired_warnings from bot.services.telegram_utils import fetch_group_admin_ids @@ -190,6 +197,12 @@ async def post_init(application: Application) -> None: # type: ignore[type-arg] application.bot_data["admin_ids"] = list(all_admin_ids) # type: ignore[index] logger.info(f"Total unique admins across all groups: {len(all_admin_ids)}") + # Preload trusted users cache + db = get_database() + trusted_ids = list(db.get_trusted_user_ids()) + application.bot_data["trusted_user_ids"] = trusted_ids # type: ignore[index] + logger.info(f"Loaded {len(trusted_ids)} trusted user(s) into cache") + # Recover pending captcha verifications for groups with captcha enabled has_captcha = any(gc.captcha_enabled for gc in registry.all_groups()) if has_captcha: @@ -264,6 +277,24 @@ def main() -> None: ) logger.info("Registered handler: check_command (group=0)") + # Handler: /trust command - allows admins to trust users for spam bypass in DM + application.add_handler( + CommandHandler("trust", handle_trust_command) + ) + logger.info("Registered handler: trust_command (group=0)") + + # Handler: /untrust command - allows admins to remove users from trusted list in DM + application.add_handler( + CommandHandler("untrust", handle_untrust_command) + ) + logger.info("Registered handler: untrust_command (group=0)") + + # Handler: /trusted command - list all trusted users in DM + application.add_handler( + CommandHandler("trusted", handle_trusted_list_command) + ) + logger.info("Registered handler: trusted_list_command (group=0)") + # Handler: Forwarded message handler - allows admins to check profiles via forward application.add_handler( MessageHandler( @@ -286,6 +317,14 @@ def main() -> None: CallbackQueryHandler(handle_warn_callback, pattern=r"^warn:\d+:") ) logger.info("Registered handler: warn_callback (group=0)") + application.add_handler( + CallbackQueryHandler(handle_trust_callback, pattern=r"^trust:\d+$") + ) + logger.info("Registered handler: trust_callback (group=0)") + application.add_handler( + CallbackQueryHandler(handle_untrust_callback, pattern=r"^untrust:\d+$") + ) + logger.info("Registered handler: untrust_callback (group=0)") # Handler 6: Captcha handlers - new member verification for handler in captcha.get_handlers(): diff --git a/src/bot/services/telegram_utils.py b/src/bot/services/telegram_utils.py index fa5334e..ad918f6 100644 --- a/src/bot/services/telegram_utils.py +++ b/src/bot/services/telegram_utils.py @@ -12,6 +12,8 @@ from telegram.error import BadRequest, Forbidden from telegram.helpers import escape_markdown, mention_markdown +from bot.database.service import get_database + logger = logging.getLogger(__name__) @@ -132,10 +134,10 @@ async def unrestrict_user( def extract_forwarded_user(message: Message) -> tuple[int, str] | None: """ Extract user ID and name from a forwarded message. - + Args: message: Telegram Message object that was forwarded. - + Returns: Tuple of (user_id, user_name) if extraction successful, None otherwise. """ @@ -148,12 +150,52 @@ def extract_forwarded_user(message: Message) -> tuple[int, str] | None: if not forwarded_user: return None - + user_id = forwarded_user.id user_name = forwarded_user.full_name if hasattr(forwarded_user, 'full_name') else forwarded_user.first_name return user_id, user_name +def is_user_admin_or_trusted(context: object, group_id: int, user_id: int) -> bool: + """ + Check whether a user is an admin or trusted user for bypass decisions. + + Args: + context: Telegram callback context. + group_id: Telegram group ID. + user_id: Telegram user ID. + + Returns: + bool: True if user should bypass spam checks. + """ + bot_data = getattr(context, "bot_data", {}) + + admin_ids = bot_data.get("group_admin_ids", {}).get(group_id, []) + if user_id in admin_ids: + return True + + trusted_ids = set(bot_data.get("trusted_user_ids", [])) + if user_id in trusted_ids: + return True + + try: + db = get_database() + if db.is_user_trusted(user_id=user_id, group_id=group_id): + trusted_ids.add(user_id) + bot_data["trusted_user_ids"] = list(trusted_ids) + return True + except RuntimeError: + return False + except Exception: + logger.error( + f"Failed trusted user lookup: user_id={user_id}, group_id={group_id}", + exc_info=True, + ) + return False + + return False + + async def fetch_group_admin_ids(bot: Bot, group_id: int) -> list[int]: """ Fetch all administrator user IDs from a group. diff --git a/tests/test_anti_spam.py b/tests/test_anti_spam.py index 583b7a4..a2d35a0 100644 --- a/tests/test_anti_spam.py +++ b/tests/test_anti_spam.py @@ -464,6 +464,22 @@ async def test_ignores_bot_messages( mock_update.message.delete.assert_not_called() + @pytest.mark.asyncio + async def test_ignores_trusted_users( + self, mock_update, mock_context, group_config + ): + """Test that trusted users bypass probation spam enforcement.""" + mock_update.message.forward_origin = MagicMock() # would otherwise violate + mock_context.bot_data = { + "group_admin_ids": {}, + "trusted_user_ids": [mock_update.message.from_user.id], + } + + with patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config): + await handle_new_user_spam(mock_update, mock_context) + + mock_update.message.delete.assert_not_called() + @pytest.mark.asyncio async def test_ignores_user_not_on_probation( self, mock_update, mock_context, group_config @@ -1397,6 +1413,20 @@ async def test_admin_user_returns_early(self, mock_update, mock_context, group_c mock_update.message.delete.assert_not_called() mock_context.bot.restrict_chat_member.assert_not_called() + async def test_trusted_user_returns_early(self, mock_update, mock_context, group_config): + """Test that trusted user with inline keyboard spam is NOT restricted.""" + self._add_spam_inline_keyboard(mock_update) + mock_context.bot_data = { + "group_admin_ids": {}, + "trusted_user_ids": [mock_update.message.from_user.id], + } + + with patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config): + await handle_inline_keyboard_spam(mock_update, mock_context) + + mock_update.message.delete.assert_not_called() + mock_context.bot.restrict_chat_member.assert_not_called() + async def test_raises_application_handler_stop(self, mock_update, mock_context, group_config): """Test that handler raises ApplicationHandlerStop after processing spam.""" from telegram.ext import ApplicationHandlerStop @@ -1553,6 +1583,18 @@ async def test_admin_user_returns_early(self, mock_update, mock_context, group_c mock_update.message.delete.assert_not_called() + async def test_trusted_user_returns_early(self, mock_update, mock_context, group_config): + """Test that trusted user with contact is NOT blocked.""" + mock_context.bot_data = { + "group_admin_ids": {}, + "trusted_user_ids": [mock_update.message.from_user.id], + } + + with patch("bot.handlers.anti_spam.get_group_config_for_update", return_value=group_config): + await handle_contact_spam(mock_update, mock_context) + + mock_update.message.delete.assert_not_called() + async def test_continues_when_delete_fails( self, mock_update, mock_context, group_config ): diff --git a/tests/test_check.py b/tests/test_check.py index 7ae071a..b097af4 100644 --- a/tests/test_check.py +++ b/tests/test_check.py @@ -173,6 +173,7 @@ async def test_check_command_incomplete_profile(self, mock_update, mock_context) mock_db = MagicMock() mock_db.is_user_photo_whitelisted.return_value = False + mock_db.is_user_trusted.return_value = False with ( patch( @@ -194,6 +195,58 @@ async def test_check_command_incomplete_profile(self, mock_update, mock_context) assert any("warn:555666" in data for data in callback_data) assert any("verify:555666" in data for data in callback_data) + async def test_check_command_incomplete_profile_shows_trust_button( + self, mock_update, mock_context + ): + """Shows trust button for incomplete non-trusted users.""" + mock_context.args = ["555666"] + + incomplete_result = ProfileCheckResult( + has_profile_photo=False, has_username=False + ) + + mock_db = MagicMock() + mock_db.is_user_photo_whitelisted.return_value = False + mock_db.is_user_trusted.return_value = False + + with ( + patch("bot.handlers.check.check_user_profile", return_value=incomplete_result), + patch("bot.handlers.check.get_database", return_value=mock_db), + ): + await handle_check_command(mock_update, mock_context) + + keyboard = mock_update.message.reply_text.call_args.kwargs.get("reply_markup") + assert keyboard is not None + buttons = keyboard.inline_keyboard[0] + callback_data = [btn.callback_data for btn in buttons] + assert any(data == "trust:555666" for data in callback_data) + + async def test_check_command_complete_profile_shows_untrust_button_when_trusted( + self, mock_update, mock_context + ): + """Shows untrust button for trusted users.""" + mock_context.args = ["555666"] + + complete_result = ProfileCheckResult( + has_profile_photo=True, has_username=True + ) + + mock_db = MagicMock() + mock_db.is_user_photo_whitelisted.return_value = False + mock_db.is_user_trusted.return_value = True + + with ( + patch("bot.handlers.check.check_user_profile", return_value=complete_result), + patch("bot.handlers.check.get_database", return_value=mock_db), + ): + await handle_check_command(mock_update, mock_context) + + keyboard = mock_update.message.reply_text.call_args.kwargs.get("reply_markup") + assert keyboard is not None + buttons = keyboard.inline_keyboard[0] + callback_data = [btn.callback_data for btn in buttons] + assert any(data == "untrust:555666" for data in callback_data) + async def test_check_command_only_private(self, mock_update, mock_context): """Command only works in private chat.""" mock_update.effective_chat.type = "group" diff --git a/tests/test_database.py b/tests/test_database.py index 60f778a..97f5ebd 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -387,3 +387,45 @@ def test_does_not_return_restricted_warnings(self, db_service): group_id=-100999, threshold=timedelta(minutes=0) ) assert result == [] + + +class TestTrustedUsers: + def test_add_trusted_user_and_check_true(self, db_service: DatabaseService): + db_service.add_trusted_user(user_id=123, trusted_by_admin_id=999) + + assert db_service.is_user_trusted(user_id=123) is True + + def test_add_trusted_user_duplicate_raises(self, db_service: DatabaseService): + db_service.add_trusted_user(user_id=123, trusted_by_admin_id=999) + + with pytest.raises(ValueError): + db_service.add_trusted_user(user_id=123, trusted_by_admin_id=999) + + def test_remove_trusted_user(self, db_service: DatabaseService): + db_service.add_trusted_user(user_id=123, trusted_by_admin_id=999) + + db_service.remove_trusted_user(user_id=123) + + assert db_service.is_user_trusted(user_id=123) is False + + def test_remove_missing_trusted_user_raises(self, db_service: DatabaseService): + with pytest.raises(ValueError): + db_service.remove_trusted_user(user_id=123) + + def test_get_trusted_user_ids(self, db_service: DatabaseService): + db_service.add_trusted_user(user_id=1001, trusted_by_admin_id=999) + db_service.add_trusted_user(user_id=1002, trusted_by_admin_id=999) + + trusted_ids = db_service.get_trusted_user_ids() + + assert trusted_ids == {1001, 1002} + + def test_get_trusted_users_returns_metadata(self, db_service: DatabaseService): + db_service.add_trusted_user(user_id=2001, trusted_by_admin_id=9001) + db_service.add_trusted_user(user_id=2002, trusted_by_admin_id=9002) + + trusted_users = db_service.get_trusted_users() + + assert len(trusted_users) == 2 + assert {u.user_id for u in trusted_users} == {2001, 2002} + assert {u.trusted_by_admin_id for u in trusted_users} == {9001, 9002} diff --git a/tests/test_duplicate_spam.py b/tests/test_duplicate_spam.py index c68135c..0393a8a 100644 --- a/tests/test_duplicate_spam.py +++ b/tests/test_duplicate_spam.py @@ -219,6 +219,21 @@ async def test_skips_admins(self, mock_update, mock_context, group_config): await handle_duplicate_spam(mock_update, mock_context) mock_update.message.delete.assert_not_called() + async def test_skips_trusted_users(self, mock_update, mock_context, group_config): + now = datetime.now(UTC) + norm = normalize_text(mock_update.message.text) + existing_dq = deque([ + RecentMessage(timestamp=now, normalized_text=norm, message_id=99), + ]) + mock_context.bot_data[RECENT_MESSAGES_KEY] = {(-100, 42): existing_dq} + mock_context.bot_data["trusted_user_ids"] = [mock_update.message.from_user.id] + + with patch("bot.handlers.duplicate_spam.get_group_config_for_update", return_value=group_config): + await handle_duplicate_spam(mock_update, mock_context) + + mock_update.message.delete.assert_not_called() + mock_context.bot.restrict_chat_member.assert_not_called() + async def test_skips_no_text(self, mock_update, mock_context, group_config): mock_update.message.text = None mock_update.message.caption = None diff --git a/tests/test_telegram_utils.py b/tests/test_telegram_utils.py index dfe0ea0..90a194c 100644 --- a/tests/test_telegram_utils.py +++ b/tests/test_telegram_utils.py @@ -9,6 +9,7 @@ get_user_mention, get_user_mention_by_id, get_user_status, + is_user_admin_or_trusted, unrestrict_user, ) @@ -426,6 +427,52 @@ async def test_get_user_status_with_large_ids(self, mock_bot): ) +class TestIsUserAdminOrTrusted: + def test_returns_true_for_group_admin(self): + context = MagicMock() + context.bot_data = {"group_admin_ids": {-100: [123]}, "trusted_user_ids": []} + + assert is_user_admin_or_trusted(context, -100, 123) is True + + def test_returns_true_for_trusted_cache(self): + context = MagicMock() + context.bot_data = {"group_admin_ids": {-100: []}, "trusted_user_ids": [123]} + + assert is_user_admin_or_trusted(context, -100, 123) is True + + @patch("bot.services.telegram_utils.get_database") + def test_returns_true_for_db_fallback_and_updates_cache(self, mock_get_database): + context = MagicMock() + context.bot_data = {"group_admin_ids": {-100: []}, "trusted_user_ids": []} + + db = MagicMock() + db.is_user_trusted.return_value = True + mock_get_database.return_value = db + + assert is_user_admin_or_trusted(context, -100, 321) is True + assert 321 in context.bot_data["trusted_user_ids"] + + @patch("bot.services.telegram_utils.get_database") + def test_returns_false_when_database_not_initialized(self, mock_get_database): + context = MagicMock() + context.bot_data = {"group_admin_ids": {-100: []}, "trusted_user_ids": []} + + mock_get_database.side_effect = RuntimeError("Database not initialized") + + assert is_user_admin_or_trusted(context, -100, 321) is False + + @patch("bot.services.telegram_utils.get_database") + def test_returns_false_on_db_lookup_error(self, mock_get_database): + context = MagicMock() + context.bot_data = {"group_admin_ids": {-100: []}, "trusted_user_ids": []} + + db = MagicMock() + db.is_user_trusted.side_effect = Exception("DB error") + mock_get_database.return_value = db + + assert is_user_admin_or_trusted(context, -100, 321) is False + + class TestFetchGroupAdminIds: async def test_fetch_single_admin(self, mock_bot): """Test fetching admin IDs when there is one admin.""" diff --git a/tests/test_trust_handler.py b/tests/test_trust_handler.py new file mode 100644 index 0000000..b47819b --- /dev/null +++ b/tests/test_trust_handler.py @@ -0,0 +1,367 @@ +"""Tests for trusted user command handlers.""" + +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from bot.database.service import get_database, init_database, reset_database +from bot.group_config import GroupConfig, GroupRegistry +from bot.handlers.trust import ( + handle_trust_callback, + handle_trust_command, + handle_trusted_list_command, + handle_untrust_callback, + handle_untrust_command, +) + + +@pytest.fixture(autouse=True) +def temp_db(): + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + reset_database() + init_database(str(db_path)) + yield db_path + reset_database() + + +@pytest.fixture +def mock_registry(): + registry = GroupRegistry() + registry.register(GroupConfig(group_id=-1001, warning_topic_id=11)) + registry.register(GroupConfig(group_id=-1002, warning_topic_id=12)) + return registry + + +@pytest.fixture +def mock_update(): + update = MagicMock() + update.message = MagicMock() + update.message.from_user = MagicMock() + update.message.from_user.id = 12345 + update.message.from_user.full_name = "Admin User" + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "private" + update.message.forward_origin = None + update.message.forward_from = None + return update + + +@pytest.fixture +def mock_context(): + context = MagicMock() + context.bot = MagicMock() + context.bot_data = {"admin_ids": [12345], "trusted_user_ids": []} + context.args = [] + return context + + +class TestTrustCommands: + async def test_trust_command_no_message_returns_early(self, mock_context): + update = MagicMock() + update.message = None + + await handle_trust_command(update, mock_context) + + async def test_trust_command_no_from_user_returns_early(self, mock_context): + update = MagicMock() + update.message = MagicMock() + update.message.from_user = None + + await handle_trust_command(update, mock_context) + + async def test_trust_command_requires_admin(self, mock_update, mock_context): + mock_update.message.from_user.id = 99999 + mock_context.args = ["1111"] + + await handle_trust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "izin" in mock_update.message.reply_text.call_args.args[0] + + async def test_trust_command_requires_private_chat(self, mock_update, mock_context): + mock_update.effective_chat.type = "group" + mock_context.args = ["1111"] + + await handle_trust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "chat pribadi" in mock_update.message.reply_text.call_args.args[0] + + async def test_trust_command_invalid_user_id(self, mock_update, mock_context): + mock_context.args = ["abc"] + + await handle_trust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "angka" in mock_update.message.reply_text.call_args.args[0] + + async def test_trust_command_missing_user_id_and_no_forward(self, mock_update, mock_context): + mock_context.args = [] + + await handle_trust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "penggunaan" in mock_update.message.reply_text.call_args.args[0].lower() + + async def test_trust_command_success_by_user_id( + self, mock_update, mock_context, mock_registry, monkeypatch + ): + monkeypatch.setattr("bot.handlers.trust.get_group_registry", lambda: mock_registry) + mock_context.args = ["1111"] + + unrestrict = AsyncMock() + monkeypatch.setattr("bot.handlers.trust.unrestrict_user", unrestrict) + + db = get_database() + db.start_new_user_probation(user_id=1111, group_id=-1001) + db.start_new_user_probation(user_id=1111, group_id=-1002) + + await handle_trust_command(mock_update, mock_context) + + assert db.is_user_trusted(1111) is True + assert 1111 in mock_context.bot_data["trusted_user_ids"] + assert db.get_new_user_probation(1111, -1001) is None + assert db.get_new_user_probation(1111, -1002) is None + assert unrestrict.await_count == 2 + assert "ditambahkan" in mock_update.message.reply_text.call_args.args[0].lower() + + async def test_trust_command_success_from_forwarded_message( + self, mock_update, mock_context, mock_registry, monkeypatch + ): + monkeypatch.setattr("bot.handlers.trust.get_group_registry", lambda: mock_registry) + monkeypatch.setattr("bot.handlers.trust.unrestrict_user", AsyncMock()) + + forwarded_user = MagicMock() + forwarded_user.id = 4444 + forwarded_user.full_name = "Forwarded User" + mock_update.message.forward_from = forwarded_user + + await handle_trust_command(mock_update, mock_context) + + assert get_database().is_user_trusted(4444) is True + + async def test_trust_command_duplicate(self, mock_update, mock_context, mock_registry, monkeypatch): + monkeypatch.setattr("bot.handlers.trust.get_group_registry", lambda: mock_registry) + monkeypatch.setattr("bot.handlers.trust.unrestrict_user", AsyncMock()) + mock_context.args = ["1111"] + + db = get_database() + db.add_trusted_user(user_id=1111, trusted_by_admin_id=12345) + + await handle_trust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "sudah" in mock_update.message.reply_text.call_args.args[0].lower() + + async def test_trust_command_continues_on_unrestrict_error( + self, mock_update, mock_context, mock_registry, monkeypatch + ): + monkeypatch.setattr("bot.handlers.trust.get_group_registry", lambda: mock_registry) + + unrestrict = AsyncMock(side_effect=[Exception("failed"), None]) + monkeypatch.setattr("bot.handlers.trust.unrestrict_user", unrestrict) + mock_context.args = ["2111"] + + await handle_trust_command(mock_update, mock_context) + + assert get_database().is_user_trusted(2111) is True + + async def test_untrust_command_requires_private_chat(self, mock_update, mock_context): + mock_update.effective_chat.type = "group" + mock_context.args = ["2222"] + + await handle_untrust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "chat pribadi" in mock_update.message.reply_text.call_args.args[0] + + async def test_untrust_command_requires_admin(self, mock_update, mock_context): + mock_update.message.from_user.id = 99999 + mock_context.args = ["2222"] + + await handle_untrust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "izin" in mock_update.message.reply_text.call_args.args[0] + + async def test_untrust_command_no_message_returns_early(self, mock_context): + update = MagicMock() + update.message = None + + await handle_untrust_command(update, mock_context) + + async def test_untrust_command_no_from_user_returns_early(self, mock_context): + update = MagicMock() + update.message = MagicMock() + update.message.from_user = None + + await handle_untrust_command(update, mock_context) + + async def test_untrust_command_invalid_user_id(self, mock_update, mock_context): + mock_context.args = ["abc"] + + await handle_untrust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "angka" in mock_update.message.reply_text.call_args.args[0] + + async def test_untrust_command_missing_user_id_and_no_forward(self, mock_update, mock_context): + mock_context.args = [] + + await handle_untrust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "penggunaan" in mock_update.message.reply_text.call_args.args[0].lower() + + async def test_untrust_command_success(self, mock_update, mock_context): + mock_context.args = ["2222"] + + db = get_database() + db.add_trusted_user(user_id=2222, trusted_by_admin_id=12345) + mock_context.bot_data["trusted_user_ids"] = [2222] + + await handle_untrust_command(mock_update, mock_context) + + assert db.is_user_trusted(2222) is False + assert 2222 not in mock_context.bot_data["trusted_user_ids"] + assert "dihapus" in mock_update.message.reply_text.call_args.args[0].lower() + + async def test_untrust_command_missing_user(self, mock_update, mock_context): + mock_context.args = ["3333"] + + await handle_untrust_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "tidak ada" in mock_update.message.reply_text.call_args.args[0].lower() + + async def test_trusted_list_command_no_message_returns_early(self, mock_context): + update = MagicMock() + update.message = None + + await handle_trusted_list_command(update, mock_context) + + async def test_trusted_list_command_no_from_user_returns_early(self, mock_context): + update = MagicMock() + update.message = MagicMock() + update.message.from_user = None + + await handle_trusted_list_command(update, mock_context) + + async def test_trusted_list_command_requires_private_chat(self, mock_update, mock_context): + mock_update.effective_chat.type = "group" + + await handle_trusted_list_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "chat pribadi" in mock_update.message.reply_text.call_args.args[0] + + async def test_trusted_list_command_requires_admin(self, mock_update, mock_context): + mock_update.message.from_user.id = 99999 + + await handle_trusted_list_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "izin" in mock_update.message.reply_text.call_args.args[0] + + async def test_trusted_list_command_empty(self, mock_update, mock_context): + await handle_trusted_list_command(mock_update, mock_context) + + mock_update.message.reply_text.assert_called_once() + assert "kosong" in mock_update.message.reply_text.call_args.args[0].lower() + + async def test_trusted_list_command(self, mock_update, mock_context): + db = get_database() + db.add_trusted_user(user_id=8001, trusted_by_admin_id=12345) + db.add_trusted_user(user_id=8002, trusted_by_admin_id=54321) + + await handle_trusted_list_command(mock_update, mock_context) + + message = mock_update.message.reply_text.call_args.args[0] + assert "8001" in message + assert "8002" in message + assert "12345" in message + assert "54321" in message + assert "UTC" in message + + +class TestTrustCallbacks: + @pytest.fixture + def mock_callback_update(self): + update = MagicMock() + update.callback_query = MagicMock() + update.callback_query.from_user = MagicMock() + update.callback_query.from_user.id = 12345 + update.callback_query.from_user.full_name = "Admin User" + update.callback_query.answer = AsyncMock() + update.callback_query.edit_message_text = AsyncMock() + return update + + async def test_trust_callback_no_query_returns_early(self, mock_context): + update = MagicMock() + update.callback_query = None + + await handle_trust_callback(update, mock_context) + + async def test_trust_callback_invalid_data(self, mock_callback_update, mock_context): + mock_callback_update.callback_query.data = "trust:bad" + + await handle_trust_callback(mock_callback_update, mock_context) + + mock_callback_update.callback_query.edit_message_text.assert_called_once() + assert "callback" in mock_callback_update.callback_query.edit_message_text.call_args.args[0].lower() + + async def test_trust_callback_success(self, mock_callback_update, mock_context, mock_registry, monkeypatch): + monkeypatch.setattr("bot.handlers.trust.get_group_registry", lambda: mock_registry) + monkeypatch.setattr("bot.handlers.trust.unrestrict_user", AsyncMock()) + mock_callback_update.callback_query.data = "trust:7001" + + await handle_trust_callback(mock_callback_update, mock_context) + + assert get_database().is_user_trusted(7001) is True + mock_callback_update.callback_query.edit_message_text.assert_called_once() + + async def test_untrust_callback_no_query_returns_early(self, mock_context): + update = MagicMock() + update.callback_query = None + + await handle_untrust_callback(update, mock_context) + + async def test_untrust_callback_invalid_data(self, mock_callback_update, mock_context): + mock_callback_update.callback_query.data = "untrust:bad" + + await handle_untrust_callback(mock_callback_update, mock_context) + + mock_callback_update.callback_query.edit_message_text.assert_called_once() + assert "callback" in mock_callback_update.callback_query.edit_message_text.call_args.args[0].lower() + + async def test_untrust_callback_success(self, mock_callback_update, mock_context): + get_database().add_trusted_user(user_id=7002, trusted_by_admin_id=12345) + mock_context.bot_data["trusted_user_ids"] = [7002] + mock_callback_update.callback_query.data = "untrust:7002" + + await handle_untrust_callback(mock_callback_update, mock_context) + + assert get_database().is_user_trusted(7002) is False + mock_callback_update.callback_query.edit_message_text.assert_called_once() + + async def test_callback_non_admin_rejected(self, mock_callback_update, mock_context): + mock_callback_update.callback_query.from_user.id = 99999 + mock_callback_update.callback_query.data = "trust:8003" + + await handle_trust_callback(mock_callback_update, mock_context) + + mock_callback_update.callback_query.edit_message_text.assert_called_once() + assert "izin" in mock_callback_update.callback_query.edit_message_text.call_args.args[0].lower() + + async def test_untrust_callback_non_admin_rejected(self, mock_callback_update, mock_context): + mock_callback_update.callback_query.from_user.id = 99999 + mock_callback_update.callback_query.data = "untrust:8003" + + await handle_untrust_callback(mock_callback_update, mock_context) + + mock_callback_update.callback_query.edit_message_text.assert_called_once() + assert "izin" in mock_callback_update.callback_query.edit_message_text.call_args.args[0].lower()