Files
FunguyBot/plugins/admin.py
T
2026-05-07 16:57:33 -05:00

521 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
plugins/admin.py - Full room moderation commands.
Supports multi-word display names and standalone commands (!op, !kick, etc.).
"""
import time
import logging
import simplematrixbotlib as botlib
# Module-level logger; records are emitted under the fixed name "admin".
logger = logging.getLogger("admin")
# ------------------------------------------------------------------
# Displayname resolution cache
# ------------------------------------------------------------------
# Pending disambiguation prompts, keyed by room. _cleanup_resolutions() drops
# an entry once its "expires" timestamp is in the past, and
# resolve_user_from_target() consumes one when given a numeric selection.
_pending_resolution = {}  # room_id → {"matches": [...], "expires": timestamp}
# Per-room lookup table built by _populate_name_cache(). A value of None marks
# a display name that is shared by more than one member.
_name_cache = {}  # room_id → {display_name.lower(): mxid}
# NOTE(review): presumably the lifetime, in seconds, of a _pending_resolution
# entry - confirm against the code that sets "expires".
RESOLUTION_TIMEOUT = 60
def _cleanup_resolutions():
    """Remove every pending-resolution entry whose "expires" time has passed."""
    cutoff = time.time()
    # Collect first, delete afterwards: removing keys while walking the live
    # dict view would raise RuntimeError.
    stale_rooms = []
    for room, entry in _pending_resolution.items():
        if entry["expires"] < cutoff:
            stale_rooms.append(room)
    for room in stale_rooms:
        _pending_resolution.pop(room)
class UserResolutionError(Exception):
    """Raised when a display name matches more than one room member.

    Attributes:
        matches: list of {"mxid": ..., "display_name": ...} dicts, one per
            member that matched the requested name.
    """

    def __init__(self, matches):
        # Give the exception a readable message. Without an explicit message
        # str(exc) is the raw repr of the matches list, which ends up in chat
        # whenever a caller reports the error with str().
        super().__init__(
            "Display name is ambiguous: more than one user matches. "
            "Use the full Matrix ID (@user:domain)."
        )
        self.matches = matches  # list of {"mxid": ..., "display_name": ...}
# Seconds a room's cached member list is trusted before it is fetched again.
_NAME_CACHE_TTL = 60
# room_id → time.monotonic() reading taken when that room's cache was built
_name_cache_built_at = {}


async def _populate_name_cache(bot, room_id):
    """Fetch the joined-member list of *room_id* and cache display names.

    The result is stored in ``_name_cache[room_id]`` as a mapping of
    lower-cased, stripped display name to Matrix user ID. A display name
    carried by several members maps to None so callers can detect ambiguity.

    The cache is rebuilt once it is older than ``_NAME_CACHE_TTL`` seconds;
    otherwise members who joined or renamed themselves after the first lookup
    could never be addressed by name. If a refresh fails, the previous cache
    (if any) is left in place and the failure is logged.

    Returns None; never raises.
    """
    built_at = _name_cache_built_at.get(room_id)
    if (
        room_id in _name_cache
        and built_at is not None
        and time.monotonic() - built_at < _NAME_CACHE_TTL
    ):
        return
    try:
        resp = await bot.async_client.joined_members(room_id)
        # A failed request comes back as an error object without ``members``
        # rather than as an exception, so check for the attribute explicitly.
        members = getattr(resp, "members", None)
        if members is None:
            logger.error(f"Could not cache members: {resp}")
            return
        cache = {}
        for member in members:
            display = (member.display_name or "").strip().lower()
            if not display:
                continue
            # If duplicate display name, store None to indicate ambiguity
            cache[display] = None if display in cache else member.user_id
        _name_cache[room_id] = cache
        _name_cache_built_at[room_id] = time.monotonic()
        logger.info(f"Cached {len(cache)} display names for room {room_id}")
    except Exception as e:
        logger.error(f"Could not cache members: {e}")
async def _resolve_multiword(bot, room_id, tokens):
    """
    Find the room member whose display name is spelled by a leading run of
    *tokens*.

    Prefixes are tried longest first (all tokens, then all but the last, and
    so on), so ``["Bob", "Smith", "spamming"]`` is matched against
    "bob smith spamming", then "bob smith", then "bob". Comparison is
    case-insensitive and ignores surrounding whitespace.

    Returns (mxid, display_name). For a unique cache hit display_name is the
    lower-cased candidate string; after an ambiguity re-check it is the
    member's display name as reported by the server.

    Raises UserResolutionError if the matching name belongs to several
    members, or ValueError if no prefix matches any member.
    """
    await _populate_name_cache(bot, room_id)
    cache = _name_cache.get(room_id, {})
    for end in range(len(tokens), 0, -1):
        candidate = " ".join(tokens[:end]).strip().lower()
        if candidate not in cache:
            continue
        mxid = cache[candidate]
        if mxid is not None:
            return mxid, candidate
        # None marks a duplicated display name: fetch the member list again
        # to collect every member that carries it.
        resp = await bot.async_client.joined_members(room_id)
        # A failed request yields an error object without ``members``; treat
        # that as "no matches" instead of leaking an AttributeError.
        members = getattr(resp, "members", None) or []
        matches = [
            {"mxid": member.user_id, "display_name": member.display_name}
            for member in members
            if (member.display_name or "").strip().lower() == candidate
        ]
        if len(matches) == 1:
            return matches[0]["mxid"], matches[0]["display_name"]
        if len(matches) > 1:
            raise UserResolutionError(matches)
        # No match after all (unlikely): fall through to a shorter prefix.
    raise ValueError(f"No member with display name '{' '.join(tokens)}' found.")
async def resolve_user_from_target(bot, room_id, target):
    """
    Resolve a single target string to a Matrix user ID.

    Accepts a full MXID (anything starting with "@", returned unchanged), a
    number referring to an entry of the room's pending ambiguous resolution
    (1-based; the pending entry is consumed on success), or a display name.

    Returns (mxid, display_name_or_None).
    Raises ValueError when nothing matches or the selection is invalid, and
    UserResolutionError when the display name belongs to several members.
    """
    if target.startswith("@"):
        return target, None
    _cleanup_resolutions()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() rejects with an unhelpful error message.
    if target.isdecimal():
        pending = _pending_resolution.get(room_id)
        if pending is None:
            raise ValueError("No pending resolution. Use @user:domain or display name.")
        idx = int(target) - 1
        if not 0 <= idx < len(pending["matches"]):
            raise ValueError(f"Invalid selection {target}. Choose from the list.")
        match = pending["matches"][idx]
        del _pending_resolution[room_id]
        return match["mxid"], match.get("display_name")
    # `target` is a single token here; multi-word names are handled by
    # _resolve_multiword. A direct cache match is still attempted.
    wanted = target.strip().lower()
    await _populate_name_cache(bot, room_id)
    mxid = _name_cache.get(room_id, {}).get(wanted)
    if mxid:
        return mxid, wanted
    # The lookup yields None both for an ambiguous name and for a name that
    # is missing from the cache, so both cases are re-checked against a
    # freshly fetched member list.
    resp = await bot.async_client.joined_members(room_id)
    # A failed request yields an error object without ``members``; treat that
    # as "no matches" so only the documented exceptions leave this function.
    members = getattr(resp, "members", None) or []
    matches = [
        {"mxid": member.user_id, "display_name": member.display_name}
        for member in members
        if (member.display_name or "").strip().lower() == wanted
    ]
    if len(matches) == 1:
        return matches[0]["mxid"], matches[0]["display_name"]
    if len(matches) > 1:
        raise UserResolutionError(matches)
    raise ValueError(f"No member with display name '{target}' found.")
# ------------------------------------------------------------------
# Power level helpers
# ------------------------------------------------------------------
async def get_power_level(bot, room_id, user_id):
    """Return the power level of *user_id* in *room_id*.

    Reads the room's ``m.room.power_levels`` state event and returns the
    user's explicit entry under "users", falling back to "users_default"
    and finally to 0.

    Best effort: if the event cannot be read the failure is logged and 0 is
    returned, so callers never see an exception from this helper.
    """
    try:
        resp = await bot.async_client.room_get_state_event(
            room_id, "m.room.power_levels"
        )
        if resp.content:
            return resp.content.get("users", {}).get(
                user_id, resp.content.get("users_default", 0)
            )
    except Exception as e:
        # Previously swallowed silently; log it so a permissions or network
        # problem is visible instead of looking like "power level 0".
        logger.warning(
            f"Could not read power level of {user_id} in {room_id}: {e}"
        )
    return 0
async def has_mod_permission(bot, room_id, user_id, config):
    """Tell whether *user_id* may run moderator commands in *room_id*.

    The configured global admin (``config.admin_user``) always qualifies;
    anyone else needs a room power level of at least 50.
    """
    is_global_admin = user_id == config.admin_user
    if not is_global_admin:
        level = await get_power_level(bot, room_id, user_id)
        return level >= 50
    return True
async def fetch_power_levels(bot, room_id):
    """Return the content of the room's ``m.room.power_levels`` state event.

    Returns an empty dict when the event has no content or cannot be read;
    failures are logged rather than raised, so callers must treat ``{}`` as
    "unknown".
    """
    try:
        resp = await bot.async_client.room_get_state_event(
            room_id, "m.room.power_levels"
        )
        return resp.content if resp.content else {}
    except Exception as e:
        # Previously swallowed silently; log it so the cause of a later
        # "could not retrieve power levels" error can be found.
        logger.error(f"Failed to fetch power levels for {room_id}: {e}")
        return {}
async def set_power_level(bot, room_id, user_id, new_pl):
    """Set the power level of *user_id* in *room_id* to *new_pl*.

    The whole ``m.room.power_levels`` content is sent back to the server, so
    the current content is fetched first and only this user's entry under
    "users" is changed.

    Raises RuntimeError if the current power levels cannot be retrieved or
    if the homeserver rejects the update.
    """
    current = await fetch_power_levels(bot, room_id)
    if not current:
        raise RuntimeError("Could not retrieve power levels.")
    users = current.setdefault("users", {})
    users[user_id] = new_pl
    resp = await bot.async_client.room_put_state(
        room_id, "m.room.power_levels", current
    )
    # matrix-nio reports a rejected request by returning an error object
    # (carrying ``message`` and ``status_code``) instead of raising. Without
    # this check a forbidden change would be reported to the room as success.
    if hasattr(resp, "status_code") and hasattr(resp, "message"):
        raise RuntimeError(
            resp.message or f"request failed ({resp.status_code})"
        )
async def get_banned_users(bot, room_id):
    """Return the state keys of every banned member of *room_id*.

    Scans the room state for ``m.room.member`` events whose content has
    membership "ban". On any failure the error is logged and an empty list
    is returned.
    """
    try:
        state = await bot.async_client.room_get_state(room_id)
        return [
            ev["state_key"]
            for ev in state.events
            if ev.get("type") == "m.room.member"
            and ev.get("content", {}).get("membership") == "ban"
        ]
    except Exception as e:
        logger.error(f"Failed to fetch bans: {e}")
        return []
# ------------------------------------------------------------------
# Main command handler
# ------------------------------------------------------------------
def _raise_if_error(resp):
    """Raise RuntimeError when *resp* looks like a matrix-nio error response.

    matrix-nio client calls report a rejected request by *returning* an error
    object (carrying ``message`` and ``status_code``) instead of raising, so
    a plain try/except never notices e.g. a forbidden kick.
    """
    if hasattr(resp, "status_code") and hasattr(resp, "message"):
        raise RuntimeError(resp.message or f"request failed ({resp.status_code})")
    return resp


def _name_token_count(tokens, display):
    """Return how many leading *tokens* spell *display* (at least 1)."""
    wanted = (display or "").strip().lower()
    for end in range(len(tokens), 0, -1):
        if " ".join(tokens[:end]).strip().lower() == wanted:
            return end
    return 1


async def _resolve_command_target(bot, room_id, name_tokens):
    """Resolve command arguments to (mxid, number_of_tokens_consumed).

    Tries a (possibly multi-word) display name first; if none matches, the
    first token alone is interpreted as an MXID, a numeric selection from a
    pending ambiguity prompt, or a single-word display name.

    Raises UserResolutionError or ValueError from the underlying resolvers.
    """
    try:
        mxid, display = await _resolve_multiword(bot, room_id, name_tokens)
    except ValueError:
        mxid, _ = await resolve_user_from_target(bot, room_id, name_tokens[0])
        return mxid, 1
    return mxid, _name_token_count(name_tokens, display)


async def handle_command(room, message, bot, prefix, config):
    """Dispatches !admin or standalone moderation commands.

    Parameters:
        room: room object; only ``room.room_id`` is read here.
        message: incoming event; only ``message.sender`` is read here.
        bot: bot instance providing ``api.send_text_message`` and
            ``async_client``.
        prefix: command prefix handed to ``botlib.MessageMatch``.
        config: configuration object; ``config.admin_user`` is the global
            admin who bypasses power-level checks.

    Every command except the help text requires moderator permission (see
    has_mod_permission). Results and errors are reported as messages in the
    room; the function returns None.
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not match.is_not_from_this_bot() or not match.prefix():
        return
    room_id = room.room_id
    sender = message.sender

    async def reply(text):
        await bot.api.send_text_message(room_id, text)

    standalone_commands = {
        "kick",
        "ban",
        "unban",
        "invite",
        "userinfo",  # renamed from "whois" to "userinfo"
        "op",
        "deop",
        "topic",
        "roomname",
        "avatar",
        "members",
        "bans",
        "modhelp",
        "admin",
    }
    cmd = match.command()
    if cmd not in standalone_commands:
        return

    # Determine action and sub_args
    args = match.args()
    if cmd == "admin":
        action = args[0].lower() if args else ""
        sub_args = args[1:]
    else:
        action = cmd
        sub_args = args

    # Permission gate (skip for help). The gate looks at the action rather
    # than the command word so that "!admin help" is open to everyone, just
    # like "!modhelp".
    if action not in ("help", "modhelp"):
        if not await has_mod_permission(bot, room_id, sender, config):
            await reply("⛔ You don't have permission to use moderator commands.")
            return
    if cmd == "admin" and not args:
        await reply("Usage: !admin <action> [args...]")
        return

    # ------------------------------------------------------------
    # User-targeting actions (kick, ban, invite, userinfo, op, deop)
    # ------------------------------------------------------------
    if action in ("kick", "ban", "invite", "userinfo", "op", "deop"):
        if not sub_args:
            await reply(f"Missing user. Usage: !{cmd} <@user|name> [reason...]")
            return
        name_tokens = sub_args
        power = None
        # For op/deop, a trailing number is a power level. A lone number is
        # left in name_tokens so it can select an entry from a pending
        # "multiple users found" prompt.
        if action in ("op", "deop") and len(sub_args) > 1:
            try:
                power = int(sub_args[-1])
            except ValueError:
                # No numeric power level: the whole argument list is the name.
                power = None
            else:
                name_tokens = sub_args[:-1]

        # Resolve the (possibly multi-word) name
        try:
            target_mxid, consumed = await _resolve_command_target(
                bot, room_id, name_tokens
            )
        except UserResolutionError as e:
            # Remember the candidates so the numbered follow-up promised in
            # the prompt can actually be resolved.
            _pending_resolution[room_id] = {
                "matches": e.matches,
                "expires": time.time() + RESOLUTION_TIMEOUT,
            }
            lines = ["Multiple users found. Reissue the command with a number:"]
            for i, m in enumerate(e.matches, 1):
                lines.append(f"{i}. {m['mxid']} ({m['display_name']})")
            await reply("\n".join(lines))
            return
        except Exception as e:
            await reply(str(e))
            return

        if action in ("op", "deop"):
            if action == "op":
                requested_pl = power if power is not None else 50
                if requested_pl > 50:
                    await reply(
                        "Maximum power level for op is 50 (moderator). Setting to 50."
                    )
                    new_pl = 50
                else:
                    new_pl = requested_pl
            else:
                new_pl = 0
            sender_pl = await get_power_level(bot, room_id, sender)
            target_pl = await get_power_level(bot, room_id, target_mxid)
            if sender != config.admin_user and sender_pl <= target_pl:
                await reply(
                    "⛔ You can only modify users with a lower power level than yours."
                )
                return
            try:
                await set_power_level(bot, room_id, target_mxid, new_pl)
                verb = "Promoted" if action == "op" else "Demoted"
                await reply(f"{verb} {target_mxid} to power level {new_pl}.")
            except Exception as e:
                await reply(f"❌ Failed to set power: {e}")
        else:
            # For kick/ban: the reason is everything after the tokens that
            # made up the user's name (or after the MXID / selection number).
            reason = " ".join(sub_args[consumed:])
            if action == "kick":
                try:
                    _raise_if_error(
                        await bot.async_client.room_kick(room_id, target_mxid, reason)
                    )
                    await reply(f"👢 Kicked {target_mxid}.")
                except Exception as e:
                    await reply(f"❌ Failed to kick: {e}")
            elif action == "ban":
                try:
                    _raise_if_error(
                        await bot.async_client.room_ban(room_id, target_mxid, reason)
                    )
                    await reply(f"🚫 Banned {target_mxid}.")
                except Exception as e:
                    await reply(f"❌ Failed to ban: {e}")
            elif action == "invite":
                try:
                    _raise_if_error(
                        await bot.async_client.room_invite(room_id, target_mxid)
                    )
                    await reply(f"📨 Invited {target_mxid}.")
                except Exception as e:
                    await reply(f"❌ Failed to invite: {e}")
            elif action == "userinfo":  # was "whois", now "userinfo"
                try:
                    resp = await bot.async_client.joined_members(room_id)
                    member_info = None
                    for m in resp.members:
                        if m.user_id == target_mxid:
                            member_info = m
                            break
                    pl = await get_power_level(bot, room_id, target_mxid)
                    display = member_info.display_name if member_info else "Unknown"
                    msg = (
                        f"**User:** `{target_mxid}`\n"
                        f"**Display Name:** {display}\n"
                        f"**Power Level:** {pl}"
                    )
                    await reply(msg)
                except Exception as e:
                    await reply(f"❌ Failed to get userinfo: {e}")
    # ------------------------------------------------------------
    # UNBAN
    # ------------------------------------------------------------
    elif action == "unban":
        if not sub_args:
            await reply("Usage: !unban <@user:domain>")
            return
        target_str = sub_args[0]
        if not target_str.startswith("@"):
            await reply(
                "For unban, please provide the full Matrix ID (e.g., @user:domain)."
            )
            return
        try:
            _raise_if_error(await bot.async_client.room_unban(room_id, target_str))
            await reply(f"🔓 Unbanned {target_str}.")
        except Exception as e:
            await reply(f"❌ Failed to unban: {e}")
    # ------------------------------------------------------------
    # TOPIC, ROOMNAME, AVATAR, MEMBERS, BANS, HELP
    # ------------------------------------------------------------
    elif action == "topic":
        if not sub_args:
            try:
                resp = await bot.async_client.room_get_state_event(room_id, "m.room.topic")
                topic = resp.content.get("topic", "(none)") if resp.content else "(none)"
                await reply(f"📝 Topic: {topic}")
            except Exception:
                await reply("Could not retrieve topic.")
        else:
            new_topic = " ".join(sub_args)
            try:
                _raise_if_error(
                    await bot.async_client.room_put_state(
                        room_id, "m.room.topic", {"topic": new_topic}
                    )
                )
                await reply("📝 Topic updated.")
            except Exception as e:
                await reply(f"❌ Failed: {e}")
    elif action == "roomname":
        if not sub_args:
            try:
                resp = await bot.async_client.room_get_state_event(room_id, "m.room.name")
                name = resp.content.get("name", "(none)") if resp.content else "(none)"
                await reply(f"🏠 Room name: {name}")
            except Exception:
                await reply("Could not retrieve room name.")
        else:
            new_name = " ".join(sub_args)
            try:
                _raise_if_error(
                    await bot.async_client.room_put_state(
                        room_id, "m.room.name", {"name": new_name}
                    )
                )
                await reply("🏠 Room name updated.")
            except Exception as e:
                await reply(f"❌ Failed: {e}")
    elif action == "avatar":
        if not sub_args:
            try:
                resp = await bot.async_client.room_get_state_event(room_id, "m.room.avatar")
                url = resp.content.get("url", "not set") if resp.content else "not set"
                await reply(f"🖼️ Avatar URL: {url}")
            except Exception:
                await reply("Could not retrieve avatar.")
        else:
            mxc_url = sub_args[0]
            if not mxc_url.startswith("mxc://"):
                await reply("Invalid avatar URL. Must start with mxc://.")
                return
            try:
                _raise_if_error(
                    await bot.async_client.room_put_state(
                        room_id, "m.room.avatar", {"url": mxc_url}
                    )
                )
                await reply("🖼️ Avatar set.")
            except Exception as e:
                await reply(f"❌ Failed: {e}")
    elif action == "members":
        try:
            resp = await bot.async_client.joined_members(room_id)
            if not resp.members:
                await reply("No members found.")
                return
            # Fetch the power-level event once instead of once per member;
            # an unreadable event yields {} and therefore level 0 for all.
            levels = await fetch_power_levels(bot, room_id)
            user_levels = levels.get("users", {})
            default_level = levels.get("users_default", 0)
            lines = ["**Members:**"]
            for member in resp.members:
                display = member.display_name or member.user_id
                pl = user_levels.get(member.user_id, default_level)
                lines.append(f"{display} (`{member.user_id}`) [PL: {pl}]")
            msg = "\n".join(lines)
            # Send long listings in pieces of at most 4000 characters.
            while len(msg) > 4000:
                part = msg[:4000]
                msg = msg[4000:]
                await reply(part)
            if msg:
                await reply(msg)
        except Exception as e:
            await reply(f"❌ Failed to list members: {e}")
    elif action == "bans":
        try:
            banned = await get_banned_users(bot, room_id)
            if not banned:
                await reply("No banned users.")
            else:
                ban_list = "\n".join(f"• `{u}`" for u in banned)
                await reply(f"**Banned users:**\n{ban_list}")
        except Exception as e:
            await reply(f"❌ Failed to fetch bans: {e}")
    elif action in ("help", "modhelp"):
        help_text = """
**Moderator Commands (standalone or via !admin):**
- `!kick <@user|name> [reason]` Kick a user
- `!ban <@user|name> [reason]` Ban a user
- `!unban <@user:domain>` Unban (full MXID required)
- `!invite <@user|name>` Invite a user
- `!userinfo <@user|name>` Show user details & power level (was `!whois`)
- `!op <@user|name> [pl=50]` Promote user (max 50, moderator)
- `!deop <@user|name>` Demote user to power level 0
- `!topic [new topic]` Show / set room topic
- `!roomname [new name]` Show / set room name
- `!avatar [mxc://...]` Show / set room avatar
- `!members` List all joined members with power levels
- `!bans` List all banned users
- `!modhelp` Show this help
Names may be **multiword**; the bot will automatically detect them.
If the name is ambiguous you'll be asked to choose from a numbered list.
"""
        await reply(help_text.strip())
    else:
        await reply(f"Unknown action: {action}. Use `!modhelp`.")
# ------------------------------------------------------------------
# Plugin metadata
# ------------------------------------------------------------------
# Plugin version string.
__version__ = "1.1.1"
# Plugin author label.
__author__ = "Funguy Admin"
# One-line plugin summary.
__description__ = "Full room moderation multiword name support"
# HTML help snippet listing the moderator commands.
# NOTE(review): presumably rendered by the bot's global help command - confirm
# against the plugin loader.
__help__ = """
<details>
<summary><strong>Admin / Moderator Commands</strong></summary>
<ul>
<li><code>!kick</code>, <code>!ban</code>, <code>!unban</code>, <code>!invite</code></li>
<li><code>!userinfo</code> Show user details & power level (was !whois)</li>
<li><code>!op</code> (max PL 50), <code>!deop</code></li>
<li><code>!topic</code>, <code>!roomname</code>, <code>!avatar</code></li>
<li><code>!members</code>, <code>!bans</code></li>
<li><code>!admin &lt;action&gt;</code> also works as a parent command</li>
</ul>
<p>Power level ≥ 50 required (or global admin).</p>
<p>Multiword display names are automatically recognized.</p>
</details>
"""