658 lines
26 KiB
Python
658 lines
26 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
plugins/admin.py – Full room moderation commands.
|
||
Supports multi‑word display names, standalone commands (!op, !kick, etc.)
|
||
Automatic flood detection:
|
||
– message flood (FLOOD_MAX_MESSAGES msgs within FLOOD_TIME_WINDOW; currently 15 msgs in 3s) → auto‑ban + kick
|
||
– join flood (5 joins in 3s, any domain) → room locked to invite‑only
|
||
"""
|
||
|
||
import time
|
||
import logging
|
||
import re
|
||
from collections import defaultdict, deque
|
||
import simplematrixbotlib as botlib
|
||
|
||
logger = logging.getLogger("admin")

# ------------------------------------------------------------------
# Display-name resolution state
# ------------------------------------------------------------------
# Presumably the lifetime (seconds) of a pending numbered selection.
RESOLUTION_TIMEOUT = 60
# room_id -> {"matches": [...], "expires": timestamp}
_pending_resolution: dict[str, dict] = {}
# room_id -> {display_name.lower(): mxid}
_name_cache: dict[str, dict] = {}

# ------------------------------------------------------------------
# Flood detection settings
# ------------------------------------------------------------------
FLOOD_TIME_WINDOW = 3.0  # seconds
FLOOD_MAX_MESSAGES = 15  # messages per user inside the window
JOIN_FLOOD_WINDOW = 3.0  # seconds
JOIN_FLOOD_MAX = 5  # joins per room inside the window

# room_id -> user_id -> timestamps of that user's recent messages
_flood_tracker: dict[str, dict[str, deque[float]]] = defaultdict(
    lambda: defaultdict(deque)
)
# room_id -> timestamps of recent join events (any domain)
_join_flood_tracker: dict[str, deque[float]] = defaultdict(deque)
|
||
|
||
def _cleanup_resolutions():
    """Forget every pending name-resolution prompt whose deadline has passed."""
    cutoff = time.time()
    # Collect first, delete afterwards: the dict must not change size
    # while it is being iterated.
    stale_rooms = []
    for room_key, entry in _pending_resolution.items():
        if entry["expires"] < cutoff:
            stale_rooms.append(room_key)
    for room_key in stale_rooms:
        del _pending_resolution[room_key]
|
||
|
||
class UserResolutionError(Exception):
    """Raised when a display name matches more than one room member.

    Attributes:
        matches: list of ``{"mxid": ..., "display_name": ...}`` dicts, one
            per candidate member.
    """

    def __init__(self, matches):
        # Set a readable message explicitly. Without it, str(exc) is the raw
        # repr of the match list, which callers forward to the room.
        super().__init__(f"Multiple users match ({len(matches)} candidates).")
        self.matches = matches  # list of {"mxid": ..., "display_name": ...}
|
||
|
||
# Seconds a room's cached display-name map is trusted before it is re-fetched.
# Without an expiry, members who rename or leave keep resolving to a stale
# MXID, which is dangerous for kick/ban.
NAME_CACHE_TTL = 60.0
# room_id -> time.monotonic() of the last successful cache fill
_name_cache_filled_at = {}


async def _populate_name_cache(bot, room_id):
    """Fill (or refresh) the display-name -> MXID map for *room_id*.

    The map is stored in ``_name_cache[room_id]`` keyed by the stripped,
    lower-cased display name. A name shared by several members maps to
    ``None`` so callers can detect the ambiguity. A map younger than
    ``NAME_CACHE_TTL`` seconds is left alone. Failures are logged and leave
    any previous map in place.

    Args:
        bot: bot object exposing ``async_client.joined_members``.
        room_id: room whose joined members should be cached.
    """
    filled_at = _name_cache_filled_at.get(room_id)
    is_fresh = (
        room_id in _name_cache
        and filled_at is not None
        and time.monotonic() - filled_at < NAME_CACHE_TTL
    )
    if is_fresh:
        return
    try:
        resp = await bot.async_client.joined_members(room_id)
        # Error responses carry no ``members`` attribute.
        members = getattr(resp, "members", None)
        if members is None:
            logger.error(f"Could not cache members: {resp}")
            return
        cache = {}
        for member in members:
            display = (member.display_name or "").strip().lower()
            if not display:
                continue
            # A second holder of the same name marks the entry ambiguous.
            cache[display] = None if display in cache else member.user_id
        _name_cache[room_id] = cache
        _name_cache_filled_at[room_id] = time.monotonic()
        logger.info(f"Cached {len(cache)} display names for room {room_id}")
    except Exception as e:
        logger.error(f"Could not cache members: {e}")
|
||
|
||
async def _resolve_multiword(bot, room_id, tokens):
    """Resolve the longest leading run of *tokens* that is a display name.

    Tries ``tokens[:n]`` joined by spaces for n = len(tokens) down to 1
    against the room's cached display names.

    Args:
        bot: bot object exposing ``async_client.joined_members``.
        room_id: room to search.
        tokens: command arguments; ``<...>`` markup is stripped from each.

    Returns:
        ``(mxid, name)`` where *name* is the matched lower-cased candidate,
        or the member's display name when an ambiguous cache entry was
        settled by a live lookup.

    Raises:
        UserResolutionError: several joined members share the name.
        ValueError: no leading run of tokens matches any cached name.
    """
    clean_tokens = [re.sub(r'<[^>]+>', '', t).strip() for t in tokens]
    await _populate_name_cache(bot, room_id)
    cache = _name_cache.get(room_id, {})

    for end in range(len(clean_tokens), 0, -1):
        candidate = " ".join(clean_tokens[:end]).strip().lower()
        if candidate not in cache:
            continue
        mxid = cache[candidate]
        if mxid is not None:
            return mxid, candidate

        # Ambiguous cache entry: ask the server who currently has the name.
        try:
            resp = await bot.async_client.joined_members(room_id)
        except Exception as e:
            logger.error(f"Could not fetch members of {room_id}: {e}")
            resp = None
        # Error responses have no ``members``; treat them as "no match"
        # instead of raising an exception the caller does not handle.
        members = getattr(resp, "members", None) or []
        matches = [
            {"mxid": member.user_id, "display_name": member.display_name}
            for member in members
            if (member.display_name or "").strip().lower() == candidate
        ]
        if len(matches) == 1:
            return matches[0]["mxid"], matches[0]["display_name"]
        if len(matches) > 1:
            raise UserResolutionError(matches)

    full_name = " ".join(clean_tokens)
    raise ValueError(f"No member with display name '{full_name}' found.")
|
||
|
||
async def resolve_user_from_target(bot, room_id, target):
    """Resolve a single-token target to a Matrix user ID.

    *target* may be a full MXID (``@user:domain``), a number selecting an
    entry of the room's pending ambiguity list, or a display name.

    Args:
        bot: bot object exposing ``async_client.joined_members``.
        room_id: room the command was issued in.
        target: raw target text; ``<...>`` markup is stripped.

    Returns:
        ``(mxid, display_name)``; ``display_name`` is ``None`` for MXIDs.

    Raises:
        UserResolutionError: several joined members share the name.
        ValueError: empty target, invalid/absent selection, or no match.
    """
    target = re.sub(r'<[^>]+>', '', target).strip()
    if target.startswith("@"):
        return target, None
    if not target:
        # An empty name would otherwise "match" every member that has no
        # display name and could select an unintended user.
        raise ValueError("No user given. Use @user:domain or display name.")

    _cleanup_resolutions()

    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    if target.isdecimal():
        idx = int(target) - 1
        if room_id not in _pending_resolution:
            raise ValueError("No pending resolution. Use @user:domain or display name.")
        pending = _pending_resolution[room_id]
        if not 0 <= idx < len(pending["matches"]):
            raise ValueError(f"Invalid selection {target}. Choose from the list.")
        match = pending["matches"][idx]
        del _pending_resolution[room_id]
        return match["mxid"], match.get("display_name")

    await _populate_name_cache(bot, room_id)
    wanted = target.lower()
    mxid = _name_cache.get(room_id, {}).get(wanted)
    if mxid:
        return mxid, wanted

    # Cache miss or ambiguous entry: consult the live member list.
    resp = await bot.async_client.joined_members(room_id)
    # Error responses have no ``members`` attribute.
    members = getattr(resp, "members", None) or []
    matches = [
        {"mxid": member.user_id, "display_name": member.display_name}
        for member in members
        if (member.display_name or "").strip().lower() == wanted
    ]
    if len(matches) == 1:
        return matches[0]["mxid"], matches[0]["display_name"]
    if len(matches) > 1:
        raise UserResolutionError(matches)
    raise ValueError(f"No member with display name '{target}' found.")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Power level helpers
|
||
# ------------------------------------------------------------------
|
||
async def get_power_level(bot, room_id, user_id):
    """Return *user_id*'s power level in *room_id*.

    Reads the room's ``m.room.power_levels`` state event and returns the
    user's explicit entry, else the event's ``users_default``, else 0.
    Any failure to read the event is logged and also yields 0.

    Args:
        bot: bot object exposing ``async_client.room_get_state_event``.
        room_id: room to inspect.
        user_id: Matrix ID to look up.
    """
    try:
        resp = await bot.async_client.room_get_state_event(
            room_id, "m.room.power_levels"
        )
        content = resp.content
    except Exception as e:
        # Log rather than swallow: a silent 0 here looks like
        # "user is not a moderator" to the caller.
        logger.warning(f"Could not read power levels for {room_id}: {e}")
        return 0
    if not content:
        return 0
    return content.get("users", {}).get(user_id, content.get("users_default", 0))
|
||
|
||
async def has_mod_permission(bot, room_id, user_id, config):
    """Tell whether *user_id* may use moderator commands in *room_id*.

    The account named by ``config.admin_user`` always qualifies; anyone
    else needs a room power level of at least 50.
    """
    if config.admin_user != user_id:
        level = await get_power_level(bot, room_id, user_id)
        return level >= 50
    return True
|
||
|
||
async def fetch_power_levels(bot, room_id):
    """Return the content of the room's ``m.room.power_levels`` event.

    Returns an empty dict when the event cannot be read or has no content;
    the reason is logged so the failure is diagnosable.

    Args:
        bot: bot object exposing ``async_client.room_get_state_event``.
        room_id: room to inspect.
    """
    try:
        resp = await bot.async_client.room_get_state_event(
            room_id, "m.room.power_levels"
        )
    except Exception as e:
        logger.warning(f"Could not fetch power levels for {room_id}: {e}")
        return {}
    # Error responses carry no ``content`` attribute.
    content = getattr(resp, "content", None)
    if content is None:
        logger.warning(f"Could not fetch power levels for {room_id}: {resp}")
        return {}
    return content if content else {}
|
||
|
||
async def set_power_level(bot, room_id, user_id, new_pl):
    """Set *user_id*'s power level in *room_id* to *new_pl*.

    Fetches the current ``m.room.power_levels`` content, updates the
    user's entry and writes the whole event back.

    Raises:
        RuntimeError: the current power levels could not be retrieved, or
            the homeserver rejected the update.
    """
    current = await fetch_power_levels(bot, room_id)
    if not current:
        raise RuntimeError("Could not retrieve power levels.")
    current.setdefault("users", {})[user_id] = new_pl
    resp = await bot.async_client.room_put_state(
        room_id, "m.room.power_levels", current
    )
    # matrix-nio reports server-side rejections as ErrorResponse objects
    # instead of raising, so surface them to the caller explicitly.
    if isinstance(resp, botlib.nio.ErrorResponse):
        raise RuntimeError(getattr(resp, "message", None) or str(resp))
|
||
|
||
async def get_banned_users(bot, room_id):
    """List the state keys of all ``m.room.member`` events with membership "ban".

    Reads the full room state via ``async_client.room_get_state``. Any
    failure is logged and produces an empty list.
    """
    try:
        state = await bot.async_client.room_get_state(room_id)
        return [
            ev["state_key"]
            for ev in state.events
            if ev.get("type") == "m.room.member"
            and ev.get("content", {}).get("membership") == "ban"
        ]
    except Exception as e:
        logger.error(f"Failed to fetch bans: {e}")
        return []
|
||
|
||
# ------------------------------------------------------------------
|
||
# Flood detection (message + global join)
|
||
# ------------------------------------------------------------------
|
||
def _check_flood(room_id, user_id) -> bool:
    """Record one message from *user_id* and report whether it trips the limit.

    Keeps a per-room, per-user window of timestamps no older than
    ``FLOOD_TIME_WINDOW`` seconds. Once the window holds
    ``FLOOD_MAX_MESSAGES`` entries it is reset and ``True`` is returned.
    """
    stamp = time.monotonic()
    oldest_allowed = stamp - FLOOD_TIME_WINDOW
    history = _flood_tracker[room_id][user_id]
    # Drop timestamps that have slid out of the window.
    while history and history[0] < oldest_allowed:
        history.popleft()
    history.append(stamp)
    tripped = len(history) >= FLOOD_MAX_MESSAGES
    if tripped:
        history.clear()
    return tripped
|
||
|
||
def _check_join_flood(room_id) -> bool:
    """Record one join in *room_id* and report whether it trips the limit.

    Keeps a per-room window of timestamps no older than
    ``JOIN_FLOOD_WINDOW`` seconds. Once the window holds ``JOIN_FLOOD_MAX``
    entries it is reset and ``True`` is returned.
    """
    stamp = time.monotonic()
    oldest_allowed = stamp - JOIN_FLOOD_WINDOW
    recent_joins = _join_flood_tracker[room_id]
    # Drop timestamps that have slid out of the window.
    while recent_joins and recent_joins[0] < oldest_allowed:
        recent_joins.popleft()
    recent_joins.append(stamp)
    tripped = len(recent_joins) >= JOIN_FLOOD_MAX
    if tripped:
        recent_joins.clear()
    return tripped
|
||
|
||
async def _kick_user(bot, room_id, user_id, reason):
    """Kick *user_id* from *room_id*, logging the outcome; never raises."""
    try:
        resp = await bot.async_client.room_kick(room_id, user_id, reason)
    except Exception as e:
        logger.error(f"Failed to kick {user_id}: {e}")
        return
    # matrix-nio returns an ErrorResponse (it does not raise) when the
    # homeserver rejects the request; do not log that as a success.
    if isinstance(resp, botlib.nio.ErrorResponse):
        logger.error(f"Failed to kick {user_id}: {resp}")
        return
    logger.info(f"Kicked {user_id} from {room_id}: {reason}")
|
||
|
||
async def _ban_user(bot, room_id, user_id, reason):
    """Ban *user_id* from *room_id*, logging the outcome; never raises."""
    try:
        resp = await bot.async_client.room_ban(room_id, user_id, reason)
    except Exception as e:
        logger.error(f"Failed to ban {user_id}: {e}")
        return
    # matrix-nio returns an ErrorResponse (it does not raise) when the
    # homeserver rejects the request; do not log that as a success.
    if isinstance(resp, botlib.nio.ErrorResponse):
        logger.error(f"Failed to ban {user_id}: {resp}")
        return
    logger.info(f"Banned {user_id} from {room_id}: {reason}")
|
||
|
||
async def _lock_room(bot, room_id):
    """Set room join rule to 'invite', logging the outcome; never raises."""
    try:
        resp = await bot.async_client.room_put_state(
            room_id,
            "m.room.join_rules",
            {"join_rule": "invite"},
        )
    except Exception as e:
        logger.error(f"Failed to lock room {room_id}: {e}")
        return
    # matrix-nio returns an ErrorResponse (it does not raise) when the
    # homeserver rejects the request; do not log that as a success.
    if isinstance(resp, botlib.nio.ErrorResponse):
        logger.error(f"Failed to lock room {room_id}: {resp}")
        return
    logger.info(f"Room {room_id} locked to invite‑only (join flood detected).")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Main command handler
|
||
# ------------------------------------------------------------------
|
||
# Commands this plugin answers to when used directly (e.g. ``!kick``) rather
# than through the ``!admin <action>`` parent command.
_STANDALONE_COMMANDS = frozenset({
    "kick", "ban", "unban", "invite", "userinfo", "op", "deop", "topic",
    "roomname", "avatar", "members", "bans", "mkick", "joinrule", "modhelp",
    "admin",
})

# Upper bound on characters per message when listing members.
_MEMBER_LIST_CHUNK = 4000

_MOD_HELP_TEXT = """
**Moderator Commands (standalone or via !admin):**
- `!kick <@user|name> [reason]` – Kick a user
- `!ban <@user|name> [reason]` – Ban a user
- `!unban <@user:domain>` – Unban (full MXID required)
- `!invite <@user|name>` – Invite a user
- `!mkick <domain>` – Kick all users from the given domain
- `!joinrule <public|invite>` – Manually set the room join rule
- `!userinfo <@user|name>` – Show user details & power level (was `!whois`)
- `!op <@user|name> [pl=50]` – Promote user (max 50, moderator)
- `!deop <@user|name>` – Demote user to power level 0
- `!topic [new topic]` – Show / set room topic
- `!roomname [new name]` – Show / set room name
- `!avatar [mxc://...]` – Show / set room avatar
- `!members` – List all joined members with power levels
- `!bans` – List all banned users
- `!modhelp` – Show this help

