#!/usr/bin/env python3
"""
plugins/roomstats.py — per-user room statistics (Limnoria-style).

Commands: !roomstats, !rank, !stats

Every non-bot message is tallied per (room, user) in a small SQLite
database; membership and topic events are tallied through custom event
listeners registered in ``setup``.
"""

import contextlib
import logging
import re
import sqlite3
import time

import nio
import simplematrixbotlib as botlib

logger = logging.getLogger("roomstats")

DB_PATH = "roomstats.db"

# Stat column name -> human readable label.  The keys double as the
# whitelist of column names that may be interpolated into SQL.
VALID_STATS = {
    "msgs": "Messages",
    "chars": "Characters",
    "words": "Words",
    "smileys": "Smileys",
    "actions": "Actions",
    "joins": "Joins",
    "parts": "Parts",
    "kicks_given": "Kicks given",
    "kicked_received": "Times kicked",
    "topics_set": "Topics set",
}

# Stat column name -> icon shown in front of the label in stat listings.
STAT_ICONS = {
    "msgs": "📩",
    "chars": "🔤",
    "words": "📝",
    "smileys": "😀",
    "actions": "🎭",
    "joins": "🚪",
    "parts": "👋",
    "kicks_given": "👢",
    "kicked_received": "🥾",
    "topics_set": "📌",
}

# Previous membership states from which a "leave" is neither a part nor a
# kick (rejected/withdrawn invite or knock, unban, repeated leave).
_NOT_IN_ROOM = ("invite", "knock", "ban", "leave")

# ------------------------------------------------------------------
# Emoji / smiley regex (Unicode blocks)
# ------------------------------------------------------------------
# NOTE: the ranges are deliberately limited to emoji blocks.  A single
# catch-all range such as U+24C2..U+1F251 also covers CJK, Hangul and
# most other BMP scripts, which would count ordinary text as smileys.
EMOJI_RE = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # Emoticons
    "\U0001F300-\U0001F5FF"  # Symbols & pictographs
    "\U0001F680-\U0001F6FF"  # Transport & map
    "\U0001F900-\U0001F9FF"  # Supplemental symbols & pictographs
    "\U0001FA70-\U0001FAFF"  # Symbols & pictographs extended-A
    "\U0001F1E0-\U0001F1FF"  # Flags
    "\U0001F200-\U0001F251"  # Enclosed ideographic supplement
    "\U00002600-\U000026FF"  # Miscellaneous symbols
    "\U00002702-\U000027B0"  # Dingbats
    "\U000024C2"             # Circled M
    "]+",
    re.UNICODE,
)


def count_smileys(text):
    """Return the number of emoji occurrences in *text*.

    A run of directly adjacent emoji characters counts as one occurrence
    (the pattern ends in ``+``), so ``"😀😀 ok 😀"`` yields 2.
    """
    return len(EMOJI_RE.findall(text))


# ------------------------------------------------------------------
# Database helpers
# ------------------------------------------------------------------
@contextlib.contextmanager
def _db():
    """Yield a connection to ``DB_PATH``; commit on success, always close.

    The inner ``with conn`` commits when the body finishes normally and
    rolls back when it raises; the ``finally`` guarantees the connection
    is closed in both cases.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def init_db():
    """Create the ``user_room_stats`` table if it does not exist yet."""
    with _db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS user_room_stats (
                room_id TEXT,
                user_id TEXT,
                msgs INTEGER DEFAULT 0,
                chars INTEGER DEFAULT 0,
                words INTEGER DEFAULT 0,
                smileys INTEGER DEFAULT 0,
                actions INTEGER DEFAULT 0,
                joins INTEGER DEFAULT 0,
                parts INTEGER DEFAULT 0,
                kicks_given INTEGER DEFAULT 0,
                kicked_received INTEGER DEFAULT 0,
                topics_set INTEGER DEFAULT 0,
                last_updated INTEGER,
                PRIMARY KEY (room_id, user_id)
            )
        """)


