""" Advanced Karma Plugin for Funguy Bot Provides comprehensive karma tracking with leaderboards, trends, and statistics. Features: * Give/take karma points from users using display names or Matrix IDs * Track karma history with timestamps * View karma leaderboards (top/bottom) * Rate limiting to prevent spam * Room-specific karma tracking Commands: !karma - Show this help !karma - Show karma for a user !karma++ - Give +1 karma !karma-- - Give -1 karma !karma top [n] - Show top karma entries !karma bottom [n] - Show bottom karma entries !karma rank - Show rank of user !karma stats - Show overall statistics !karma history - Show recent karma history Shortcuts: !++ - Same as !karma++ !-- - Same as !karma-- ++ - Give +1 karma (inline) -- - Give -1 karma (inline) """ import sqlite3 import logging import simplematrixbotlib as botlib from datetime import datetime, timedelta import re import asyncio import traceback # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- # Prevent spam: minimum seconds between karma changes to same target by same user COOLDOWN_SECONDS = 5 # Database file DB_FILE = "karma.db" # Cache for display name to user ID mappings (per room) # Structure: {room_id: {display_name: user_id}} display_name_cache = {} # Last time we refreshed the cache (per room) cache_timestamp = {} # --------------------------------------------------------------------------- # Helper: pluralize "point" vs "points" # --------------------------------------------------------------------------- def pluralize_points(amount): """Return 'point' if amount is 1 or -1, else 'points'.""" return "point" if abs(amount) == 1 else "points" # --------------------------------------------------------------------------- # Database Setup with Room Support # --------------------------------------------------------------------------- def init_db(): """Initialize the database tables with room support.""" logging.info("Initializing karma database...") conn = sqlite3.connect(DB_FILE) c = conn.cursor() # Create tables (store user_id, not display name) c.execute('''CREATE TABLE IF NOT EXISTS karma ( room_id TEXT, user_id TEXT, points INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (room_id, user_id) )''') c.execute('''CREATE TABLE IF NOT EXISTS karma_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, room_id TEXT, user_id TEXT, change INTEGER, new_points INTEGER, voter TEXT, voted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )''') c.execute('''CREATE TABLE IF NOT EXISTS karma_cooldown ( room_id TEXT, user_id TEXT, voter TEXT, last_voted TIMESTAMP, PRIMARY KEY (room_id, user_id, voter) )''') conn.commit() conn.close() logging.info("Karma database initialized successfully") def get_connection(): """Get database connection with row factory for dict access.""" conn = sqlite3.connect(DB_FILE) conn.row_factory = sqlite3.Row return conn # --------------------------------------------------------------------------- # Display Name Resolution # --------------------------------------------------------------------------- async def refresh_display_name_cache(bot, room_id): """Refresh the cache of display names to user IDs for a room.""" global display_name_cache, cache_timestamp # Check if we need to refresh (cache older than 5 minutes) now = datetime.now().timestamp() if room_id in cache_timestamp and (now - cache_timestamp[room_id]) < 300: return logging.info(f"Refreshing display name cache for room {room_id}") try: # Try to get room members from the bot's state if hasattr(bot, 'async_client') and bot.async_client: # Get the room state room = bot.async_client.rooms.get(room_id) if room and hasattr(room, 'users'): # Build mapping of display names to user IDs name_map = {} for user_id, user_info in room.users.items(): # Get display name - try different attributes display_name = None if hasattr(user_info, 'display_name') and user_info.display_name: display_name = user_info.display_name elif hasattr(user_info, 'name') and user_info.name: display_name = user_info.name if display_name: name_map[display_name.lower()] = user_id # Also store without emojis for easier matching clean_name = re.sub(r'[^\w\s]', '', display_name).strip().lower() if clean_name and clean_name != display_name.lower(): name_map[clean_name] = user_id display_name_cache[room_id] = name_map cache_timestamp[room_id] = now logging.info(f"Cached {len(name_map)} display names for room {room_id}") return except Exception as e: logging.warning(f"Could not refresh display name cache: {e}") # If we couldn't get members, initialize empty cache display_name_cache[room_id] = {} cache_timestamp[room_id] = now def resolve_display_name(room_id, display_name, bot=None): """Resolve a display name to a Matrix user ID.""" global display_name_cache # If it's already a valid Matrix ID, return it if display_name.startswith('@') and ':' in display_name: return display_name # Check the cache if room_id in display_name_cache: name_map = display_name_cache[room_id] # Try exact match (case-insensitive) key = display_name.lower() if key in name_map: return name_map[key] # Try without emojis/special characters clean_key = re.sub(r'[^\w\s]', '', display_name).strip().lower() if clean_key and clean_key in name_map: return name_map[clean_key] # Try partial match (if display name is contained in a cached name) for cached_name, user_id in name_map.items(): if key in cached_name or cached_name in key: return user_id return None def get_display_name_from_user_id(bot, room_id, user_id): """Get the display name for a user ID.""" try: if hasattr(bot, 'async_client') and bot.async_client: room = bot.async_client.rooms.get(room_id) if room and hasattr(room, 'users') and user_id in room.users: user_info = room.users[user_id] if hasattr(user_info, 'display_name') and user_info.display_name: return user_info.display_name except: pass # Fallback: extract local part from user ID if ':' in user_id: return user_id.split(':')[0].lstrip('@') return user_id.lstrip('@') # --------------------------------------------------------------------------- # Helper Functions # --------------------------------------------------------------------------- def is_on_cooldown(room_id, user_id, voter): """Check if voter is on cooldown for this user in this room.""" conn = get_connection() c = conn.cursor() c.execute('''SELECT last_voted FROM karma_cooldown WHERE room_id = ? AND user_id = ? AND voter = ?''', (room_id, user_id, voter)) row = c.fetchone() conn.close() if row: try: last_voted = datetime.fromisoformat(row['last_voted']) if datetime.now() - last_voted < timedelta(seconds=COOLDOWN_SECONDS): return True except: pass return False def get_cooldown_remaining(room_id, user_id, voter): """Get remaining cooldown seconds for a voter.""" conn = get_connection() c = conn.cursor() c.execute('''SELECT last_voted FROM karma_cooldown WHERE room_id = ? AND user_id = ? AND voter = ?''', (room_id, user_id, voter)) row = c.fetchone() conn.close() if row: try: last_voted = datetime.fromisoformat(row['last_voted']) elapsed = (datetime.now() - last_voted).total_seconds() remaining = COOLDOWN_SECONDS - elapsed if remaining > 0: return int(remaining) except: pass return 0 def update_cooldown(room_id, user_id, voter): """Update the cooldown timestamp for this voter.""" conn = get_connection() c = conn.cursor() c.execute('''INSERT OR REPLACE INTO karma_cooldown (room_id, user_id, voter, last_voted) VALUES (?, ?, ?, ?)''', (room_id, user_id, voter, datetime.now().isoformat())) conn.commit() conn.close() def update_karma(room_id, user_id, change, voter): """Update karma points for a user in a specific room.""" if change == 0: return None logging.debug(f"Updating karma: room={room_id}, user={user_id}, change={change}, voter={voter}") conn = get_connection() c = conn.cursor() # Insert or ignore the user c.execute('''INSERT OR IGNORE INTO karma (room_id, user_id, points) VALUES (?, ?, 0)''', (room_id, user_id)) # Update points c.execute('''UPDATE karma SET points = points + ?, updated_at = CURRENT_TIMESTAMP WHERE room_id = ? AND user_id = ?''', (change, room_id, user_id)) # Get new points c.execute('''SELECT points FROM karma WHERE room_id = ? AND user_id = ?''', (room_id, user_id)) row = c.fetchone() new_points = row['points'] if row else 0 # Record history c.execute('''INSERT INTO karma_history (room_id, user_id, change, new_points, voter) VALUES (?, ?, ?, ?, ?)''', (room_id, user_id, change, new_points, voter)) conn.commit() conn.close() logging.debug(f"Karma updated: {user_id} now has {new_points} points in room {room_id}") return new_points def get_karma(room_id, user_id): """Get karma points for a user in a specific room.""" conn = get_connection() c = conn.cursor() c.execute('''SELECT points FROM karma WHERE room_id = ? AND user_id = ?''', (room_id, user_id)) row = c.fetchone() conn.close() return row['points'] if row else 0 def get_leaderboard(room_id, bot, limit=10, reverse=False): """Get top or bottom karma leaderboard for a room.""" conn = get_connection() c = conn.cursor() order = "DESC" if not reverse else "ASC" c.execute(f'''SELECT user_id, points FROM karma WHERE room_id = ? AND points != 0 ORDER BY points {order} LIMIT ?''', (room_id, limit)) rows = c.fetchall() conn.close() # Convert user IDs to display names leaderboard = [] for row in rows: display_name = get_display_name_from_user_id(bot, room_id, row['user_id']) leaderboard.append({ 'display_name': display_name, 'user_id': row['user_id'], 'points': row['points'] }) return leaderboard def get_rank(room_id, user_id): """Get rank of a user in karma leaderboard for a room.""" conn = get_connection() c = conn.cursor() # Get all points ordered descending c.execute('''SELECT user_id, points FROM karma WHERE room_id = ? AND points != 0 ORDER BY points DESC''', (room_id,)) rows = c.fetchall() conn.close() for i, row in enumerate(rows, 1): if row['user_id'] == user_id: return i, len(rows) return None, len(rows) def get_recent_history(room_id, user_id, bot, limit=5): """Get recent karma history for a user in a room.""" conn = get_connection() c = conn.cursor() c.execute('''SELECT change, voter, voted_at FROM karma_history WHERE room_id = ? AND user_id = ? ORDER BY voted_at DESC LIMIT ?''', (room_id, user_id, limit)) rows = c.fetchall() conn.close() history = [] for row in rows: voter_name = get_display_name_from_user_id(bot, room_id, row['voter']) history.append({ 'change': row['change'], 'voter': voter_name, 'voter_id': row['voter'], 'voted_at': row['voted_at'] }) return history def get_stats(room_id=None): """Get overall karma statistics.""" conn = get_connection() c = conn.cursor() if room_id: c.execute('''SELECT COUNT(*) as total_users FROM karma WHERE room_id = ?''', (room_id,)) total_users = c.fetchone()['total_users'] c.execute('''SELECT SUM(points) as total_points FROM karma WHERE room_id = ?''', (room_id,)) total_points = c.fetchone()['total_points'] or 0 c.execute('''SELECT AVG(points) as avg_points FROM karma WHERE room_id = ?''', (room_id,)) avg_points = c.fetchone()['avg_points'] or 0 c.execute('''SELECT MAX(points) as max_points FROM karma WHERE room_id = ?''', (room_id,)) max_points = c.fetchone()['max_points'] or 0 c.execute('''SELECT MIN(points) as min_points FROM karma WHERE room_id = ?''', (room_id,)) min_points = c.fetchone()['min_points'] or 0 c.execute('''SELECT COUNT(*) as total_votes FROM karma_history WHERE room_id = ?''', (room_id,)) total_votes = c.fetchone()['total_votes'] else: c.execute('''SELECT COUNT(*) as total_users FROM karma''') total_users = c.fetchone()['total_users'] c.execute('''SELECT SUM(points) as total_points FROM karma''') total_points = c.fetchone()['total_points'] or 0 c.execute('''SELECT AVG(points) as avg_points FROM karma''') avg_points = c.fetchone()['avg_points'] or 0 c.execute('''SELECT MAX(points) as max_points FROM karma''') max_points = c.fetchone()['max_points'] or 0 c.execute('''SELECT MIN(points) as min_points FROM karma''') min_points = c.fetchone()['min_points'] or 0 c.execute('''SELECT COUNT(*) as total_votes FROM karma_history''') total_votes = c.fetchone()['total_votes'] conn.close() return { 'total_users': total_users, 'total_points': total_points, 'avg_points': round(avg_points, 2), 'max_points': max_points, 'min_points': min_points, 'total_votes': total_votes } def format_karma_display(display_name, points): """Format karma display with visual indicators.""" if points > 0: indicator = "👍" if points > 10 else "➕" return f"💗 **{display_name}** has {points} karma {indicator}" elif points < 0: indicator = "👎" if points < -10 else "➖" return f"💔 **{display_name}** has {points} karma {indicator}" else: return f"⚖️ **{display_name}** has neutral karma (0)" # --------------------------------------------------------------------------- # Command Handlers # --------------------------------------------------------------------------- async def handle_command(room, message, bot, prefix, config): """Handle karma commands.""" match = botlib.MessageMatch(room, message, bot, prefix) room_id = room.room_id # Refresh display name cache await refresh_display_name_cache(bot, room_id) # Debug logging message_body = message.body if hasattr(message, 'body') else str(message) logging.info(f"Karma plugin received message: '{message_body}' from {message.sender}") # Get the full command (including what might be karma++ etc.) full_cmd = match.command() if hasattr(match, 'command') else '' logging.debug(f"Full command: '{full_cmd}'") # Check for !karma++ or !karma-- as a single command if match.is_not_from_this_bot() and match.prefix(): if full_cmd == 'karma++': # !karma++ username args = match.args() if not args: await bot.api.send_markdown_message(room.room_id, "Usage: !karma++ ") return display_name = ' '.join(args) await process_karma_vote(room, display_name, '++', message.sender, bot) return elif full_cmd == 'karma--': # !karma-- username args = match.args() if not args: await bot.api.send_markdown_message(room.room_id, "Usage: !karma-- ") return display_name = ' '.join(args) await process_karma_vote(room, display_name, '--', message.sender, bot) return elif full_cmd == 'karma': # Regular !karma command args = match.args() if args and args[0] in ['++', '--']: # !karma ++ username (space between) action = args[0] if len(args) < 2: await bot.api.send_markdown_message(room.room_id, f"Usage: !karma {action} ") return display_name = ' '.join(args[1:]) await process_karma_vote(room, display_name, action, message.sender, bot) return else: # !karma with subcommands or username await handle_karma_command(room, message, bot, config) return # Handle !++ and !-- shortcuts elif full_cmd in ['++', '--']: args = match.args() if not args: await bot.api.send_markdown_message(room.room_id, f"Usage: !{full_cmd} \nExample: !{full_cmd} Nexilva") return display_name = ' '.join(args) await process_karma_vote(room, display_name, full_cmd, message.sender, bot) return # Handle inline karma (message body contains ++ or --) if match.is_not_from_this_bot() and not match.prefix(): await handle_inline_karma(room, message, bot) async def process_karma_vote(room, display_name, action, voter, bot): """Process a karma vote using display name.""" room_id = room.room_id voter_str = str(voter) change = 1 if action == '++' else -1 logging.info(f"Karma vote: {voter_str} -> '{display_name}' ({action}, change={change:+d}) in room {room_id}") # Resolve display name to user ID user_id = resolve_display_name(room_id, display_name, bot) if not user_id: await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'. Please use their display name or Matrix ID (e.g., @username:server)") return # Prevent self-modification if user_id == voter_str: await bot.api.send_markdown_message(room.room_id, "❌ You cannot modify your own karma!") return # Check cooldown if is_on_cooldown(room_id, user_id, voter_str): remaining = get_cooldown_remaining(room_id, user_id, voter_str) await bot.api.send_markdown_message(room.room_id, f"⏳ You're doing that too fast! Wait {remaining} seconds.") return # Update karma try: new_points = update_karma(room_id, user_id, change, voter_str) update_cooldown(room_id, user_id, voter_str) # Get display name for response display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id) response = format_karma_display(display_name_resolved, new_points) await bot.api.send_markdown_message(room.room_id, response) logging.info(f"Karma updated successfully: {user_id} now has {new_points} points") except Exception as e: logging.error(f"Error updating karma: {e}") logging.error(traceback.format_exc()) await bot.api.send_markdown_message(room.room_id, f"❌ Error updating karma: {str(e)}") async def handle_karma_command(room, message, bot, config): """Handle the !karma command and its subcommands.""" match = botlib.MessageMatch(room, message, bot, config.prefix) args = match.args() room_id = room.room_id if not args: # Show help for !karma command in collapsible details tag with proper HTML lists help_text = f"""
💗 Karma Plugin Help (click to expand) Commands:
  • !karma - Show this help
  • !karma <user> - Show karma for a user (use display name or @user:server)
  • !karma++ <user> - Give +1 karma to a user
  • !karma-- <user> - Give -1 karma to a user
  • !karma top [n] - Show top karma leaders
  • !karma bottom [n] - Show bottom karma entries
  • !karma rank <user> - Show rank of a user
  • !karma stats - Show overall statistics
  • !karma history <user> - Show recent karma history