Names may be **multi‑word**; the bot will automatically detect them.
If the name is ambiguous you'll be asked to choose from a numbered list.
"""


def _ensure_ok(resp):
    """Return *resp*, or raise RuntimeError if it is a matrix-nio error response.

    matrix-nio reports server-side rejections by returning an ErrorResponse
    instead of raising, so an unchecked call would be reported as a success.
    """
    if isinstance(resp, botlib.nio.ErrorResponse):
        raise RuntimeError(getattr(resp, "message", None) or str(resp))
    return resp


def _count_name_tokens(tokens, display_name):
    """Return how many leading *tokens* spell *display_name*.

    Mirrors the longest-prefix matching of ``_resolve_multiword`` so the
    remaining tokens can be used as the reason text. Falls back to
    ``len(tokens)`` (no reason) when no prefix matches.
    """
    wanted = (display_name or "").strip().lower()
    cleaned = [re.sub(r'<[^>]+>', '', t).strip() for t in tokens]
    for end in range(len(cleaned), 0, -1):
        if " ".join(cleaned[:end]).strip().lower() == wanted:
            return end
    return len(tokens)


async def _prompt_for_choice(bot, room_id, matches):
    """Remember *matches* for the room and ask for a numbered selection."""
    # Store the candidates so that re-issuing the command with a number
    # can be resolved by ``resolve_user_from_target``.
    _pending_resolution[room_id] = {
        "matches": matches,
        "expires": time.time() + RESOLUTION_TIMEOUT,
    }
    lines = ["Multiple users found. Re‑issue the command with a number:"]
    lines.extend(
        f"{i}. {m['mxid']} ({m['display_name']})" for i, m in enumerate(matches, 1)
    )
    await bot.api.send_text_message(room_id, "\n".join(lines))


async def handle_command(room, message, bot, prefix, config):
    """Parse and execute one moderator command from a room message.

    Handles the standalone commands (``!kick``, ``!op``, ...) and the same
    actions routed through ``!admin <action>``. Every command except
    ``!modhelp`` requires ``has_mod_permission``. Results and errors are
    reported back to the room as text messages.

    Args:
        room: room object; ``room.room_id`` is used.
        message: message event; ``message.sender`` is used.
        bot: simplematrixbotlib bot (``bot.api`` and ``bot.async_client``).
        prefix: command prefix handed to ``botlib.MessageMatch``.
        config: object exposing ``admin_user``.
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix()):
        return

    room_id = room.room_id
    sender = message.sender
    send = bot.api.send_text_message

    cmd = match.command()
    if cmd not in _STANDALONE_COMMANDS:
        return

    if cmd != "modhelp" and not await has_mod_permission(bot, room_id, sender, config):
        await send(room_id, "⛔ You don't have permission to use moderator commands.")
        return

    args = match.args()
    if cmd == "admin":
        if not args:
            await send(room_id, "Usage: !admin <action> [args...]")
            return
        action = args[0].lower()
        sub_args = args[1:]
    else:
        action = cmd
        sub_args = args

    # ------------------------------------------------------------
    # Mass‑kick by domain
    # ------------------------------------------------------------
    if action == "mkick":
        usage = "Usage: !mkick <domain>\nExample: !mkick evilbots.net"
        if not sub_args:
            await send(room_id, usage)
            return
        domain = sub_args[0].strip().lower()
        if domain.startswith("@"):
            # Full MXID given: the server name is everything after the first
            # ':' (it may itself contain ':port', which must be kept).
            domain = domain.partition(":")[2]
        domain = domain.lstrip(":")
        if not domain:
            await send(room_id, usage)
            return
        try:
            resp = _ensure_ok(await bot.async_client.joined_members(room_id))
            if not resp.members:
                await send(room_id, "Could not fetch member list.")
                return
            suffix = f":{domain}"
            own_id = bot.async_client.user_id
            # Never try to kick the bot's own account.
            targets = [
                m for m in resp.members
                if m.user_id.endswith(suffix) and m.user_id != own_id
            ]
            if not targets:
                await send(room_id, f"No users found from domain '{domain}'.")
                return
            reason = f"Mass‑kick of domain {domain}"
            count = 0
            for member in targets:
                await _kick_user(bot, room_id, member.user_id, reason)
                count += 1
            await send(room_id, f"👢 Kicked {count} user(s) from {domain}.")
        except Exception as e:
            await send(room_id, f"❌ Mass‑kick failed: {e}")

    # ------------------------------------------------------------
    # Join rule toggle
    # ------------------------------------------------------------
    elif action == "joinrule":
        # Normalise case before validating so "Public" is accepted.
        new_rule = sub_args[0].lower() if sub_args else ""
        if new_rule not in ("public", "invite"):
            await send(room_id, "Usage: !joinrule <public|invite>")
            return
        try:
            _ensure_ok(await bot.async_client.room_put_state(
                room_id,
                "m.room.join_rules",
                {"join_rule": new_rule},
            ))
            await send(room_id, f"🔐 Join rule set to **{new_rule}**.")
        except Exception as e:
            await send(room_id, f"❌ Failed to set join rule: {e}")

    # ------------------------------------------------------------
    # User-targeting actions
    # ------------------------------------------------------------
    elif action in ("kick", "ban", "invite", "userinfo", "op", "deop"):
        if not sub_args:
            await send(room_id, f"Missing user. Usage: !{cmd} <@user|name> [reason...]")
            return

        name_tokens = sub_args
        power = None
        # A trailing integer is a power level only when something precedes
        # it; a lone number is a selection from the ambiguity list.
        if action in ("op", "deop") and len(sub_args) > 1:
            try:
                power = int(sub_args[-1])
            except ValueError:
                power = None
            else:
                name_tokens = sub_args[:-1]

        try:
            target_mxid, target_display = await _resolve_multiword(
                bot, room_id, name_tokens
            )
        except UserResolutionError as e:
            await _prompt_for_choice(bot, room_id, e.matches)
            return
        except ValueError:
            # No display name matched: treat the first token as an MXID,
            # a numbered selection, or a single-word name.
            try:
                target_mxid, target_display = await resolve_user_from_target(
                    bot, room_id, name_tokens[0]
                )
            except UserResolutionError as e2:
                await _prompt_for_choice(bot, room_id, e2.matches)
                return
            except Exception as e2:
                await send(room_id, str(e2))
                return
            consumed = 1
        else:
            consumed = _count_name_tokens(name_tokens, target_display)

        if action in ("op", "deop"):
            if action == "op":
                requested_pl = power if power is not None else 50
                if requested_pl > 50:
                    await send(
                        room_id,
                        "Maximum power level for op is 50 (moderator). Setting to 50.",
                    )
                    new_pl = 50
                else:
                    new_pl = requested_pl
            else:
                new_pl = 0

            sender_pl = await get_power_level(bot, room_id, sender)
            target_pl = await get_power_level(bot, room_id, target_mxid)
            if sender != config.admin_user and sender_pl <= target_pl:
                await send(
                    room_id,
                    "⛔ You can only modify users with a lower power level than yours.",
                )
                return

            try:
                await set_power_level(bot, room_id, target_mxid, new_pl)
                verb = "Promoted" if action == "op" else "Demoted"
                await send(room_id, f"✅ {verb} {target_mxid} to power level {new_pl}.")
            except Exception as e:
                await send(room_id, f"❌ Failed to set power: {e}")

        else:
            # Everything after the tokens that named the user is the reason.
            reason = " ".join(name_tokens[consumed:])
            # NOTE(review): kick/ban do not compare sender and target power
            # levels the way op/deop do — confirm this is intended.

            if action == "kick":
                try:
                    _ensure_ok(await bot.async_client.room_kick(room_id, target_mxid, reason))
                    await send(room_id, f"👢 Kicked {target_mxid}.")
                except Exception as e:
                    await send(room_id, f"❌ Failed to kick: {e}")

            elif action == "ban":
                try:
                    _ensure_ok(await bot.async_client.room_ban(room_id, target_mxid, reason))
                    await send(room_id, f"🚫 Banned {target_mxid}.")
                except Exception as e:
                    await send(room_id, f"❌ Failed to ban: {e}")

            elif action == "invite":
                try:
                    _ensure_ok(await bot.async_client.room_invite(room_id, target_mxid))
                    await send(room_id, f"📨 Invited {target_mxid}.")
                except Exception as e:
                    await send(room_id, f"❌ Failed to invite: {e}")

            elif action == "userinfo":
                try:
                    resp = _ensure_ok(await bot.async_client.joined_members(room_id))
                    member_info = next(
                        (m for m in resp.members if m.user_id == target_mxid), None
                    )
                    pl = await get_power_level(bot, room_id, target_mxid)
                    display = member_info.display_name if member_info else "Unknown"
                    msg = (
                        f"**User:** `{target_mxid}`\n"
                        f"**Display Name:** {display}\n"
                        f"**Power Level:** {pl}"
                    )
                    await send(room_id, msg)
                except Exception as e:
                    await send(room_id, f"❌ Failed to get userinfo: {e}")

    # ------------------------------------------------------------
    # UNBAN
    # ------------------------------------------------------------
    elif action == "unban":
        if not sub_args:
            await send(room_id, "Usage: !unban <@user:domain>")
            return
        target_str = sub_args[0]
        if not target_str.startswith("@"):
            await send(
                room_id,
                "For unban, please provide the full Matrix ID (e.g., @user:domain).",
            )
            return
        try:
            _ensure_ok(await bot.async_client.room_unban(room_id, target_str))
            await send(room_id, f"🔓 Unbanned {target_str}.")
        except Exception as e:
            await send(room_id, f"❌ Failed to unban: {e}")

    # ------------------------------------------------------------
    # TOPIC, ROOMNAME, AVATAR, MEMBERS, BANS, HELP ...
    # ------------------------------------------------------------
    elif action == "topic":
        if not sub_args:
            try:
                resp = await bot.async_client.room_get_state_event(room_id, "m.room.topic")
                topic = resp.content.get("topic", "(none)") if resp.content else "(none)"
                await send(room_id, f"📝 Topic: {topic}")
            except Exception:
                await send(room_id, "Could not retrieve topic.")
        else:
            new_topic = " ".join(sub_args)
            try:
                _ensure_ok(await bot.async_client.room_put_state(
                    room_id, "m.room.topic", {"topic": new_topic}
                ))
                await send(room_id, "📝 Topic updated.")
            except Exception as e:
                await send(room_id, f"❌ Failed: {e}")

    elif action == "roomname":
        if not sub_args:
            try:
                resp = await bot.async_client.room_get_state_event(room_id, "m.room.name")
                name = resp.content.get("name", "(none)") if resp.content else "(none)"
                await send(room_id, f"🏠 Room name: {name}")
            except Exception:
                await send(room_id, "Could not retrieve room name.")
        else:
            new_name = " ".join(sub_args)
            try:
                _ensure_ok(await bot.async_client.room_put_state(
                    room_id, "m.room.name", {"name": new_name}
                ))
                await send(room_id, "🏠 Room name updated.")
            except Exception as e:
                await send(room_id, f"❌ Failed: {e}")

    elif action == "avatar":
        if not sub_args:
            try:
                resp = await bot.async_client.room_get_state_event(room_id, "m.room.avatar")
                url = resp.content.get("url", "not set") if resp.content else "not set"
                await send(room_id, f"🖼️ Avatar URL: {url}")
            except Exception:
                await send(room_id, "Could not retrieve avatar.")
        else:
            mxc_url = sub_args[0]
            if not mxc_url.startswith("mxc://"):
                await send(room_id, "Invalid avatar URL. Must start with mxc://.")
                return
            try:
                _ensure_ok(await bot.async_client.room_put_state(
                    room_id, "m.room.avatar", {"url": mxc_url}
                ))
                await send(room_id, "🖼️ Avatar set.")
            except Exception as e:
                await send(room_id, f"❌ Failed: {e}")

    elif action == "members":
        try:
            resp = _ensure_ok(await bot.async_client.joined_members(room_id))
            if not resp.members:
                await send(room_id, "No members found.")
                return
            # One state request for the whole room instead of one per member.
            levels = await fetch_power_levels(bot, room_id)
            user_levels = levels.get("users", {})
            default_level = levels.get("users_default", 0)
            lines = ["**Members:**"]
            for member in resp.members:
                display = member.display_name or member.user_id
                pl = user_levels.get(member.user_id, default_level)
                lines.append(f"{display} (`{member.user_id}`) [PL: {pl}]")
            # Split on line boundaries so no entry is cut in half.
            chunk, size = [], 0
            for line in lines:
                if chunk and size + len(line) + 1 > _MEMBER_LIST_CHUNK:
                    await send(room_id, "\n".join(chunk))
                    chunk, size = [], 0
                chunk.append(line)
                size += len(line) + 1
            if chunk:
                await send(room_id, "\n".join(chunk))
        except Exception as e:
            await send(room_id, f"❌ Failed to list members: {e}")

    elif action == "bans":
        try:
            banned = await get_banned_users(bot, room_id)
            if not banned:
                await send(room_id, "No banned users.")
            else:
                ban_list = "\n".join(f"• `{u}`" for u in banned)
                await send(room_id, f"**Banned users:**\n{ban_list}")
        except Exception as e:
            await send(room_id, f"❌ Failed to fetch bans: {e}")

    elif action in ("help", "modhelp"):
        await send(room_id, _MOD_HELP_TEXT.strip())

    else:
        await send(room_id, f"Unknown action: {action}. Use `!modhelp`.")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Plugin setup – register flood detectors