def _incr(room_id, user_id, column):
    """Increment stat *column* by 1 for (room_id, user_id), creating the row if needed.

    Raises:
        ValueError: if *column* is not one of the ``VALID_STATS`` keys.
            The column name is interpolated into the SQL text, so it must
            never come from anywhere but that whitelist.
    """
    if column not in VALID_STATS:
        raise ValueError(f"Unknown stat column: {column!r}")
    with _db() as conn:
        conn.execute(
            "INSERT OR IGNORE INTO user_room_stats (room_id, user_id) VALUES (?, ?)",
            (room_id, user_id),
        )
        conn.execute(
            f"UPDATE user_room_stats SET {column} = {column} + 1, last_updated = ? "
            "WHERE room_id = ? AND user_id = ?",
            (int(time.time()), room_id, user_id),
        )


# ------------------------------------------------------------------
# Member list helpers
# ------------------------------------------------------------------
async def _joined_members(bot, room_id):
    """Return the joined-member list of *room_id*, or None if it is unavailable.

    The response object is only assumed to carry a ``members`` attribute
    on success; an error response without it is logged and mapped to None
    instead of raising ``AttributeError``.
    """
    resp = await bot.async_client.joined_members(room_id)
    members = getattr(resp, "members", None)
    if members is None:
        logger.warning("Could not fetch member list for %s: %r", room_id, resp)
    return members


async def _display_names(bot, room_id):
    """Return ``{user_id: display name}`` for the joined members of *room_id*.

    Members without a display name map to their user id.  The dict is
    empty when the member list cannot be fetched, so callers can simply
    use ``names.get(uid, uid)``.
    """
    members = await _joined_members(bot, room_id)
    return {m.user_id: m.display_name or m.user_id for m in members or ()}


# ------------------------------------------------------------------
# Multi-word user resolution helper
# ------------------------------------------------------------------
async def resolve_user_from_tokens(bot, room_id, tokens):
    """Find the joined member whose display name matches *tokens*.

    *tokens* is a list of words.  The longest prefix of the list is tried
    first, then progressively shorter ones, so trailing words that are
    not part of the name are ignored.  Matching is case-insensitive and
    ignores surrounding whitespace.

    Returns:
        ``(mxid, display_name)`` of the matching member.

    Raises:
        ValueError: if the member list cannot be fetched, if several
            members share the matched display name, or if nothing matches.
    """
    members = await _joined_members(bot, room_id)
    if members is None:
        raise ValueError("Could not fetch member list.")

    # lowered display name -> every (mxid, display_name) carrying it
    by_name = {}
    for member in members:
        display = (member.display_name or "").strip()
        if not display:
            continue
        by_name.setdefault(display.lower(), []).append((member.user_id, display))

    # Longest candidate first, so "John Smith" wins over "John".
    for end in range(len(tokens), 0, -1):
        candidate = " ".join(tokens[:end]).strip().lower()
        matches = by_name.get(candidate)
        if not matches:
            continue
        if len(matches) > 1:
            raise ValueError(
                f"Multiple users have display name '{candidate}'. Use an MXID instead."
            )
        return matches[0]

    requested = " ".join(tokens)
    raise ValueError(f"No member found for '{requested}'.")


async def resolve_user(bot, room_id, name_or_tokens):
    """Resolve a user given either a string or a list of word tokens.

    A string starting with ``@`` is taken to be an MXID and returned
    as-is, with ``None`` as the display name (no lookup is performed).
    Any other string is treated as a single-token display name, and a
    list is passed on unchanged to ``resolve_user_from_tokens``.

    Returns:
        ``(mxid, display_name)``; *display_name* is None for MXID input.

    Raises:
        ValueError: propagated from ``resolve_user_from_tokens``.
    """
    if isinstance(name_or_tokens, str):
        if name_or_tokens.startswith("@"):
            return name_or_tokens, None
        tokens = [name_or_tokens]
    else:
        tokens = name_or_tokens
    return await resolve_user_from_tokens(bot, room_id, tokens)


