Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ A comprehensive Telegram bot for managing group members with profile verificatio
- **New user probation**: New members restricted from sending links/forwarded messages for 3 days (configurable)
- **Contact card blocking**: Prevents all non-admin members from sharing contact cards/phone numbers (delete + restrict)
- **Anti-spam enforcement**: Tracks violations and restricts spammers after threshold
- **Trusted users**: Admin-managed trusted list to bypass anti-spam + duplicate-spam checks

### Admin Tools
- **/verify command**: Whitelist users with hidden profile pictures (DM only)
- **/unverify command**: Remove users from verification whitelist (DM only)
- **Inline verification**: Forward messages to bot for quick verify/unverify buttons
- **/trust command**: Add trusted users (DM only, supports user ID or forwarded message)
- **/untrust command**: Remove trusted users from trusted list (DM only)
- **/trusted command**: List all trusted users (DM only)
- **Automatic clearance**: Sends notification when verified users' warnings are cleared

## Requirements
Expand Down
22 changes: 22 additions & 0 deletions src/bot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,28 @@ def format_hours_display(hours: int) -> str:

ADMIN_WARN_SENT_MESSAGE = "✅ Peringatan telah dikirim ke {user_mention} di grup."

TRUST_USER_ID_REQUIRED_MESSAGE = (
"❌ Penggunaan: /trust USER_ID atau /untrust USER_ID, atau forward pesan user ke bot."
)

TRUST_USER_ID_INVALID_MESSAGE = "❌ User ID harus berupa angka."

TRUST_ADDED_MESSAGE = (
"✅ User `{user_id}` ditambahkan ke trusted list.\n"
"• Probation dibersihkan di {probation_clear_count} grup\n"
"• Unrestrict dicoba di {unrestrict_count} grup"
)

TRUST_ALREADY_EXISTS_MESSAGE = "ℹ️ User `{user_id}` sudah ada di trusted list."

TRUST_REMOVED_MESSAGE = "✅ User `{user_id}` dihapus dari trusted list."

TRUST_USER_NOT_FOUND_MESSAGE = "ℹ️ User `{user_id}` tidak ada di trusted list."

TRUST_LIST_EMPTY_MESSAGE = "ℹ️ Trusted list masih kosong."

TRUST_LIST_HEADER = "📋 Trusted Users:\n{trusted_lines}"