|
||
# ------------------------------------------------------------------
|
||
def setup(bot):
    """Initialize the admin plugin and register flood detectors.

    Registers two listeners on *bot*:
    a message listener that bans and kicks a sender tripping
    ``_check_flood``, and a member-event listener that locks the room to
    invite-only when ``_check_join_flood`` trips.
    """

    # Message flood detector (bans + kicks)
    @bot.listener.on_message_event
    async def _message_flood(room, message):
        room_id = room.room_id
        sender = message.sender
        if sender == bot.async_client.user_id:
            return
        if not _check_flood(room_id, sender):
            return
        reason = f"Auto‑ban for flooding ({FLOOD_MAX_MESSAGES} messages in {FLOOD_TIME_WINDOW}s)"
        await _ban_user(bot, room_id, sender, reason)
        # Kept as a fallback in case the ban did not remove the user.
        await _kick_user(bot, room_id, sender, reason)

    # Join flood detector (any domain)
    @bot.listener.on_custom_event(botlib.nio.RoomMemberEvent)
    async def _join_flood(room, event):
        room_id = room.room_id
        if event.membership != "join":
            return
        # join -> join is a profile update (display name / avatar change),
        # not a new member entering the room; do not count it.
        if getattr(event, "prev_membership", None) == "join":
            return
        if event.state_key == bot.async_client.user_id:
            return
        if _check_join_flood(room_id):
            await _lock_room(bot, room_id)
            await bot.api.send_text_message(
                room_id,
                "🔐 Join flood detected – room locked to invite‑only. Use `!joinrule public` to reopen.",
            )

    logger.info("Admin plugin flood detectors registered")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Plugin metadata
