#!/usr/bin/env python3
"""
plugins/admin.py – Full room moderation commands.
Supports multi‑word display names, standalone commands (!op, !kick, etc.)
"""

import time
import logging

import simplematrixbotlib as botlib

logger = logging.getLogger("admin")

# ------------------------------------------------------------------
# Display‑name resolution cache
# ------------------------------------------------------------------
_pending_resolution = {}   # room_id → {"matches": [...], "expires": timestamp}
_name_cache = {}           # room_id → {display_name.lower(): mxid}
_name_cache_expiry = {}    # room_id → timestamp after which the cache is stale
RESOLUTION_TIMEOUT = 60    # seconds a numbered candidate list stays selectable
NAME_CACHE_TTL = 300       # seconds before the member list is fetched again

# Commands this plugin reacts to (standalone, or as "!admin <action>").
_COMMANDS = frozenset({
    "kick", "ban", "unban", "invite", "whois", "op", "deop", "topic",
    "roomname", "avatar", "members", "bans", "modhelp", "admin",
})

# Actions whose first argument(s) name a user.
_USER_ACTIONS = ("kick", "ban", "invite", "whois", "op", "deop")

_HELP_TEXT = """
**Moderator Commands (standalone or via !admin):**
- `!kick <@user|name> [reason]` – Kick a user
- `!ban <@user|name> [reason]` – Ban a user
- `!unban <@user:domain>` – Unban (full MXID required)
- `!invite <@user|name>` – Invite a user
- `!whois <@user|name>` – Show user details & power level
- `!op <@user|name> [pl=50]` – Promote user (max 50, moderator)
- `!deop <@user|name>` – Demote user to power level 0
- `!topic [new topic]` – Show / set room topic
- `!roomname [new name]` – Show / set room name
- `!avatar [mxc://...]` – Show / set room avatar
- `!members` – List all joined members with power levels
- `!bans` – List all banned users
- `!modhelp` – Show this help

Names may be **multi‑word**; the bot will automatically detect them.
If the name is ambiguous you'll be asked to choose from a numbered list.
"""


def _cleanup_resolutions():
    """Drop every pending candidate list whose expiry time has passed."""
    now = time.time()
    expired = [r for r, v in _pending_resolution.items() if v["expires"] < now]
    for r in expired:
        del _pending_resolution[r]


def _remember_ambiguity(room_id, matches):
    """Store the candidate list so a follow-up command can pick one by number."""
    _pending_resolution[room_id] = {
        "matches": matches,
        "expires": time.time() + RESOLUTION_TIMEOUT,
    }


class UserResolutionError(Exception):
    """Raised when a display name matches more than one room member.

    Attributes:
        matches: list of {"mxid": ..., "display_name": ...} dicts.
    """

    def __init__(self, matches):
        super().__init__(f"Ambiguous display name: {len(matches)} members match.")
        self.matches = matches  # list of {"mxid": ..., "display_name": ...}


async def _populate_name_cache(bot, room_id):
    """Fetch the joined-member list and cache lowercase display names.

    A display name shared by several members is stored as ``None`` to mark
    it ambiguous. The cache is reused until ``NAME_CACHE_TTL`` seconds have
    passed; on a failed fetch any previous cache is left in place.
    """
    now = time.time()
    if room_id in _name_cache and _name_cache_expiry.get(room_id, 0) > now:
        return
    try:
        resp = await bot.async_client.joined_members(room_id)
    except Exception as e:
        logger.error("Could not cache members: %s", e)
        return
    # An error response carries no member list; keep whatever we had.
    members = getattr(resp, "members", None)
    if members is None:
        logger.error("Could not cache members: %s", resp)
        return
    cache = {}
    for member in members:
        display = (member.display_name or "").strip().lower()
        if not display:
            continue
        # A second sighting of the same name marks it ambiguous.
        cache[display] = None if display in cache else member.user_id
    _name_cache[room_id] = cache
    _name_cache_expiry[room_id] = now + NAME_CACHE_TTL
    logger.info("Cached %d display names for room %s", len(cache), room_id)