# ------------------------------------------------------------------
# Setup: register custom event listeners for membership & topics
# ------------------------------------------------------------------
def setup(bot):
    """Initialise the database and register the membership/topic listeners."""
    init_db()

    @bot.listener.on_custom_event(nio.RoomMemberEvent)
    async def member_event(room, event):
        room_id = room.room_id
        membership = event.content.get("membership")
        # Previous membership state, when the event carries one; None
        # means "unknown" and falls back to counting the event.
        previous = getattr(event, "prev_membership", None)
        target = event.state_key
        sender = event.sender

        # Ignore the bot's own membership changes
        if target == bot.async_client.user_id:
            return

        if membership == "join":
            # join -> join is a profile change (display name / avatar),
            # not a new join.
            if previous != "join":
                _incr(room_id, target, "joins")
        elif membership == "leave":
            if previous in _NOT_IN_ROOM:
                # The user was not in the room, so this is neither a part
                # nor a kick.
                return
            if sender != target:  # kick
                _incr(room_id, sender, "kicks_given")
                _incr(room_id, target, "kicked_received")
            else:  # part
                _incr(room_id, target, "parts")

    @bot.listener.on_custom_event(nio.RoomTopicEvent)
    async def topic_event(room, event):
        _incr(room.room_id, event.sender, "topics_set")


# ------------------------------------------------------------------
# Message handler – silently records stats, and handles commands
# ------------------------------------------------------------------
def _is_emote(message):
    """Return True if *message* looks like an ``m.emote`` (/me) message.

    Checks a ``msgtype`` attribute first and then the raw event content
    under ``message.source`` — presumably only one of the two is present
    depending on the event class; verify against the nio version in use.
    """
    if getattr(message, "msgtype", None) == "m.emote":
        return True
    source = getattr(message, "source", None) or {}
    content = source.get("content") or {}
    return content.get("msgtype") == "m.emote"


def _record_message(room_id, sender, message):
    """Add one message by *sender* to the per-user counters of *room_id*."""
    body = message.body or ""
    with _db() as conn:
        conn.execute(
            "INSERT OR IGNORE INTO user_room_stats (room_id, user_id) VALUES (?, ?)",
            (room_id, sender),
        )
        conn.execute(
            """UPDATE user_room_stats SET
                   msgs = msgs + 1,
                   chars = chars + ?,
                   words = words + ?,
                   smileys = smileys + ?,
                   actions = actions + ?,
                   last_updated = ?
               WHERE room_id = ? AND user_id = ?""",
            (
                len(body),
                len(body.split()),
                count_smileys(body),
                1 if _is_emote(message) else 0,
                int(time.time()),
                room_id,
                sender,
            ),
        )


async def handle_command(room, message, bot, prefix, config):
    """Record stats for *message* and run !roomstats / !rank / !stats.

    Every message not sent by the bot itself is counted (including
    command messages).  Afterwards the message is matched against the
    command prefix; anything that is not one of the three commands is
    ignored.  *config* is accepted for interface compatibility and is not
    used here.
    """
    room_id = room.room_id
    sender = message.sender

    # ----- silently record stats for any non-bot message -----
    if sender != bot.async_client.user_id:
        _record_message(room_id, sender, message)

    # ----- command matching -----
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not match.is_not_from_this_bot() or not match.prefix():
        return

    cmd = match.command()
    args = match.args()

    if cmd == "roomstats":
        await _handle_roomstats(bot, room_id)

    elif cmd == "rank":
        if not args:
            await bot.api.send_text_message(
                room_id,
                "Usage: !rank <stat>\n"
                f"Stats: {', '.join(VALID_STATS)}",
            )
            return
        await _handle_rank(bot, room_id, args[0].lower())

    elif cmd == "stats":
        if args:
            # A single argument may be an MXID (which the ambiguity error
            # tells users to fall back to); several arguments are always
            # a multi-word display name.
            query = args[0] if len(args) == 1 else args
            try:
                target_mxid, _ = await resolve_user(bot, room_id, query)
            except ValueError as e:
                await bot.api.send_text_message(room_id, str(e))
                return
        else:
            target_mxid = sender
        await _handle_user_stats(bot, room_id, target_mxid, sender)


# ------------------------------------------------------------------
# Command implementations
# ------------------------------------------------------------------
def _format_stat_lines(values):
    """Return one Markdown bullet per stat in ``VALID_STATS`` order.

    *values* maps stat column name to its count.
    """
    return [
        f"- {STAT_ICONS[key]} {label}: {values[key]}"
        for key, label in VALID_STATS.items()
    ]