|
||
# ------------------------------------------------------------------
|
||
__version__ = "1.2.3"
__author__ = "Funguy Admin"
__description__ = "Full room moderation – multi‑word name support + flood detection + mass domain kick"
# HTML help text. Literal '<', '>' and '&' are escaped so placeholders such
# as <domain> are displayed instead of being parsed as unknown tags. The
# flood thresholds are interpolated from the module constants so the text
# cannot drift from the actual limits.
__help__ = f"""
<details>
<summary><strong>Admin / Moderator Commands</strong></summary>
<ul>
<li><code>!kick</code>, <code>!ban</code>, <code>!unban</code>, <code>!invite</code></li>
<li><code>!mkick &lt;domain&gt;</code> – Kick all users from a domain</li>
<li><code>!joinrule &lt;public|invite&gt;</code> – Change room join rule</li>
<li><code>!userinfo</code> – Show user details &amp; power level</li>
<li><code>!op</code> (max PL 50), <code>!deop</code></li>
<li><code>!topic</code>, <code>!roomname</code>, <code>!avatar</code></li>
<li><code>!members</code>, <code>!bans</code></li>
<li><code>!admin &lt;action&gt;</code> also works as a parent command</li>
</ul>
<p>Power level ≥ 50 required (or global admin).</p>
<p>Multi‑word display names are automatically recognized.</p>
<p><strong>Flood detection:</strong></p>
<ul>
<li>Message flood: {FLOOD_MAX_MESSAGES} messages in {FLOOD_TIME_WINDOW:g} seconds → auto‑ban + kick</li>
<li>Join flood: {JOIN_FLOOD_MAX} users in {JOIN_FLOOD_WINDOW:g} seconds (any domain) → room locked to invite‑only</li>
</ul>
</details>
"""
|