Shortcuts:
  • !++ <user> - Same as !karma++ <user>
  • !-- <user> - Same as !karma-- <user>
  • <user>++ - Give +1 karma (inline)
  • <user>-- - Give -1 karma (inline)
Examples:
  • !karma Nexilva - Check Nexilva's karma
  • !karma++ @hb:matrix.org - Give HB +1 karma
  • !karma top 5 - Show top 5 leaders
  • Nexilva++ - Give Nexilva +1 karma (inline)
Notes:
  • You cannot modify your own karma
  • There is a {COOLDOWN_SECONDS} second cooldown between votes
  • Karma is tracked separately per room
  • Display names with emojis are supported
""" await bot.api.send_markdown_message(room.room_id, help_text) return subcommand = args[0].lower() # !karma top [n] if subcommand == "top": limit = 10 if len(args) > 1 and args[1].isdigit(): limit = min(int(args[1]), 25) leaderboard = get_leaderboard(room_id, bot, limit, reverse=False) if leaderboard: response = f"🏆 **Top {len(leaderboard)} Karma Leaders**\n\n" for i, entry in enumerate(leaderboard, 1): medal = "🥇" if i == 1 else "🥈" if i == 2 else "🥉" if i == 3 else "📌" pts = entry['points'] response += f"{medal} **{i}.** {entry['display_name']}: {pts} {pluralize_points(pts)}\n" await bot.api.send_markdown_message(room.room_id, response) else: await bot.api.send_markdown_message(room.room_id, "No karma entries found in this room.") return # !karma bottom [n] if subcommand == "bottom": limit = 10 if len(args) > 1 and args[1].isdigit(): limit = min(int(args[1]), 25) leaderboard = get_leaderboard(room_id, bot, limit, reverse=True) if leaderboard: response = f"📉 **Bottom {len(leaderboard)} Karma (Needs Love)**\n\n" for i, entry in enumerate(leaderboard, 1): pts = entry['points'] response += f"⚠️ **{i}.** {entry['display_name']}: {pts} {pluralize_points(pts)}\n" await bot.api.send_markdown_message(room.room_id, response) else: await bot.api.send_markdown_message(room.room_id, "No karma entries found in this room.") return # !karma rank if subcommand == "rank" and len(args) >= 2: display_name = ' '.join(args[1:]) user_id = resolve_display_name(room_id, display_name, bot) if not user_id: await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'") return rank, total = get_rank(room_id, user_id) if rank: points = get_karma(room_id, user_id) percentile = round((1 - rank/total) * 100, 1) if total > 0 else 0 display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id) response = f"📊 **{display_name_resolved}** is ranked #{rank} out of {total} (top {percentile}%)\n💗 Karma: {points} {pluralize_points(points)}" await bot.api.send_markdown_message(room.room_id, response) else: display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id) await bot.api.send_markdown_message(room.room_id, f"❌ {display_name_resolved} has no karma yet in this room.") return # !karma history if subcommand == "history" and len(args) >= 2: display_name = ' '.join(args[1:]) user_id = resolve_display_name(room_id, display_name, bot) if not user_id: await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'") return history = get_recent_history(room_id, user_id, bot, 5) if history: display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id) response = f"📜 **Recent Karma History for {display_name_resolved}**\n\n" for h in history: arrow = "⬆️" if h['change'] > 0 else "⬇️" try: voted_at = datetime.fromisoformat(h['voted_at']) time_str = voted_at.strftime("%Y-%m-%d %H:%M") except: time_str = "recently" response += f"{arrow} {h['change']:+d} by **{h['voter']}** at {time_str}\n" await bot.api.send_markdown_message(room.room_id, response) else: display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id) await bot.api.send_markdown_message(room.room_id, f"No karma history for {display_name_resolved} in this room.") return # !karma stats if subcommand == "stats": stats = get_stats(room_id) response = f"📊 **Karma System Statistics for this Room**\n\n" response += f"📝 Total users tracked: {stats['total_users']}\n" response += f"⭐ Total karma points: {stats['total_points']}\n" response += f"📈 Average karma: {stats['avg_points']}\n" response += f"🏆 Highest karma: {stats['max_points']}\n" response += f"📉 Lowest karma: {stats['min_points']}\n" response += f"🗳️ Total votes cast: {stats['total_votes']}\n" await bot.api.send_markdown_message(room.room_id, response) return # !karma (show karma for specific user) display_name = ' '.join(args) user_id = resolve_display_name(room_id, display_name, bot) if not user_id: await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'") return points = get_karma(room_id, user_id) display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id) response = format_karma_display(display_name_resolved, points) await bot.api.send_markdown_message(room.room_id, response) return async def handle_inline_karma(room, message, bot): """Handle inline karma expressions like 'user++' or 'user--'.""" body = message.body if hasattr(message, 'body') else str(message) sender = str(message.sender) room_id = room.room_id # Refresh display name cache await refresh_display_name_cache(bot, room_id) # Pattern to match text followed by ++ or -- at end of word/string pattern = r'(.+?)(\+\+|--)(?:\s|$)' matches = re.findall(pattern, body) if not matches: return logging.info(f"Found inline karma matches: {matches}") responses = [] for display_name, operator in matches: display_name = display_name.strip() change = 1 if operator == '++' else -1 # Resolve display name to user ID user_id = resolve_display_name(room_id, display_name, bot) if not user_id: logging.debug(f"Could not resolve display name: '{display_name}'") continue # Skip self-modification if user_id == sender: logging.debug(f"Skipping self-modification: {sender} -> {display_name}") continue # Check cooldown if is_on_cooldown(room_id, user_id, sender): logging.debug(f"Cooldown active for {sender} -> {user_id}") continue # Update karma try: new_points = update_karma(room_id, user_id, change, sender) update_cooldown(room_id, user_id, sender) # Format response display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id) arrow = "⬆️" if change > 0 else "⬇️" responses.append(f"{arrow} **{display_name_resolved}** → {new_points}") except Exception as e: logging.error(f"Error updating inline karma: {e}") await asyncio.sleep(0.3) if responses: response_text = " | ".join(responses[:3]) if len(responses) > 3: response_text += f" (and {len(responses)-3} more)" await bot.api.send_markdown_message(room.room_id, f"💗 {response_text}") # --------------------------------------------------------------------------- # Plugin Setup # --------------------------------------------------------------------------- def setup(bot): """Initialize the karma plugin.""" logging.info("=" * 50) logging.info("LOADING KARMA PLUGIN") logging.info("=" * 50) init_db() logging.info("Advanced Karma plugin loaded successfully") logging.info("Supports display names (e.g., 'Nexilva' or '🍄 HB🍄') and Matrix IDs") logging.info("Commands: !karma, !karma++, !karma--, !++, !--, and inline ++/--") logging.info("=" * 50) # --------------------------------------------------------------------------- # Plugin Metadata # --------------------------------------------------------------------------- __version__ = "1.0.0" __author__ = "Funguy Bot" __description__ = "Room karma tracking system" __help__ = """
!karma – Karma system
  • !karma <user> – Show points
  • !karma++ <user> / !-- <user> – Modify karma
  • !karma top [n] / !karma bottom [n] – Leaderboard
  • !karma rank <user> – Position
  • !karma stats – Room statistics
  • !karma history <user> – Recent votes

Shortcuts: !++ user, !-- user, and inline user++ / user--.

"""