async def _find_members_by_display(bot, room_id, display_key):
    """Return live members whose stripped, lowercased display name equals *display_key*."""
    resp = await bot.async_client.joined_members(room_id)
    members = getattr(resp, "members", None) or []
    return [
        {"mxid": m.user_id, "display_name": m.display_name}
        for m in members
        if (m.display_name or "").strip().lower() == display_key
    ]


async def _resolve_name_prefix(bot, room_id, tokens):
    """Match the longest leading run of *tokens* against cached display names.

    Returns (mxid, display_name, consumed) where *consumed* is the number of
    tokens that formed the name. Raises UserResolutionError when the name is
    shared by several members and ValueError when nothing matches.
    """
    await _populate_name_cache(bot, room_id)
    cache = _name_cache.get(room_id, {})
    # Longest candidate first, so "John Smith" wins over "John".
    for end in range(len(tokens), 0, -1):
        candidate = " ".join(tokens[:end]).strip().lower()
        if candidate not in cache:
            continue
        mxid = cache[candidate]
        if mxid is not None:
            return mxid, candidate, end
        # Cached as ambiguous: look at the live member list for the candidates.
        matches = await _find_members_by_display(bot, room_id, candidate)
        if len(matches) == 1:
            return matches[0]["mxid"], matches[0]["display_name"], end
        if len(matches) > 1:
            raise UserResolutionError(matches)
        # No live match (member list changed) → try a shorter candidate.
    raise ValueError(f"No member with display name '{' '.join(tokens)}' found.")


async def _resolve_multiword(bot, room_id, tokens):
    """
    Given a list of word tokens, try to find a matching display name by
    testing progressively shorter prefixes of the token list (longest first).
    Returns (mxid, display_name); raises ValueError if no match and
    UserResolutionError if the name is ambiguous.
    """
    mxid, display, _consumed = await _resolve_name_prefix(bot, room_id, tokens)
    return mxid, display


async def resolve_user_from_target(bot, room_id, target):
    """
    Resolve a target string to a Matrix user ID.
    Accepts: full MXID (@user:domain), display name (multi‑word), or
    number (referring to a previous ambiguous resolution).
    Returns (mxid, display_name_or_None).
    Raises ValueError or UserResolutionError.
    """
    if target.startswith("@"):
        return target, None

    _cleanup_resolutions()

    # Number reference to a previous ambiguous match.
    if target.isdigit():
        idx = int(target) - 1
        pending = _pending_resolution.get(room_id)
        if pending is None:
            raise ValueError("No pending resolution. Use @user:domain or display name.")
        if not 0 <= idx < len(pending["matches"]):
            raise ValueError(f"Invalid selection {target}. Choose from the list.")
        chosen = pending["matches"][idx]
        del _pending_resolution[room_id]
        return chosen["mxid"], chosen.get("display_name")

    key = target.strip().lower()
    await _populate_name_cache(bot, room_id)
    mxid = _name_cache.get(room_id, {}).get(key)
    if mxid:
        return mxid, key

    # Either cached as ambiguous or not cached at all: consult the live list.
    matches = await _find_members_by_display(bot, room_id, key)
    if len(matches) == 1:
        return matches[0]["mxid"], matches[0]["display_name"]
    if len(matches) > 1:
        raise UserResolutionError(matches)
    raise ValueError(f"No member with display name '{target}' found.")


async def _resolve_target(bot, room_id, tokens):
    """Resolve the user named at the start of *tokens*.

    Returns (mxid, display_name_or_None, consumed). An explicit MXID takes
    precedence; otherwise the longest display-name prefix is tried, then the
    first token alone (which also covers numbered selections).
    Raises ValueError or UserResolutionError.
    """
    first = tokens[0]
    if first.startswith("@"):
        return first, None, 1
    try:
        return await _resolve_name_prefix(bot, room_id, tokens)
    except ValueError as multiword_error:
        try:
            mxid, display = await resolve_user_from_target(bot, room_id, first)
        except ValueError as single_error:
            # For a number the selection error is the useful one; otherwise
            # report the full name the user typed.
            raise (single_error if first.isdigit() else multiword_error) from None
        return mxid, display, 1


# ------------------------------------------------------------------
# Power level helpers
# ------------------------------------------------------------------
def _power_level_from(levels, user_id):
    """Read *user_id*'s level from power-levels content, else ``users_default``, else 0."""
    users = levels.get("users") or {}
    return users.get(user_id, levels.get("users_default", 0))


