Files
2026-05-21 14:07:11 -05:00

658 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
plugins/admin.py Full room moderation commands.
Supports multiword display names, standalone commands (!op, !kick, etc.)
Automatic flood detection:
message flood (5 msgs in 3s) → autoban + kick
join flood (5 joins in 3s, any domain) → room locked to inviteonly
"""
import time
import logging
import re
from collections import defaultdict, deque
import simplematrixbotlib as botlib
logger = logging.getLogger("admin")
# ------------------------------------------------------------------
# Displayname resolution cache
# ------------------------------------------------------------------
_pending_resolution = {} # room_id → {"matches": [...], "expires": timestamp}
_name_cache = {} # room_id → {display_name.lower(): mxid}
RESOLUTION_TIMEOUT = 60
# Flood detection settings
FLOOD_MAX_MESSAGES = 15
FLOOD_TIME_WINDOW = 3.0 # seconds
JOIN_FLOOD_MAX = 5
JOIN_FLOOD_WINDOW = 3.0 # seconds
# Per-room per-user message timestamps
_flood_tracker: dict[str, dict[str, deque[float]]] = defaultdict(lambda: defaultdict(deque))
# Per-room join event timestamps (any domain)
_join_flood_tracker: dict[str, deque[float]] = defaultdict(deque)
def _cleanup_resolutions():
now = time.time()
expired = [r for r, v in _pending_resolution.items() if v["expires"] < now]
for r in expired:
del _pending_resolution[r]
class UserResolutionError(Exception):
def __init__(self, matches):
self.matches = matches # list of {"mxid": ..., "display_name": ...}
async def _populate_name_cache(bot, room_id):
if room_id in _name_cache:
return
try:
resp = await bot.async_client.joined_members(room_id)
if resp.members is None:
return
cache = {}
for member in resp.members:
display = (member.display_name or "").strip().lower()
if display:
if display in cache:
cache[display] = None
else:
cache[display] = member.user_id
_name_cache[room_id] = cache
logger.info(f"Cached {len(cache)} display names for room {room_id}")
except Exception as e:
logger.error(f"Could not cache members: {e}")
async def _resolve_multiword(bot, room_id, tokens):
clean_tokens = [re.sub(r'<[^>]+>', '', t).strip() for t in tokens]
await _populate_name_cache(bot, room_id)
cache = _name_cache.get(room_id, {})
for end in range(len(clean_tokens), 0, -1):
candidate = " ".join(clean_tokens[:end]).strip().lower()
if candidate in cache:
mxid = cache[candidate]
if mxid is not None:
return mxid, candidate
else:
resp = await bot.async_client.joined_members(room_id)
matches = []
for member in resp.members:
if (member.display_name or "").strip().lower() == candidate:
matches.append({"mxid": member.user_id, "display_name": member.display_name})
if len(matches) == 1:
return matches[0]["mxid"], matches[0]["display_name"]
elif len(matches) > 1:
raise UserResolutionError(matches)
raise ValueError(f"No member with display name '{' '.join(clean_tokens)}' found.")
async def resolve_user_from_target(bot, room_id, target):
target = re.sub(r'<[^>]+>', '', target).strip()
if target.startswith("@"):
return target, None
_cleanup_resolutions()
if target.isdigit():
idx = int(target) - 1
if room_id in _pending_resolution:
pending = _pending_resolution[room_id]
if 0 <= idx < len(pending["matches"]):
match = pending["matches"][idx]
del _pending_resolution[room_id]
return match["mxid"], match.get("display_name")
else:
raise ValueError(f"Invalid selection {target}. Choose from the list.")
else:
raise ValueError("No pending resolution. Use @user:domain or display name.")
await _populate_name_cache(bot, room_id)
cache = _name_cache.get(room_id, {})
mxid = cache.get(target.strip().lower())
if mxid:
return mxid, target.strip().lower()
elif mxid is None:
resp = await bot.async_client.joined_members(room_id)
matches = []
for member in resp.members:
if (member.display_name or "").strip().lower() == target.strip().lower():
matches.append({"mxid": member.user_id, "display_name": member.display_name})
if len(matches) == 1:
return matches[0]["mxid"], matches[0]["display_name"]
elif len(matches) > 1:
raise UserResolutionError(matches)
raise ValueError(f"No member with display name '{target}' found.")
# ------------------------------------------------------------------
# Power level helpers
# ------------------------------------------------------------------
async def get_power_level(bot, room_id, user_id):
try:
resp = await bot.async_client.room_get_state_event(
room_id, "m.room.power_levels"
)
if resp.content:
return resp.content.get("users", {}).get(
user_id, resp.content.get("users_default", 0)
)
except Exception:
pass
return 0
async def has_mod_permission(bot, room_id, user_id, config):
if user_id == config.admin_user:
return True
pl = await get_power_level(bot, room_id, user_id)
return pl >= 50
async def fetch_power_levels(bot, room_id):
try:
resp = await bot.async_client.room_get_state_event(
room_id, "m.room.power_levels"
)
return resp.content if resp.content else {}
except Exception:
return {}
async def set_power_level(bot, room_id, user_id, new_pl):
current = await fetch_power_levels(bot, room_id)
if not current:
raise RuntimeError("Could not retrieve power levels.")
users = current.setdefault("users", {})
users[user_id] = new_pl
await bot.async_client.room_put_state(
room_id, "m.room.power_levels", current
)
async def get_banned_users(bot, room_id):
try:
resp = await bot.async_client.room_get_state(room_id)
banned = []
for event in resp.events:
if (
event.get("type") == "m.room.member"
and event.get("content", {}).get("membership") == "ban"
):
banned.append(event["state_key"])
return banned
except Exception as e:
logger.error(f"Failed to fetch bans: {e}")
return []
# ------------------------------------------------------------------
# Flood detection (message + global join)
# ------------------------------------------------------------------
def _check_flood(room_id, user_id) -> bool:
now = time.monotonic()
q = _flood_tracker[room_id][user_id]
while q and q[0] < now - FLOOD_TIME_WINDOW:
q.popleft()
q.append(now)
if len(q) >= FLOOD_MAX_MESSAGES:
q.clear()
return True
return False
def _check_join_flood(room_id) -> bool:
now = time.monotonic()
q = _join_flood_tracker[room_id]
while q and q[0] < now - JOIN_FLOOD_WINDOW:
q.popleft()
q.append(now)
if len(q) >= JOIN_FLOOD_MAX:
q.clear()
return True
return False
async def _kick_user(bot, room_id, user_id, reason):
try:
await bot.async_client.room_kick(room_id, user_id, reason)
logger.info(f"Kicked {user_id} from {room_id}: {reason}")
except Exception as e:
logger.error(f"Failed to kick {user_id}: {e}")
async def _ban_user(bot, room_id, user_id, reason):
try:
await bot.async_client.room_ban(room_id, user_id, reason)
logger.info(f"Banned {user_id} from {room_id}: {reason}")
except Exception as e:
logger.error(f"Failed to ban {user_id}: {e}")
async def _lock_room(bot, room_id):
"""Set room join rule to 'invite'."""
try:
await bot.async_client.room_put_state(
room_id,
"m.room.join_rules",
{"join_rule": "invite"}
)
logger.info(f"Room {room_id} locked to inviteonly (join flood detected).")
except Exception as e:
logger.error(f"Failed to lock room {room_id}: {e}")
# ------------------------------------------------------------------
# Main command handler
# ------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
match = botlib.MessageMatch(room, message, bot, prefix)
if not match.is_not_from_this_bot() or not match.prefix():
return
room_id = room.room_id
sender = message.sender
standalone_actions = {
"kick": "kick",
"ban": "ban",
"unban": "unban",
"invite": "invite",
"userinfo": "whois",
"op": "op",
"deop": "deop",
"topic": "topic",
"roomname": "roomname",
"avatar": "avatar",
"members": "members",
"bans": "bans",
"mkick": "mkick",
"joinrule": "joinrule",
"modhelp": "help",
"admin": "admin",
}
cmd = match.command()
if cmd not in standalone_actions:
return
if cmd not in ("modhelp", "help"):
if not await has_mod_permission(bot, room_id, sender, config):
await bot.api.send_text_message(
room_id, "⛔ You don't have permission to use moderator commands."
)
return
args = match.args()
if cmd == "admin":
if not args:
await bot.api.send_text_message(room_id, "Usage: !admin <action> [args...]")
return
action = args[0].lower()
sub_args = args[1:] if len(args) > 1 else []
else:
action = cmd
sub_args = args
# ------------------------------------------------------------
# Masskick by domain
# ------------------------------------------------------------
if action == "mkick":
if not sub_args:
await bot.api.send_text_message(room_id, "Usage: !mkick <domain>\nExample: !mkick evilbots.net")
return
domain = sub_args[0].strip().lower()
if ':' in domain:
domain = domain.split(':')[-1]
try:
resp = await bot.async_client.joined_members(room_id)
if not resp.members:
await bot.api.send_text_message(room_id, "Could not fetch member list.")
return
targets = [m for m in resp.members if m.user_id.endswith(f":{domain}")]
if not targets:
await bot.api.send_text_message(room_id, f"No users found from domain '{domain}'.")
return
reason = f"Masskick of domain {domain}"
count = 0
for member in targets:
await _kick_user(bot, room_id, member.user_id, reason)
count += 1
await bot.api.send_text_message(room_id, f"👢 Kicked {count} user(s) from {domain}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Masskick failed: {e}")
# ------------------------------------------------------------
# Join rule toggle
# ------------------------------------------------------------
elif action == "joinrule":
if not sub_args or sub_args[0] not in ("public", "invite"):
await bot.api.send_text_message(room_id, "Usage: !joinrule <public|invite>")
return
new_rule = sub_args[0].lower()
try:
await bot.async_client.room_put_state(
room_id,
"m.room.join_rules",
{"join_rule": new_rule}
)
await bot.api.send_text_message(room_id, f"🔐 Join rule set to **{new_rule}**.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to set join rule: {e}")
# ------------------------------------------------------------
# User-targeting actions
# ------------------------------------------------------------
elif action in ("kick", "ban", "invite", "userinfo", "op", "deop"):
if not sub_args:
await bot.api.send_text_message(
room_id, f"Missing user. Usage: !{cmd} <@user|name> [reason...]"
)
return
if action in ("op", "deop"):
potential_pl = sub_args[-1]
try:
power = int(potential_pl)
name_tokens = sub_args[:-1]
if not name_tokens:
await bot.api.send_text_message(room_id, "Missing user name.")
return
except ValueError:
name_tokens = sub_args
power = None
else:
name_tokens = sub_args
power = None
try:
target_mxid, target_display = await _resolve_multiword(bot, room_id, name_tokens)
except UserResolutionError as e:
lines = [
"Multiple users found. Reissue the command with a number:"
]
for i, m in enumerate(e.matches, 1):
lines.append(f"{i}. {m['mxid']} ({m['display_name']})")
await bot.api.send_text_message(room_id, "\n".join(lines))
return
except ValueError as e:
target_str = sub_args[0]
try:
target_mxid, target_display = await resolve_user_from_target(bot, room_id, target_str)
except Exception as e2:
await bot.api.send_text_message(room_id, str(e2))
return
if action in ("op", "deop"):
if action == "op":
requested_pl = power if power is not None else 50
if requested_pl > 50:
await bot.api.send_text_message(
room_id,
"Maximum power level for op is 50 (moderator). Setting to 50."
)
new_pl = 50
else:
new_pl = requested_pl
else:
new_pl = 0
sender_pl = await get_power_level(bot, room_id, sender)
target_pl = await get_power_level(bot, room_id, target_mxid)
if sender != config.admin_user and sender_pl <= target_pl:
await bot.api.send_text_message(
room_id,
"⛔ You can only modify users with a lower power level than yours.",
)
return
try:
await set_power_level(bot, room_id, target_mxid, new_pl)
verb = "Promoted" if action == "op" else "Demoted"
await bot.api.send_text_message(
room_id, f"{verb} {target_mxid} to power level {new_pl}."
)
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to set power: {e}")
else:
reason = " ".join(sub_args[len(name_tokens):]) if len(sub_args) > len(name_tokens) else ""
if action == "kick":
try:
await bot.async_client.room_kick(room_id, target_mxid, reason)
await bot.api.send_text_message(room_id, f"👢 Kicked {target_mxid}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to kick: {e}")
elif action == "ban":
try:
await bot.async_client.room_ban(room_id, target_mxid, reason)
await bot.api.send_text_message(room_id, f"🚫 Banned {target_mxid}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to ban: {e}")
elif action == "invite":
try:
await bot.async_client.room_invite(room_id, target_mxid)
await bot.api.send_text_message(room_id, f"📨 Invited {target_mxid}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to invite: {e}")
elif action == "userinfo":
try:
resp = await bot.async_client.joined_members(room_id)
member_info = None
for m in resp.members:
if m.user_id == target_mxid:
member_info = m
break
pl = await get_power_level(bot, room_id, target_mxid)
display = member_info.display_name if member_info else "Unknown"
msg = (
f"**User:** `{target_mxid}`\n"
f"**Display Name:** {display}\n"
f"**Power Level:** {pl}"
)
await bot.api.send_text_message(room_id, msg)
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to get userinfo: {e}")
# ------------------------------------------------------------
# UNBAN
# ------------------------------------------------------------
elif action == "unban":
if not sub_args:
await bot.api.send_text_message(room_id, "Usage: !unban <@user:domain>")
return
target_str = sub_args[0]
if not target_str.startswith("@"):
await bot.api.send_text_message(
room_id, "For unban, please provide the full Matrix ID (e.g., @user:domain)."
)
return
try:
await bot.async_client.room_unban(room_id, target_str)
await bot.api.send_text_message(room_id, f"🔓 Unbanned {target_str}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to unban: {e}")
# ------------------------------------------------------------
# TOPIC, ROOMNAME, AVATAR, MEMBERS, BANS, HELP ...
# ------------------------------------------------------------
elif action == "topic":
if not sub_args:
try:
resp = await bot.async_client.room_get_state_event(room_id, "m.room.topic")
topic = resp.content.get("topic", "(none)") if resp.content else "(none)"
await bot.api.send_text_message(room_id, f"📝 Topic: {topic}")
except Exception:
await bot.api.send_text_message(room_id, "Could not retrieve topic.")
else:
new_topic = " ".join(sub_args)
try:
await bot.async_client.room_put_state(room_id, "m.room.topic", {"topic": new_topic})
await bot.api.send_text_message(room_id, "📝 Topic updated.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed: {e}")
elif action == "roomname":
if not sub_args:
try:
resp = await bot.async_client.room_get_state_event(room_id, "m.room.name")
name = resp.content.get("name", "(none)") if resp.content else "(none)"
await bot.api.send_text_message(room_id, f"🏠 Room name: {name}")
except Exception:
await bot.api.send_text_message(room_id, "Could not retrieve room name.")
else:
new_name = " ".join(sub_args)
try:
await bot.async_client.room_put_state(room_id, "m.room.name", {"name": new_name})
await bot.api.send_text_message(room_id, "🏠 Room name updated.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed: {e}")
elif action == "avatar":
if not sub_args:
try:
resp = await bot.async_client.room_get_state_event(room_id, "m.room.avatar")
url = resp.content.get("url", "not set") if resp.content else "not set"
await bot.api.send_text_message(room_id, f"🖼️ Avatar URL: {url}")
except Exception:
await bot.api.send_text_message(room_id, "Could not retrieve avatar.")
else:
mxc_url = sub_args[0]
if not mxc_url.startswith("mxc://"):
await bot.api.send_text_message(room_id, "Invalid avatar URL. Must start with mxc://.")
return
try:
await bot.async_client.room_put_state(room_id, "m.room.avatar", {"url": mxc_url})
await bot.api.send_text_message(room_id, "🖼️ Avatar set.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed: {e}")
elif action == "members":
try:
resp = await bot.async_client.joined_members(room_id)
if not resp.members:
await bot.api.send_text_message(room_id, "No members found.")
return
lines = ["**Members:**"]
for member in resp.members:
display = member.display_name or member.user_id
pl = await get_power_level(bot, room_id, member.user_id)
lines.append(f"{display} (`{member.user_id}`) [PL: {pl}]")
msg = "\n".join(lines)
while len(msg) > 4000:
part = msg[:4000]
msg = msg[4000:]
await bot.api.send_text_message(room_id, part)
if msg:
await bot.api.send_text_message(room_id, msg)
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to list members: {e}")
elif action == "bans":
try:
banned = await get_banned_users(bot, room_id)
if not banned:
await bot.api.send_text_message(room_id, "No banned users.")
else:
ban_list = "\n".join(f"• `{u}`" for u in banned)
await bot.api.send_text_message(room_id, f"**Banned users:**\n{ban_list}")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to fetch bans: {e}")
elif action in ("help", "modhelp"):
help_text = """
**Moderator Commands (standalone or via !admin):**
- `!kick <@user|name> [reason]` Kick a user
- `!ban <@user|name> [reason]` Ban a user
- `!unban <@user:domain>` Unban (full MXID required)
- `!invite <@user|name>` Invite a user
- `!mkick <domain>` Kick all users from the given domain
- `!joinrule <public|invite>` Manually set the room join rule
- `!userinfo <@user|name>` Show user details & power level (was `!whois`)
- `!op <@user|name> [pl=50]` Promote user (max 50, moderator)
- `!deop <@user|name>` Demote user to power level 0
- `!topic [new topic]` Show / set room topic
- `!roomname [new name]` Show / set room name
- `!avatar [mxc://...]` Show / set room avatar
- `!members` List all joined members with power levels
- `!bans` List all banned users
- `!modhelp` Show this help
Names may be **multiword**; the bot will automatically detect them.
If the name is ambiguous you'll be asked to choose from a numbered list.
"""
await bot.api.send_text_message(room_id, help_text.strip())
else:
await bot.api.send_text_message(
room_id, f"Unknown action: {action}. Use `!modhelp`."
)
# ------------------------------------------------------------------
# Plugin setup register flood detectors
# ------------------------------------------------------------------
def setup(bot):
"""Initialize the admin plugin and register flood detectors."""
# Message flood detector (bans + kicks)
@bot.listener.on_message_event
async def _message_flood(room, message):
room_id = room.room_id
sender = message.sender
if sender == bot.async_client.user_id:
return
if _check_flood(room_id, sender):
disp = sender
try:
resp = await bot.async_client.joined_members(room_id)
if resp.members:
for m in resp.members:
if m.user_id == sender:
disp = m.display_name or sender
break
except:
pass
reason = f"Autoban for flooding ({FLOOD_MAX_MESSAGES} messages in {FLOOD_TIME_WINDOW}s)"
await _ban_user(bot, room_id, sender, reason)
await _kick_user(bot, room_id, sender, reason)
# Join flood detector (any domain)
@bot.listener.on_custom_event(botlib.nio.RoomMemberEvent)
async def _join_flood(room, event):
room_id = room.room_id
if event.membership != "join":
return
sender = event.state_key
if sender == bot.async_client.user_id:
return
if _check_join_flood(room_id):
await _lock_room(bot, room_id)
await bot.api.send_text_message(
room_id,
"🔐 Join flood detected room locked to inviteonly. Use `!joinrule public` to reopen."
)
logger.info("Admin plugin flood detectors registered")
# ------------------------------------------------------------------
# Plugin metadata
# ------------------------------------------------------------------
__version__ = "1.2.3"
__author__ = "Funguy Admin"
__description__ = "Full room moderation multiword name support + flood detection + mass domain kick"
__help__ = """
<details>
<summary><strong>Admin / Moderator Commands</strong></summary>
<ul>
<li><code>!kick</code>, <code>!ban</code>, <code>!unban</code>, <code>!invite</code></li>
<li><code>!mkick &lt;domain&gt;</code> Kick all users from a domain</li>
<li><code>!joinrule &lt;public|invite&gt;</code> Change room join rule</li>
<li><code>!userinfo</code> Show user details & power level</li>
<li><code>!op</code> (max PL 50), <code>!deop</code></li>
<li><code>!topic</code>, <code>!roomname</code>, <code>!avatar</code></li>
<li><code>!members</code>, <code>!bans</code></li>
<li><code>!admin &lt;action&gt;</code> also works as a parent command</li>
</ul>
<p>Power level ≥ 50 required (or global admin).</p>
<p>Multiword display names are automatically recognized.</p>
<p><strong>Flood detection:</strong>
<ul>
<li>Message flood: 5 messages in 3 seconds → autoban + kick</li>
<li>Join flood: 5 users in 3 seconds (any domain) → room locked to inviteonly</li>
</ul>
</p>
</details>
"""