# Anti-spam probation warning for new users
NEW_USER_SPAM_WARNING = (
"⚠️ {user_mention} baru bergabung dan sedang dalam masa percobaan.\n"
Expand Down
29 changes: 29 additions & 0 deletions src/bot/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,35 @@ class PhotoVerificationWhitelist(SQLModel, table=True):
notes: str | None = Field(default=None)


class TrustedUser(SQLModel, table=True):
"""
Trusted users who bypass anti-spam and duplicate spam enforcement.

`group_id=0` is used as a global trust scope. Future per-group trust can
use a real Telegram group ID without changing the schema.

Attributes:
id: Primary key (auto-generated).
user_id: Telegram user ID.
group_id: Scope identifier (0 = global).
trusted_by_admin_id: Telegram user ID of admin granting trust.
trusted_at: Timestamp when trust was granted.
notes: Optional admin notes.
"""

__tablename__ = "trusted_users"
__table_args__ = (
UniqueConstraint('user_id', 'group_id', name='uix_trusted_user_group'),
)

id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(index=True)
group_id: int = Field(default=0, index=True)
trusted_by_admin_id: int
trusted_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
notes: str | None = Field(default=None)


class PendingCaptchaValidation(SQLModel, table=True):
"""
Tracks users who need to complete captcha verification.
Expand Down
129 changes: 129 additions & 0 deletions src/bot/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
NewUserProbation,
PendingCaptchaValidation,
PhotoVerificationWhitelist,
TrustedUser,
UserWarning,
)

Expand Down Expand Up @@ -347,6 +348,134 @@ def remove_photo_verification_whitelist(self, user_id: int) -> None:
session.commit()
logger.info(f"Removed from photo whitelist: user_id={user_id}")

def add_trusted_user(
self,
user_id: int,
trusted_by_admin_id: int,
group_id: int = 0,
notes: str | None = None,
) -> TrustedUser:
"""
Add a user to trusted list.

Args:
user_id: Telegram user ID.
trusted_by_admin_id: Telegram user ID of admin granting trust.
group_id: Trust scope ID (0 means global).
notes: Optional admin notes.

Returns:
TrustedUser: Created trusted record.

Raises:
ValueError: If user is already trusted in the scope.
"""
with Session(self._engine) as session:
statement = select(TrustedUser).where(
TrustedUser.user_id == user_id,
TrustedUser.group_id == group_id,
)
existing = session.exec(statement).first()

if existing:
raise ValueError(f"User {user_id} is already trusted for scope {group_id}")

record = TrustedUser(
user_id=user_id,
group_id=group_id,
trusted_by_admin_id=trusted_by_admin_id,
notes=notes,
)
session.add(record)
session.commit()
session.refresh(record)
logger.info(
f"Added trusted user: user_id={user_id}, admin_id={trusted_by_admin_id}, scope={group_id}"
)
return record

def remove_trusted_user(self, user_id: int, group_id: int = 0) -> None:
"""
Remove a user from trusted list.

Args:
user_id: Telegram user ID.
group_id: Trust scope ID (0 means global).

Raises:
ValueError: If user is not trusted in the scope.
"""
with Session(self._engine) as session:
statement = select(TrustedUser).where(
TrustedUser.user_id == user_id,
TrustedUser.group_id == group_id,
)
record = session.exec(statement).first()

if not record:
raise ValueError(f"User {user_id} is not trusted for scope {group_id}")

session.delete(record)
session.commit()
logger.info(f"Removed trusted user: user_id={user_id}, scope={group_id}")

def is_user_trusted(self, user_id: int, group_id: int | None = None) -> bool:
"""
Check whether a user is trusted.

Args:
user_id: Telegram user ID.
group_id: Optional group scope. For v1, only global trust is checked.

Returns:
bool: True if user is trusted.
"""
del group_id # Reserved for future per-group trust behavior.

with Session(self._engine) as session:
statement = select(TrustedUser).where(
TrustedUser.user_id == user_id,
TrustedUser.group_id == 0,
)
record = session.exec(statement).first()
return record is not None

def get_trusted_user_ids(self, group_id: int | None = None) -> set[int]:
"""
Get trusted user IDs.

Args:
group_id: Optional group scope. For v1, only global records are returned.

Returns:
set[int]: Trusted user IDs.
"""
del group_id # Reserved for future per-group trust behavior.

with Session(self._engine) as session:
statement = select(TrustedUser.user_id).where(TrustedUser.group_id == 0)
return set(session.exec(statement).all())

def get_trusted_users(self, group_id: int | None = None) -> list[TrustedUser]:
"""
Get trusted user records with metadata.

Args:
group_id: Optional group scope. For v1, only global records are returned.

Returns:
list[TrustedUser]: Trusted user records.
"""
del group_id # Reserved for future per-group trust behavior.

with Session(self._engine) as session:
statement = (
select(TrustedUser)
.where(TrustedUser.group_id == 0)
.order_by(TrustedUser.trusted_at.desc())
)
return list(session.exec(statement).all())

def get_warnings_past_time_threshold(
self, threshold: timedelta
) -> list[UserWarning]:
Expand Down
11 changes: 6 additions & 5 deletions src/bot/handlers/anti_spam.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
)
from bot.database.service import get_database
from bot.group_config import get_group_config_for_update
from bot.services.telegram_utils import get_user_mention
from bot.services.telegram_utils import get_user_mention, is_user_admin_or_trusted

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -305,8 +305,7 @@ async def handle_contact_spam(
if user.is_bot:
return

admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, [])
if user.id in admin_ids:
if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

msg = update.message
Expand Down Expand Up @@ -394,8 +393,7 @@ async def handle_inline_keyboard_spam(
if user.is_bot:
return

admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, [])
if user.id in admin_ids:
if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

msg = update.message
Expand Down Expand Up @@ -488,6 +486,9 @@ async def handle_new_user_spam(
if user.is_bot:
return

if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

db = get_database()
record = db.get_new_user_probation(user.id, group_config.group_id)

Expand Down
23 changes: 18 additions & 5 deletions src/bot/handlers/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,20 @@ async def _build_check_response(

db = get_database()
is_whitelisted = db.is_user_photo_whitelisted(user_id)
is_trusted = db.is_user_trusted(user_id) is True

if result.is_complete:
action_prompt = ADMIN_CHECK_ACTION_COMPLETE
buttons: list[InlineKeyboardButton] = []
if is_whitelisted:
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("❌ Unverify User", callback_data=f"unverify:{user_id}")]
])
else:
keyboard = None
buttons.append(
InlineKeyboardButton("❌ Unverify User", callback_data=f"unverify:{user_id}")
)
if is_trusted:
buttons.append(
InlineKeyboardButton("🤝 Untrust User", callback_data=f"untrust:{user_id}")
)
keyboard = InlineKeyboardMarkup([buttons]) if buttons else None
else:
action_prompt = ADMIN_CHECK_ACTION_INCOMPLETE
# Store missing items in callback data (photo,username format)
Expand All @@ -77,10 +82,18 @@ async def _build_check_response(
missing_code += "p"
if not result.has_username:
missing_code += "u"

trust_button = (
InlineKeyboardButton("🤝 Untrust User", callback_data=f"untrust:{user_id}")
if is_trusted
else InlineKeyboardButton("🤝 Trust User", callback_data=f"trust:{user_id}")
)

keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("⚠️ Warn User", callback_data=f"warn:{user_id}:{missing_code}"),
InlineKeyboardButton("✅ Verify User", callback_data=f"verify:{user_id}"),
trust_button,
]
])

Expand Down
5 changes: 2 additions & 3 deletions src/bot/handlers/duplicate_spam.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
RESTRICTED_PERMISSIONS,
)
from bot.group_config import GroupConfig, get_group_config_for_update
from bot.services.telegram_utils import get_user_mention
from bot.services.telegram_utils import get_user_mention, is_user_admin_or_trusted

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -117,8 +117,7 @@ async def handle_duplicate_spam(
if user.is_bot:
return

admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, [])
if user.id in admin_ids:
if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

text = update.message.text or update.message.caption
Expand Down
Loading
Loading