async def fetch_power_levels(bot, room_id):
    """Return the room's ``m.room.power_levels`` content, or ``{}`` on failure."""
    try:
        resp = await bot.async_client.room_get_state_event(
            room_id, "m.room.power_levels"
        )
    except Exception as e:
        logger.warning("Could not fetch power levels for %s: %s", room_id, e)
        return {}
    return getattr(resp, "content", None) or {}


async def get_power_level(bot, room_id, user_id):
    """Return *user_id*'s power level in the room; 0 when it cannot be determined."""
    levels = await fetch_power_levels(bot, room_id)
    try:
        return _power_level_from(levels, user_id)
    except (AttributeError, TypeError) as e:
        logger.warning("Malformed power levels for %s: %s", room_id, e)
        return 0


async def has_mod_permission(bot, room_id, user_id, config):
    """True for the configured admin user or anyone with power level >= 50."""
    if user_id == config.admin_user:
        return True
    return await get_power_level(bot, room_id, user_id) >= 50


async def set_power_level(bot, room_id, user_id, new_pl):
    """Write *new_pl* for *user_id* into the room's power-levels state.

    Raises RuntimeError when the current power levels cannot be retrieved.
    NOTE(review): the result of room_put_state is not inspected; if the client
    reports failures via an error response instead of raising, callers will
    not notice — confirm against the client library.
    """
    current = await fetch_power_levels(bot, room_id)
    if not current:
        raise RuntimeError("Could not retrieve power levels.")
    current.setdefault("users", {})[user_id] = new_pl
    await bot.async_client.room_put_state(
        room_id, "m.room.power_levels", current
    )


async def get_banned_users(bot, room_id):
    """Return the state keys of all ``m.room.member`` events with membership "ban"."""
    try:
        resp = await bot.async_client.room_get_state(room_id)
        return [
            event["state_key"]
            for event in resp.events
            if event.get("type") == "m.room.member"
            and event.get("content", {}).get("membership") == "ban"
        ]
    except Exception as e:
        logger.error("Failed to fetch bans: %s", e)
        return []


# ------------------------------------------------------------------
# Action helpers
# ------------------------------------------------------------------
async def _send(bot, room_id, text):
    """Send a plain text message to the room."""
    await bot.api.send_text_message(room_id, text)


async def _apply_power_change(bot, room_id, sender, config, action, target_mxid, power):
    """Carry out op/deop after checking the sender outranks the target."""
    if action == "op":
        requested_pl = 50 if power is None else power
        if requested_pl > 50:
            await _send(
                bot, room_id,
                "Maximum power level for op is 50 (moderator). Setting to 50."
            )
        new_pl = min(requested_pl, 50)
    else:
        new_pl = 0

    sender_pl = await get_power_level(bot, room_id, sender)
    target_pl = await get_power_level(bot, room_id, target_mxid)
    if sender != config.admin_user and sender_pl <= target_pl:
        await _send(
            bot, room_id,
            "⛔ You can only modify users with a lower power level than yours.",
        )
        return

    try:
        await set_power_level(bot, room_id, target_mxid, new_pl)
    except Exception as e:
        await _send(bot, room_id, f"❌ Failed to set power: {e}")
        return
    verb = "Promoted" if action == "op" else "Demoted"
    await _send(bot, room_id, f"✅ {verb} {target_mxid} to power level {new_pl}.")


async def _whois(bot, room_id, target_mxid):
    """Report the target's MXID, display name and power level."""
    try:
        resp = await bot.async_client.joined_members(room_id)
        members = getattr(resp, "members", None) or []
        member_info = next((m for m in members if m.user_id == target_mxid), None)
        pl = await get_power_level(bot, room_id, target_mxid)
        display = member_info.display_name if member_info else "Unknown"
        msg = (
            f"**User:** `{target_mxid}`\n"
            f"**Display Name:** {display}\n"
            f"**Power Level:** {pl}"
        )
        await _send(bot, room_id, msg)
    except Exception as e:
        await _send(bot, room_id, f"❌ Failed to get whois: {e}")