async def _get_aggregate(room_id):
    """Return ``{stat: total}`` summed over all users of *room_id*.

    Returns None when the room has no rows or every total is 0.
    """
    # Column names come from the hardcoded VALID_STATS keys.
    sums = ", ".join(f"COALESCE(SUM({col}),0)" for col in VALID_STATS)
    with _db() as conn:
        row = conn.execute(
            f"SELECT {sums} FROM user_room_stats WHERE room_id=?",
            (room_id,),
        ).fetchone()
    if not row or all(v == 0 for v in row):
        return None
    return dict(zip(VALID_STATS, row))


async def _handle_roomstats(bot, room_id):
    """Send the aggregate stats of *room_id* plus its top 10 users by messages."""
    agg = await _get_aggregate(room_id)
    if not agg:
        await bot.api.send_text_message(room_id, "No stats collected yet.")
        return

    # Top 10 by msgs; users without any message are left out, as in !rank.
    with _db() as conn:
        top = conn.execute(
            """SELECT user_id, msgs FROM user_room_stats
               WHERE room_id=? AND msgs > 0
               ORDER BY msgs DESC LIMIT 10""",
            (room_id,),
        ).fetchall()

    names = await _display_names(bot, room_id)

    # Built line by line (no indentation) so Markdown renders lists and
    # does not turn indented text into a code block.
    lines = ["### Room Statistics", ""]
    lines += _format_stat_lines(agg)
    if top:
        lines += ["", "**Top 10 by messages:**", ""]
        lines += [f"- {names.get(uid, uid)} — {cnt} msgs" for uid, cnt in top]
    await bot.api.send_markdown_message(room_id, "\n".join(lines))


async def _handle_rank(bot, room_id, col):
    """Send the top 10 users of *room_id* ranked by stat *col*.

    An unknown *col* is answered with the list of allowed stats.
    """
    if col not in VALID_STATS:
        await bot.api.send_text_message(
            room_id,
            f"Unknown stat: {col}. Allowed: {', '.join(VALID_STATS.keys())}",
        )
        return

    # Safe to use an f-string: col was validated against VALID_STATS above.
    with _db() as conn:
        rows = conn.execute(
            f"""SELECT user_id, {col} FROM user_room_stats
                WHERE room_id=? AND {col} > 0
                ORDER BY {col} DESC LIMIT 10""",
            (room_id,),
        ).fetchall()

    if not rows:
        await bot.api.send_text_message(
            room_id, f"No users with {VALID_STATS[col]} > 0."
        )
        return

    names = await _display_names(bot, room_id)
    lines = [f"### Ranking by {VALID_STATS[col]}", ""]
    lines += [
        f"{i}. {names.get(uid, uid)} — {val}"
        for i, (uid, val) in enumerate(rows, 1)
    ]
    await bot.api.send_markdown_message(room_id, "\n".join(lines))


async def _handle_user_stats(bot, room_id, user_id, sender):
    """Send the stats recorded for *user_id* in *room_id*.

    *sender* (the user who asked) is accepted for interface compatibility
    and is not used.
    """
    columns = ", ".join(VALID_STATS)
    with _db() as conn:
        row = conn.execute(
            f"SELECT {columns} FROM user_room_stats WHERE room_id=? AND user_id=?",
            (room_id, user_id),
        ).fetchone()

    names = await _display_names(bot, room_id)
    disp = names.get(user_id, user_id)

    if not row or all(v == 0 for v in row):
        # No stats, maybe the user just joined.
        await bot.api.send_text_message(room_id, f"No stats recorded for {disp}.")
        return

    lines = [f"### Stats for {disp}", ""]
    lines += _format_stat_lines(dict(zip(VALID_STATS, row)))
    await bot.api.send_markdown_message(room_id, "\n".join(lines))


# ------------------------------------------------------------------
# Plugin metadata
# ------------------------------------------------------------------
__version__ = "1.0.1"
__author__ = "Funguy Roomstats"
__description__ = "Per‑user room statistics (Limnoria‑style), with multi‑word name support"
__help__ = """
### Room Statistics Commands

- `!roomstats` – Aggregate room stats + top 10 users
- `!rank <stat>` – Top 10 by a specific stat (msgs, chars, words, smileys, actions, joins, parts, kicks_given, kicked_received, topics_set)
- `!stats [name]` – Show stats for a user (supports multi‑word names or an MXID)

All commands work in the current room; display names are automatically resolved.
"""