#!/usr/bin/env python3
"""
plugins/roomstats.py — per‑user room statistics (Limnoria‑style).

Commands: !roomstats, !rank, !stats
Output is a clean code block with emojis and aligned columns.
"""

import logging
import re
import sqlite3
import time
from contextlib import closing

import nio
import simplematrixbotlib as botlib

from plugins.common import collapsible_summary, code_block

logger = logging.getLogger("roomstats")
DB_PATH = "roomstats.db"

# Emoji regex (unchanged)
# NOTE(review): the \U000024C2-\U0001F251 range is very broad (it spans CJK,
# Hangul, etc.), so non-emoji text in those scripts is counted as "smileys".
# Left as-is to keep already-recorded statistics comparable.
EMOJI_RE = re.compile(
    "["
    "\U0001F600-\U0001F64F"
    "\U0001F300-\U0001F5FF"
    "\U0001F680-\U0001F6FF"
    "\U0001F1E0-\U0001F1FF"
    "\U00002702-\U000027B0"
    "\U000024C2-\U0001F251"
    "]+",
    re.UNICODE,
)

# Stat column name -> human-readable label.  The keys double as the whitelist
# of column names that may be interpolated into SQL.
VALID_STATS = {
    "msgs": "Messages",
    "chars": "Characters",
    "words": "Words",
    "smileys": "Smileys",
    "actions": "Actions",
    "joins": "Joins",
    "parts": "Parts",
    "kicks_given": "Kicks given",
    "kicked_received": "Times kicked",
    "topics_set": "Topics set",
}

# Stat column name -> emoji shown next to the label in rendered tables.
_STAT_ICONS = {
    "msgs": "📩",
    "chars": "🔤",
    "words": "📝",
    "smileys": "😀",
    "actions": "🎭",
    "joins": "🚪",
    "parts": "👋",
    "kicks_given": "👢",
    "kicked_received": "🥾",
    "topics_set": "📌",
}


def count_smileys(text):
    """Return the number of emoji runs found in *text*.

    EMOJI_RE matches one or more consecutive emoji, so a run of adjacent
    emoji counts once.
    """
    return len(EMOJI_RE.findall(text))


def _connect():
    """Open the stats database wrapped so the connection is always closed.

    ``sqlite3.Connection`` used directly as a context manager only commits or
    rolls back; it does not close.  Use ``with _connect() as conn, conn:`` for
    writes (close + transaction) and ``with _connect() as conn:`` for reads.
    """
    return closing(sqlite3.connect(DB_PATH))