async def _handle_user_action(bot, room_id, sender, config, action, sub_args):
    """Handle kick/ban/invite/whois/op/deop: resolve the user, then act."""
    if not sub_args:
        await _send(
            bot, room_id,
            f"Missing user. Usage: !{action} <@user|name> [reason...]"
        )
        return

    # For op/deop a trailing integer is the power level — but only when
    # something precedes it; a lone token is always the target (which lets
    # "!op 1" pick from a numbered candidate list).
    name_tokens = sub_args
    power = None
    if action in ("op", "deop") and len(sub_args) > 1:
        try:
            power = int(sub_args[-1])
        except ValueError:
            pass
        else:
            name_tokens = sub_args[:-1]

    try:
        target_mxid, _display, consumed = await _resolve_target(
            bot, room_id, name_tokens
        )
    except UserResolutionError as e:
        # Remember the candidates so the numbered follow-up can be resolved.
        _remember_ambiguity(room_id, e.matches)
        lines = ["Multiple users found. Re‑issue the command with a number:"]
        for i, m in enumerate(e.matches, 1):
            lines.append(f"{i}. {m['mxid']} ({m['display_name']})")
        await _send(bot, room_id, "\n".join(lines))
        return
    except Exception as e:
        await _send(bot, room_id, str(e))
        return

    if action in ("op", "deop"):
        await _apply_power_change(
            bot, room_id, sender, config, action, target_mxid, power
        )
        return

    # Whatever follows the tokens that formed the name is the reason.
    reason = " ".join(name_tokens[consumed:])

    if action == "kick":
        try:
            await bot.async_client.room_kick(room_id, target_mxid, reason)
            await _send(bot, room_id, f"👢 Kicked {target_mxid}.")
        except Exception as e:
            await _send(bot, room_id, f"❌ Failed to kick: {e}")
    elif action == "ban":
        try:
            await bot.async_client.room_ban(room_id, target_mxid, reason)
            await _send(bot, room_id, f"🚫 Banned {target_mxid}.")
        except Exception as e:
            await _send(bot, room_id, f"❌ Failed to ban: {e}")
    elif action == "invite":
        try:
            await bot.async_client.room_invite(room_id, target_mxid)
            await _send(bot, room_id, f"📨 Invited {target_mxid}.")
        except Exception as e:
            await _send(bot, room_id, f"❌ Failed to invite: {e}")
    elif action == "whois":
        await _whois(bot, room_id, target_mxid)


async def _handle_unban(bot, room_id, sub_args):
    """Unban a user given by full MXID."""
    if not sub_args:
        await _send(bot, room_id, "Usage: !unban <@user:domain>")
        return
    target_str = sub_args[0]
    if not target_str.startswith("@"):
        await _send(
            bot, room_id,
            "For unban, please provide the full Matrix ID (e.g., @user:domain)."
        )
        return
    try:
        await bot.async_client.room_unban(room_id, target_str)
        await _send(bot, room_id, f"🔓 Unbanned {target_str}.")
    except Exception as e:
        await _send(bot, room_id, f"❌ Failed to unban: {e}")


async def _show_state_value(bot, room_id, event_type, key, default, label, failure):
    """Send "<label><value>" for one key of a room state event."""
    try:
        resp = await bot.async_client.room_get_state_event(room_id, event_type)
        value = resp.content.get(key, default) if resp.content else default
        await _send(bot, room_id, f"{label}{value}")
    except Exception:
        await _send(bot, room_id, failure)


async def _set_state_value(bot, room_id, event_type, key, value, success):
    """Write a single-key room state event and confirm it in the room."""
    try:
        await bot.async_client.room_put_state(room_id, event_type, {key: value})
        await _send(bot, room_id, success)
    except Exception as e:
        await _send(bot, room_id, f"❌ Failed: {e}")


async def _handle_avatar(bot, room_id, sub_args):
    """Show the room avatar URL, or set it from an mxc:// URL."""
    if not sub_args:
        await _show_state_value(
            bot, room_id, "m.room.avatar", "url", "not set",
            "🖼️ Avatar URL: ", "Could not retrieve avatar.",
        )
        return
    mxc_url = sub_args[0]
    if not mxc_url.startswith("mxc://"):
        await _send(bot, room_id, "Invalid avatar URL. Must start with mxc://.")
        return
    await _set_state_value(
        bot, room_id, "m.room.avatar", "url", mxc_url, "🖼️ Avatar set."
    )


async def _handle_members(bot, room_id):
    """List joined members with their power levels, split into 4000-char messages."""
    try:
        resp = await bot.async_client.joined_members(room_id)
        if not resp.members:
            await _send(bot, room_id, "No members found.")
            return
        # One state fetch for everybody instead of one request per member.
        levels = await fetch_power_levels(bot, room_id)
        lines = ["**Members:**"]
        for member in resp.members:
            display = member.display_name or member.user_id
            pl = _power_level_from(levels, member.user_id)
            lines.append(f"{display} (`{member.user_id}`) [PL: {pl}]")
        msg = "\n".join(lines)
        while len(msg) > 4000:
            part, msg = msg[:4000], msg[4000:]
            await _send(bot, room_id, part)
        if msg:
            await _send(bot, room_id, msg)
    except Exception as e:
        await _send(bot, room_id, f"❌ Failed to list members: {e}")


async def _handle_bans(bot, room_id):
    """List the banned users of the room."""
    try:
        banned = await get_banned_users(bot, room_id)
        if not banned:
            await _send(bot, room_id, "No banned users.")
        else:
            ban_list = "\n".join(f"• `{u}`" for u in banned)
            await _send(bot, room_id, f"**Banned users:**\n{ban_list}")
    except Exception as e:
        await _send(bot, room_id, f"❌ Failed to fetch bans: {e}")


# ------------------------------------------------------------------
# Main command handler
# ------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
    """Dispatches !admin <action> or standalone moderation commands.

    Ignores messages from the bot itself, messages without the prefix, and
    commands this plugin does not know. Every command except ``modhelp``
    requires moderator permission (see ``has_mod_permission``).
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not match.is_not_from_this_bot() or not match.prefix():
        return

    room_id = room.room_id
    sender = message.sender

    cmd = match.command()
    if cmd not in _COMMANDS:
        return

    # Permission gate (skipped for help)
    if cmd != "modhelp" and not await has_mod_permission(bot, room_id, sender, config):
        await _send(
            bot, room_id,
            "⛔ You don't have permission to use moderator commands."
        )
        return

    args = match.args()

    # "!admin kick bob" and "!kick bob" are equivalent.
    if cmd == "admin":
        if not args:
            await _send(bot, room_id, "Usage: !admin <action> [args...]")
            return
        action, sub_args = args[0].lower(), args[1:]
    else:
        action, sub_args = cmd, args

    if action in _USER_ACTIONS:
        await _handle_user_action(bot, room_id, sender, config, action, sub_args)
    elif action == "unban":
        await _handle_unban(bot, room_id, sub_args)
    elif action == "topic":
        if not sub_args:
            await _show_state_value(
                bot, room_id, "m.room.topic", "topic", "(none)",
                "📝 Topic: ", "Could not retrieve topic.",
            )
        else:
            await _set_state_value(
                bot, room_id, "m.room.topic", "topic",
                " ".join(sub_args), "📝 Topic updated.",
            )
    elif action == "roomname":
        if not sub_args:
            await _show_state_value(
                bot, room_id, "m.room.name", "name", "(none)",
                "🏠 Room name: ", "Could not retrieve room name.",
            )
        else:
            await _set_state_value(
                bot, room_id, "m.room.name", "name",
                " ".join(sub_args), "🏠 Room name updated.",
            )
    elif action == "avatar":
        await _handle_avatar(bot, room_id, sub_args)
    elif action == "members":
        await _handle_members(bot, room_id)
    elif action == "bans":
        await _handle_bans(bot, room_id)
    elif action in ("help", "modhelp"):
        await _send(bot, room_id, _HELP_TEXT.strip())
    else:
        await _send(bot, room_id, f"Unknown action: {action}. Use `!modhelp`.")


# ------------------------------------------------------------------
# Plugin metadata
# ------------------------------------------------------------------
__version__ = "1.1.0"
__author__ = "Funguy Admin"
__description__ = "Full room moderation – multi‑word name support"
__help__ = """
Admin / Moderator Commands

Power level ≥ 50 required (or global admin).

Multi‑word display names are automatically recognized.

"""