def init_db():
    """Create the ``user_room_stats`` table if it does not exist yet."""
    with _connect() as conn, conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS user_room_stats (
                room_id TEXT,
                user_id TEXT,
                msgs INTEGER DEFAULT 0,
                chars INTEGER DEFAULT 0,
                words INTEGER DEFAULT 0,
                smileys INTEGER DEFAULT 0,
                actions INTEGER DEFAULT 0,
                joins INTEGER DEFAULT 0,
                parts INTEGER DEFAULT 0,
                kicks_given INTEGER DEFAULT 0,
                kicked_received INTEGER DEFAULT 0,
                topics_set INTEGER DEFAULT 0,
                last_updated INTEGER,
                PRIMARY KEY (room_id, user_id)
            )
        """)


async def _joined_members(bot, room_id):
    """Return the joined members of *room_id*, or ``[]`` if the lookup failed."""
    resp = await bot.async_client.joined_members(room_id)
    # An error response is not guaranteed to carry a ``members`` attribute,
    # so read it defensively instead of assuming a successful response.
    members = getattr(resp, "members", None)
    if members is None:
        logger.warning("joined_members failed for %s: %r", room_id, resp)
        return []
    return members


async def _display_names(bot, room_id):
    """Return a ``{user_id: display name}`` map for the room's joined members.

    Members without a display name map to their own user id.
    """
    names = {}
    for member in await _joined_members(bot, room_id):
        # setdefault keeps the first entry for a user id.
        names.setdefault(member.user_id, member.display_name or member.user_id)
    return names


async def resolve_user_from_tokens(bot, room_id, tokens):
    """Resolve command tokens to a room member by display name.

    The longest leading run of *tokens* that equals (case-insensitively) a
    unique display name wins; display names shared by several members are
    treated as ambiguous and never match.

    Returns:
        A ``(user_id, display_name)`` tuple.

    Raises:
        ValueError: if the member list cannot be fetched or nothing matches.
    """
    resp = await bot.async_client.joined_members(room_id)
    # getattr: a failed request yields an error response that may lack
    # ``members``; that case must surface as the ValueError callers handle.
    members = getattr(resp, "members", None)
    if members is None:
        raise ValueError("Could not fetch member list.")

    by_name = {}
    for member in members:
        display = (member.display_name or "").strip()
        if not display:
            continue
        key = display.lower()
        # A second member with the same name makes the name ambiguous.
        by_name[key] = None if key in by_name else (member.user_id, display)

    for end in range(len(tokens), 0, -1):
        candidate = " ".join(tokens[:end]).strip().lower()
        entry = by_name.get(candidate)
        if entry is not None:
            return entry
    raise ValueError(f"No member found for '{' '.join(tokens)}'.")


def _membership_increments(membership, prev_membership, sender, state_key):
    """Translate one membership transition into ``(user_id, column)`` bumps.

    Args:
        membership: new membership value from the event content.
        prev_membership: membership before the event, or ``None`` if unknown.
        sender: user who sent the event.
        state_key: user the event is about.
    """
    if membership == "join":
        # join -> join is a profile change (display name / avatar), not a join.
        if prev_membership == "join":
            return []
        return [(state_key, "joins")]
    if membership == "leave":
        # Declined or revoked invites and unbans also arrive as "leave"; only a
        # user who was joined (or whose previous state is unknown) actually
        # parted or was kicked.
        if prev_membership not in (None, "join"):
            return []
        if sender != state_key:
            return [(sender, "kicks_given"), (state_key, "kicked_received")]
        return [(state_key, "parts")]
    return []


def _incr(room_id, user_id, column):
    """Add one to *column* for (*room_id*, *user_id*), creating the row if needed.

    Raises:
        ValueError: if *column* is not a known stat column.
    """
    # The column name is interpolated into SQL, so it must be whitelisted.
    if column not in VALID_STATS:
        raise ValueError(f"Unknown stat column: {column!r}")
    with _connect() as conn, conn:
        conn.execute(
            "INSERT OR IGNORE INTO user_room_stats (room_id, user_id) VALUES (?, ?)",
            (room_id, user_id),
        )
        conn.execute(
            f"UPDATE user_room_stats SET {column} = {column} + 1, last_updated = ? "
            "WHERE room_id = ? AND user_id = ?",
            (int(time.time()), room_id, user_id),
        )


def setup(bot):
    """Initialise the database and register membership and topic listeners."""
    init_db()

    @bot.listener.on_custom_event(nio.RoomMemberEvent)
    async def member_event(room, event):
        # The bot's own membership changes are not tracked.
        if event.state_key == bot.async_client.user_id:
            return
        increments = _membership_increments(
            event.content.get("membership"),
            getattr(event, "prev_membership", None),
            event.sender,
            event.state_key,
        )
        for user_id, column in increments:
            _incr(room.room_id, user_id, column)

    @bot.listener.on_custom_event(nio.RoomTopicEvent)
    async def topic_event(room, event):
        _incr(room.room_id, event.sender, "topics_set")


def _record_message(room_id, sender, message):
    """Fold one message from *sender* into their per-room counters."""
    body = message.body or ""
    is_action = getattr(message, "msgtype", None) == "m.emote"
    with _connect() as conn, conn:
        conn.execute(
            "INSERT OR IGNORE INTO user_room_stats (room_id, user_id) VALUES (?, ?)",
            (room_id, sender),
        )
        conn.execute(
            """UPDATE user_room_stats
               SET msgs=msgs+1, chars=chars+?, words=words+?, smileys=smileys+?,
                   actions=actions+?, last_updated=?
               WHERE room_id=? AND user_id=?""",
            (
                len(body),
                len(body.split()),
                count_smileys(body),
                1 if is_action else 0,
                int(time.time()),
                room_id,
                sender,
            ),
        )


async def handle_command(room, message, bot, prefix, config):
    """Record stats for every message, then dispatch roomstats/rank/stats."""
    room_id = room.room_id
    sender = message.sender

    # silently record stats
    if sender != bot.async_client.user_id:
        _record_message(room_id, sender, message)

    match = botlib.MessageMatch(room, message, bot, prefix)
    if not match.is_not_from_this_bot() or not match.prefix():
        return

    cmd = match.command()
    args = match.args()

    if cmd == "roomstats":
        await _handle_roomstats(bot, room_id)
    elif cmd == "rank":
        if not args:
            await bot.api.send_text_message(room_id, "Usage: !rank <stat>")
            return
        await _handle_rank(bot, room_id, args[0].lower())
    elif cmd == "stats":
        if args:
            try:
                target_mxid, _ = await resolve_user_from_tokens(bot, room_id, args)
            except ValueError as e:
                await bot.api.send_text_message(room_id, str(e))
                return
        else:
            target_mxid = sender
        await _handle_user_stats(bot, room_id, target_mxid)


def _stat_rows(values):
    """Build ``(icon, label, value)`` rows from a ``{column: value}`` mapping."""
    return [
        (_STAT_ICONS[col], label, values[col])
        for col, label in VALID_STATS.items()
    ]


async def _get_aggregate(room_id):
    """Return room-wide totals as ``{column: total}``, or ``None`` if all zero."""
    # Generate the SELECT list from VALID_STATS so the result order always
    # matches the keys it is zipped with below.
    sums = ", ".join(f"COALESCE(SUM({col}),0)" for col in VALID_STATS)
    with _connect() as conn:
        row = conn.execute(
            f"SELECT {sums} FROM user_room_stats WHERE room_id=?",
            (room_id,),
        ).fetchone()
    if not row or all(v == 0 for v in row):
        return None
    return dict(zip(VALID_STATS, row))


async def _handle_roomstats(bot, room_id):
    """Send the room totals and the top 10 users by message count."""
    agg = await _get_aggregate(room_id)
    if not agg:
        await bot.api.send_text_message(room_id, "No stats collected yet.")
        return

    with _connect() as conn:
        top = conn.execute(
            "SELECT user_id, msgs FROM user_room_stats WHERE room_id=? "
            "ORDER BY msgs DESC LIMIT 10",
            (room_id,),
        ).fetchall()

    names = await _display_names(bot, room_id)
    top_rows = [("📈", names.get(uid, uid), f"{cnt} msgs") for uid, cnt in top]

    sections = [
        {"title": "Room Statistics", "rows": _stat_rows(agg)},
        {"title": "Top 10 by messages", "rows": top_rows},
    ]
    block = code_block("📊 Room Statistics", sections)
    output = collapsible_summary("📊 Room Statistics", block)
    await bot.api.send_markdown_message(room_id, output)


async def _handle_rank(bot, room_id, col):
    """Send the top 10 users in the room for stat column *col*."""
    if col not in VALID_STATS:
        await bot.api.send_text_message(
            room_id,
            f"Unknown stat: {col}. Allowed: {', '.join(VALID_STATS.keys())}",
        )
        return
    label = VALID_STATS[col]

    # col is whitelisted above, so interpolating it into the query is safe.
    with _connect() as conn:
        rows = conn.execute(
            f"SELECT user_id, {col} FROM user_room_stats WHERE room_id=? "
            f"AND {col}>0 ORDER BY {col} DESC LIMIT 10",
            (room_id,),
        ).fetchall()

    if not rows:
        await bot.api.send_text_message(room_id, f"No users with {label} > 0.")
        return

    names = await _display_names(bot, room_id)
    rank_rows = [("🏅", names.get(uid, uid), str(val)) for uid, val in rows]

    sections = [{"title": f"Ranking by {label}", "rows": rank_rows}]
    block = code_block(f"🏆 Top {label}", sections)
    output = collapsible_summary(f"🏆 {label} Ranking", block)
    await bot.api.send_markdown_message(room_id, output)


async def _handle_user_stats(bot, room_id, user_id):
    """Send the full stat table for *user_id* in *room_id*."""
    columns = ", ".join(VALID_STATS)
    with _connect() as conn:
        row = conn.execute(
            f"SELECT {columns} FROM user_room_stats WHERE room_id=? AND user_id=?",
            (room_id, user_id),
        ).fetchone()

    # Both outcomes need the display name, so resolve it once.
    names = await _display_names(bot, room_id)
    disp = names.get(user_id, user_id)

    if not row or all(v == 0 for v in row):
        await bot.api.send_text_message(room_id, f"No stats recorded for {disp}.")
        return

    sections = [
        {"title": f"Stats for {disp}", "rows": _stat_rows(dict(zip(VALID_STATS, row)))}
    ]
    block = code_block(f"📊 Stats for {disp}", sections)
    output = collapsible_summary(f"📊 Stats: {disp}", block)
    await bot.api.send_markdown_message(room_id, output)


# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
__version__ = "1.1.0"
__author__ = "Funguy Roomstats"
__description__ = "Per‑user room statistics"
__help__ = """
Room Statistics Commands
"""