837 lines
30 KiB
Python
837 lines
30 KiB
Python
"""
|
||
Advanced Karma Plugin for Funguy Bot
|
||
|
||
Provides comprehensive karma tracking with leaderboards, trends, and statistics.
|
||
|
||
Features:
|
||
* Give/take karma points from users using display names or Matrix IDs
|
||
* Track karma history with timestamps
|
||
* View karma leaderboards (top/bottom)
|
||
* Rate limiting to prevent spam
|
||
* Room-specific karma tracking
|
||
|
||
Commands:
|
||
!karma - Show this help
|
||
!karma <user> - Show karma for a user
|
||
!karma++ <user> - Give +1 karma
|
||
!karma-- <user> - Give -1 karma
|
||
!karma top [n] - Show top karma entries
|
||
!karma bottom [n] - Show bottom karma entries
|
||
!karma rank <user> - Show rank of user
|
||
!karma stats - Show overall statistics
|
||
!karma history <user>- Show recent karma history
|
||
|
||
Shortcuts:
|
||
!++ <user> - Same as !karma++ <user>
|
||
!-- <user> - Same as !karma-- <user>
|
||
<user>++ - Give +1 karma (inline)
|
||
<user>-- - Give -1 karma (inline)
|
||
"""
|
||
|
||
import sqlite3
|
||
import logging
|
||
import simplematrixbotlib as botlib
|
||
from datetime import datetime, timedelta
|
||
import re
|
||
import asyncio
|
||
import traceback
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Configuration
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Prevent spam: minimum seconds between karma changes to same target by same user
|
||
COOLDOWN_SECONDS = 5
|
||
|
||
# Database file
|
||
DB_FILE = "karma.db"
|
||
|
||
# Cache for display name to user ID mappings (per room)
|
||
# Structure: {room_id: {display_name: user_id}}
|
||
display_name_cache = {}
|
||
|
||
# Last time we refreshed the cache (per room)
|
||
cache_timestamp = {}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helper: pluralize "point" vs "points"
|
||
# ---------------------------------------------------------------------------
|
||
def pluralize_points(amount):
|
||
"""Return 'point' if amount is 1 or -1, else 'points'."""
|
||
return "point" if abs(amount) == 1 else "points"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Database Setup with Room Support
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def init_db():
|
||
"""Initialize the database tables with room support."""
|
||
logging.info("Initializing karma database...")
|
||
conn = sqlite3.connect(DB_FILE)
|
||
c = conn.cursor()
|
||
|
||
# Create tables (store user_id, not display name)
|
||
c.execute('''CREATE TABLE IF NOT EXISTS karma (
|
||
room_id TEXT,
|
||
user_id TEXT,
|
||
points INTEGER DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
PRIMARY KEY (room_id, user_id)
|
||
)''')
|
||
|
||
c.execute('''CREATE TABLE IF NOT EXISTS karma_history (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
room_id TEXT,
|
||
user_id TEXT,
|
||
change INTEGER,
|
||
new_points INTEGER,
|
||
voter TEXT,
|
||
voted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)''')
|
||
|
||
c.execute('''CREATE TABLE IF NOT EXISTS karma_cooldown (
|
||
room_id TEXT,
|
||
user_id TEXT,
|
||
voter TEXT,
|
||
last_voted TIMESTAMP,
|
||
PRIMARY KEY (room_id, user_id, voter)
|
||
)''')
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
logging.info("Karma database initialized successfully")
|
||
|
||
|
||
def get_connection():
|
||
"""Get database connection with row factory for dict access."""
|
||
conn = sqlite3.connect(DB_FILE)
|
||
conn.row_factory = sqlite3.Row
|
||
return conn
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Display Name Resolution
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def refresh_display_name_cache(bot, room_id):
|
||
"""Refresh the cache of display names to user IDs for a room."""
|
||
global display_name_cache, cache_timestamp
|
||
|
||
# Check if we need to refresh (cache older than 5 minutes)
|
||
now = datetime.now().timestamp()
|
||
if room_id in cache_timestamp and (now - cache_timestamp[room_id]) < 300:
|
||
return
|
||
|
||
logging.info(f"Refreshing display name cache for room {room_id}")
|
||
|
||
try:
|
||
# Try to get room members from the bot's state
|
||
if hasattr(bot, 'async_client') and bot.async_client:
|
||
# Get the room state
|
||
room = bot.async_client.rooms.get(room_id)
|
||
if room and hasattr(room, 'users'):
|
||
# Build mapping of display names to user IDs
|
||
name_map = {}
|
||
for user_id, user_info in room.users.items():
|
||
# Get display name - try different attributes
|
||
display_name = None
|
||
if hasattr(user_info, 'display_name') and user_info.display_name:
|
||
display_name = user_info.display_name
|
||
elif hasattr(user_info, 'name') and user_info.name:
|
||
display_name = user_info.name
|
||
|
||
if display_name:
|
||
name_map[display_name.lower()] = user_id
|
||
# Also store without emojis for easier matching
|
||
clean_name = re.sub(r'[^\w\s]', '', display_name).strip().lower()
|
||
if clean_name and clean_name != display_name.lower():
|
||
name_map[clean_name] = user_id
|
||
|
||
display_name_cache[room_id] = name_map
|
||
cache_timestamp[room_id] = now
|
||
logging.info(f"Cached {len(name_map)} display names for room {room_id}")
|
||
return
|
||
|
||
except Exception as e:
|
||
logging.warning(f"Could not refresh display name cache: {e}")
|
||
|
||
# If we couldn't get members, initialize empty cache
|
||
display_name_cache[room_id] = {}
|
||
cache_timestamp[room_id] = now
|
||
|
||
|
||
def resolve_display_name(room_id, display_name, bot=None):
|
||
"""Resolve a display name to a Matrix user ID."""
|
||
global display_name_cache
|
||
|
||
# If it's already a valid Matrix ID, return it
|
||
if display_name.startswith('@') and ':' in display_name:
|
||
return display_name
|
||
|
||
# Check the cache
|
||
if room_id in display_name_cache:
|
||
name_map = display_name_cache[room_id]
|
||
|
||
# Try exact match (case-insensitive)
|
||
key = display_name.lower()
|
||
if key in name_map:
|
||
return name_map[key]
|
||
|
||
# Try without emojis/special characters
|
||
clean_key = re.sub(r'[^\w\s]', '', display_name).strip().lower()
|
||
if clean_key and clean_key in name_map:
|
||
return name_map[clean_key]
|
||
|
||
# Try partial match (if display name is contained in a cached name)
|
||
for cached_name, user_id in name_map.items():
|
||
if key in cached_name or cached_name in key:
|
||
return user_id
|
||
|
||
return None
|
||
|
||
|
||
def get_display_name_from_user_id(bot, room_id, user_id):
|
||
"""Get the display name for a user ID."""
|
||
try:
|
||
if hasattr(bot, 'async_client') and bot.async_client:
|
||
room = bot.async_client.rooms.get(room_id)
|
||
if room and hasattr(room, 'users') and user_id in room.users:
|
||
user_info = room.users[user_id]
|
||
if hasattr(user_info, 'display_name') and user_info.display_name:
|
||
return user_info.display_name
|
||
except:
|
||
pass
|
||
|
||
# Fallback: extract local part from user ID
|
||
if ':' in user_id:
|
||
return user_id.split(':')[0].lstrip('@')
|
||
return user_id.lstrip('@')
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helper Functions
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def is_on_cooldown(room_id, user_id, voter):
|
||
"""Check if voter is on cooldown for this user in this room."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
c.execute('''SELECT last_voted FROM karma_cooldown
|
||
WHERE room_id = ? AND user_id = ? AND voter = ?''',
|
||
(room_id, user_id, voter))
|
||
row = c.fetchone()
|
||
conn.close()
|
||
|
||
if row:
|
||
try:
|
||
last_voted = datetime.fromisoformat(row['last_voted'])
|
||
if datetime.now() - last_voted < timedelta(seconds=COOLDOWN_SECONDS):
|
||
return True
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
|
||
def get_cooldown_remaining(room_id, user_id, voter):
|
||
"""Get remaining cooldown seconds for a voter."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
c.execute('''SELECT last_voted FROM karma_cooldown
|
||
WHERE room_id = ? AND user_id = ? AND voter = ?''',
|
||
(room_id, user_id, voter))
|
||
row = c.fetchone()
|
||
conn.close()
|
||
|
||
if row:
|
||
try:
|
||
last_voted = datetime.fromisoformat(row['last_voted'])
|
||
elapsed = (datetime.now() - last_voted).total_seconds()
|
||
remaining = COOLDOWN_SECONDS - elapsed
|
||
if remaining > 0:
|
||
return int(remaining)
|
||
except:
|
||
pass
|
||
return 0
|
||
|
||
|
||
def update_cooldown(room_id, user_id, voter):
|
||
"""Update the cooldown timestamp for this voter."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
c.execute('''INSERT OR REPLACE INTO karma_cooldown (room_id, user_id, voter, last_voted)
|
||
VALUES (?, ?, ?, ?)''', (room_id, user_id, voter, datetime.now().isoformat()))
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
|
||
def update_karma(room_id, user_id, change, voter):
|
||
"""Update karma points for a user in a specific room."""
|
||
if change == 0:
|
||
return None
|
||
|
||
logging.debug(f"Updating karma: room={room_id}, user={user_id}, change={change}, voter={voter}")
|
||
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
# Insert or ignore the user
|
||
c.execute('''INSERT OR IGNORE INTO karma (room_id, user_id, points) VALUES (?, ?, 0)''',
|
||
(room_id, user_id))
|
||
|
||
# Update points
|
||
c.execute('''UPDATE karma SET points = points + ?, updated_at = CURRENT_TIMESTAMP
|
||
WHERE room_id = ? AND user_id = ?''', (change, room_id, user_id))
|
||
|
||
# Get new points
|
||
c.execute('''SELECT points FROM karma WHERE room_id = ? AND user_id = ?''', (room_id, user_id))
|
||
row = c.fetchone()
|
||
new_points = row['points'] if row else 0
|
||
|
||
# Record history
|
||
c.execute('''INSERT INTO karma_history (room_id, user_id, change, new_points, voter)
|
||
VALUES (?, ?, ?, ?, ?)''', (room_id, user_id, change, new_points, voter))
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
logging.debug(f"Karma updated: {user_id} now has {new_points} points in room {room_id}")
|
||
return new_points
|
||
|
||
|
||
def get_karma(room_id, user_id):
|
||
"""Get karma points for a user in a specific room."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
c.execute('''SELECT points FROM karma WHERE room_id = ? AND user_id = ?''', (room_id, user_id))
|
||
row = c.fetchone()
|
||
conn.close()
|
||
|
||
return row['points'] if row else 0
|
||
|
||
|
||
def get_leaderboard(room_id, bot, limit=10, reverse=False):
|
||
"""Get top or bottom karma leaderboard for a room."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
order = "DESC" if not reverse else "ASC"
|
||
c.execute(f'''SELECT user_id, points FROM karma
|
||
WHERE room_id = ? AND points != 0
|
||
ORDER BY points {order} LIMIT ?''', (room_id, limit))
|
||
rows = c.fetchall()
|
||
conn.close()
|
||
|
||
# Convert user IDs to display names
|
||
leaderboard = []
|
||
for row in rows:
|
||
display_name = get_display_name_from_user_id(bot, room_id, row['user_id'])
|
||
leaderboard.append({
|
||
'display_name': display_name,
|
||
'user_id': row['user_id'],
|
||
'points': row['points']
|
||
})
|
||
|
||
return leaderboard
|
||
|
||
|
||
def get_rank(room_id, user_id):
|
||
"""Get rank of a user in karma leaderboard for a room."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
# Get all points ordered descending
|
||
c.execute('''SELECT user_id, points FROM karma
|
||
WHERE room_id = ? AND points != 0
|
||
ORDER BY points DESC''', (room_id,))
|
||
rows = c.fetchall()
|
||
conn.close()
|
||
|
||
for i, row in enumerate(rows, 1):
|
||
if row['user_id'] == user_id:
|
||
return i, len(rows)
|
||
return None, len(rows)
|
||
|
||
|
||
def get_recent_history(room_id, user_id, bot, limit=5):
|
||
"""Get recent karma history for a user in a room."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
c.execute('''SELECT change, voter, voted_at
|
||
FROM karma_history
|
||
WHERE room_id = ? AND user_id = ?
|
||
ORDER BY voted_at DESC LIMIT ?''', (room_id, user_id, limit))
|
||
rows = c.fetchall()
|
||
conn.close()
|
||
|
||
history = []
|
||
for row in rows:
|
||
voter_name = get_display_name_from_user_id(bot, room_id, row['voter'])
|
||
history.append({
|
||
'change': row['change'],
|
||
'voter': voter_name,
|
||
'voter_id': row['voter'],
|
||
'voted_at': row['voted_at']
|
||
})
|
||
|
||
return history
|
||
|
||
|
||
def get_stats(room_id=None):
|
||
"""Get overall karma statistics."""
|
||
conn = get_connection()
|
||
c = conn.cursor()
|
||
|
||
if room_id:
|
||
c.execute('''SELECT COUNT(*) as total_users FROM karma WHERE room_id = ?''', (room_id,))
|
||
total_users = c.fetchone()['total_users']
|
||
|
||
c.execute('''SELECT SUM(points) as total_points FROM karma WHERE room_id = ?''', (room_id,))
|
||
total_points = c.fetchone()['total_points'] or 0
|
||
|
||
c.execute('''SELECT AVG(points) as avg_points FROM karma WHERE room_id = ?''', (room_id,))
|
||
avg_points = c.fetchone()['avg_points'] or 0
|
||
|
||
c.execute('''SELECT MAX(points) as max_points FROM karma WHERE room_id = ?''', (room_id,))
|
||
max_points = c.fetchone()['max_points'] or 0
|
||
|
||
c.execute('''SELECT MIN(points) as min_points FROM karma WHERE room_id = ?''', (room_id,))
|
||
min_points = c.fetchone()['min_points'] or 0
|
||
|
||
c.execute('''SELECT COUNT(*) as total_votes FROM karma_history WHERE room_id = ?''', (room_id,))
|
||
total_votes = c.fetchone()['total_votes']
|
||
else:
|
||
c.execute('''SELECT COUNT(*) as total_users FROM karma''')
|
||
total_users = c.fetchone()['total_users']
|
||
|
||
c.execute('''SELECT SUM(points) as total_points FROM karma''')
|
||
total_points = c.fetchone()['total_points'] or 0
|
||
|
||
c.execute('''SELECT AVG(points) as avg_points FROM karma''')
|
||
avg_points = c.fetchone()['avg_points'] or 0
|
||
|
||
c.execute('''SELECT MAX(points) as max_points FROM karma''')
|
||
max_points = c.fetchone()['max_points'] or 0
|
||
|
||
c.execute('''SELECT MIN(points) as min_points FROM karma''')
|
||
min_points = c.fetchone()['min_points'] or 0
|
||
|
||
c.execute('''SELECT COUNT(*) as total_votes FROM karma_history''')
|
||
total_votes = c.fetchone()['total_votes']
|
||
|
||
conn.close()
|
||
|
||
return {
|
||
'total_users': total_users,
|
||
'total_points': total_points,
|
||
'avg_points': round(avg_points, 2),
|
||
'max_points': max_points,
|
||
'min_points': min_points,
|
||
'total_votes': total_votes
|
||
}
|
||
|
||
|
||
def format_karma_display(display_name, points):
|
||
"""Format karma display with visual indicators."""
|
||
if points > 0:
|
||
indicator = "👍" if points > 10 else "➕"
|
||
return f"💗 **{display_name}** has {points} karma {indicator}"
|
||
elif points < 0:
|
||
indicator = "👎" if points < -10 else "➖"
|
||
return f"💔 **{display_name}** has {points} karma {indicator}"
|
||
else:
|
||
return f"⚖️ **{display_name}** has neutral karma (0)"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Command Handlers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def handle_command(room, message, bot, prefix, config):
|
||
"""Handle karma commands."""
|
||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||
room_id = room.room_id
|
||
|
||
# Refresh display name cache
|
||
await refresh_display_name_cache(bot, room_id)
|
||
|
||
# Debug logging
|
||
message_body = message.body if hasattr(message, 'body') else str(message)
|
||
logging.info(f"Karma plugin received message: '{message_body}' from {message.sender}")
|
||
|
||
# Get the full command (including what might be karma++ etc.)
|
||
full_cmd = match.command() if hasattr(match, 'command') else ''
|
||
|
||
logging.debug(f"Full command: '{full_cmd}'")
|
||
|
||
# Check for !karma++ or !karma-- as a single command
|
||
if match.is_not_from_this_bot() and match.prefix():
|
||
if full_cmd == 'karma++':
|
||
# !karma++ username
|
||
args = match.args()
|
||
if not args:
|
||
await bot.api.send_markdown_message(room.room_id, "Usage: !karma++ <username>")
|
||
return
|
||
display_name = ' '.join(args)
|
||
await process_karma_vote(room, display_name, '++', message.sender, bot)
|
||
return
|
||
|
||
elif full_cmd == 'karma--':
|
||
# !karma-- username
|
||
args = match.args()
|
||
if not args:
|
||
await bot.api.send_markdown_message(room.room_id, "Usage: !karma-- <username>")
|
||
return
|
||
display_name = ' '.join(args)
|
||
await process_karma_vote(room, display_name, '--', message.sender, bot)
|
||
return
|
||
|
||
elif full_cmd == 'karma':
|
||
# Regular !karma command
|
||
args = match.args()
|
||
if args and args[0] in ['++', '--']:
|
||
# !karma ++ username (space between)
|
||
action = args[0]
|
||
if len(args) < 2:
|
||
await bot.api.send_markdown_message(room.room_id, f"Usage: !karma {action} <username>")
|
||
return
|
||
display_name = ' '.join(args[1:])
|
||
await process_karma_vote(room, display_name, action, message.sender, bot)
|
||
return
|
||
else:
|
||
# !karma with subcommands or username
|
||
await handle_karma_command(room, message, bot, config)
|
||
return
|
||
|
||
# Handle !++ and !-- shortcuts
|
||
elif full_cmd in ['++', '--']:
|
||
args = match.args()
|
||
if not args:
|
||
await bot.api.send_markdown_message(room.room_id, f"Usage: !{full_cmd} <username>\nExample: !{full_cmd} Nexilva")
|
||
return
|
||
display_name = ' '.join(args)
|
||
await process_karma_vote(room, display_name, full_cmd, message.sender, bot)
|
||
return
|
||
|
||
# Handle inline karma (message body contains ++ or --)
|
||
if match.is_not_from_this_bot() and not match.prefix():
|
||
await handle_inline_karma(room, message, bot)
|
||
|
||
|
||
async def process_karma_vote(room, display_name, action, voter, bot):
|
||
"""Process a karma vote using display name."""
|
||
room_id = room.room_id
|
||
voter_str = str(voter)
|
||
change = 1 if action == '++' else -1
|
||
|
||
logging.info(f"Karma vote: {voter_str} -> '{display_name}' ({action}, change={change:+d}) in room {room_id}")
|
||
|
||
# Resolve display name to user ID
|
||
user_id = resolve_display_name(room_id, display_name, bot)
|
||
|
||
if not user_id:
|
||
await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'. Please use their display name or Matrix ID (e.g., @username:server)")
|
||
return
|
||
|
||
# Prevent self-modification
|
||
if user_id == voter_str:
|
||
await bot.api.send_markdown_message(room.room_id, "❌ You cannot modify your own karma!")
|
||
return
|
||
|
||
# Check cooldown
|
||
if is_on_cooldown(room_id, user_id, voter_str):
|
||
remaining = get_cooldown_remaining(room_id, user_id, voter_str)
|
||
await bot.api.send_markdown_message(room.room_id, f"⏳ You're doing that too fast! Wait {remaining} seconds.")
|
||
return
|
||
|
||
# Update karma
|
||
try:
|
||
new_points = update_karma(room_id, user_id, change, voter_str)
|
||
update_cooldown(room_id, user_id, voter_str)
|
||
|
||
# Get display name for response
|
||
display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id)
|
||
response = format_karma_display(display_name_resolved, new_points)
|
||
await bot.api.send_markdown_message(room.room_id, response)
|
||
logging.info(f"Karma updated successfully: {user_id} now has {new_points} points")
|
||
except Exception as e:
|
||
logging.error(f"Error updating karma: {e}")
|
||
logging.error(traceback.format_exc())
|
||
await bot.api.send_markdown_message(room.room_id, f"❌ Error updating karma: {str(e)}")
|
||
|
||
|
||
async def handle_karma_command(room, message, bot, config):
|
||
"""Handle the !karma command and its subcommands."""
|
||
match = botlib.MessageMatch(room, message, bot, config.prefix)
|
||
args = match.args()
|
||
room_id = room.room_id
|
||
|
||
if not args:
|
||
# Show help for !karma command in collapsible details tag with proper HTML lists
|
||
help_text = f"""<details>
|
||
<summary>💗 <strong>Karma Plugin Help</strong> (click to expand)</summary>
|
||
|
||
<strong>Commands:</strong>
|
||
<ul>
|
||
<li><code>!karma</code> - Show this help</li>
|
||
<li><code>!karma <user></code> - Show karma for a user (use display name or @user:server)</li>
|
||
<li><code>!karma++ <user></code> - Give +1 karma to a user</li>
|
||
<li><code>!karma-- <user></code> - Give -1 karma to a user</li>
|
||
<li><code>!karma top [n]</code> - Show top karma leaders</li>
|
||
<li><code>!karma bottom [n]</code> - Show bottom karma entries</li>
|
||
<li><code>!karma rank <user></code> - Show rank of a user</li>
|
||
<li><code>!karma stats</code> - Show overall statistics</li>
|
||
<li><code>!karma history <user></code> - Show recent karma history</li>
|
||
</ul>
|
||
|
||
<strong>Shortcuts:</strong>
|
||
<ul>
|
||
<li><code>!++ <user></code> - Same as !karma++ <user></li>
|
||
<li><code>!-- <user></code> - Same as !karma-- <user></li>
|
||
<li><code><user>++</code> - Give +1 karma (inline)</li>
|
||
<li><code><user>--</code> - Give -1 karma (inline)</li>
|
||
</ul>
|
||
|
||
<strong>Examples:</strong>
|
||
<ul>
|
||
<li><code>!karma Nexilva</code> - Check Nexilva's karma</li>
|
||
<li><code>!karma++ @hb:matrix.org</code> - Give HB +1 karma</li>
|
||
<li><code>!karma top 5</code> - Show top 5 leaders</li>
|
||
<li><code>Nexilva++</code> - Give Nexilva +1 karma (inline)</li>
|
||
</ul>
|
||
|
||
<strong>Notes:</strong>
|
||
<ul>
|
||
<li>You cannot modify your own karma</li>
|
||
<li>There is a {COOLDOWN_SECONDS} second cooldown between votes</li>
|
||
<li>Karma is tracked separately per room</li>
|
||
<li>Display names with emojis are supported</li>
|
||
</ul>
|
||
|
||
</details>"""
|
||
|
||
await bot.api.send_markdown_message(room.room_id, help_text)
|
||
return
|
||
|
||
subcommand = args[0].lower()
|
||
|
||
# !karma top [n]
|
||
if subcommand == "top":
|
||
limit = 10
|
||
if len(args) > 1 and args[1].isdigit():
|
||
limit = min(int(args[1]), 25)
|
||
|
||
leaderboard = get_leaderboard(room_id, bot, limit, reverse=False)
|
||
if leaderboard:
|
||
response = f"🏆 **Top {len(leaderboard)} Karma Leaders**\n\n"
|
||
for i, entry in enumerate(leaderboard, 1):
|
||
medal = "🥇" if i == 1 else "🥈" if i == 2 else "🥉" if i == 3 else "📌"
|
||
pts = entry['points']
|
||
response += f"{medal} **{i}.** {entry['display_name']}: {pts} {pluralize_points(pts)}\n"
|
||
await bot.api.send_markdown_message(room.room_id, response)
|
||
else:
|
||
await bot.api.send_markdown_message(room.room_id, "No karma entries found in this room.")
|
||
return
|
||
|
||
# !karma bottom [n]
|
||
if subcommand == "bottom":
|
||
limit = 10
|
||
if len(args) > 1 and args[1].isdigit():
|
||
limit = min(int(args[1]), 25)
|
||
|
||
leaderboard = get_leaderboard(room_id, bot, limit, reverse=True)
|
||
if leaderboard:
|
||
response = f"📉 **Bottom {len(leaderboard)} Karma (Needs Love)**\n\n"
|
||
for i, entry in enumerate(leaderboard, 1):
|
||
pts = entry['points']
|
||
response += f"⚠️ **{i}.** {entry['display_name']}: {pts} {pluralize_points(pts)}\n"
|
||
await bot.api.send_markdown_message(room.room_id, response)
|
||
else:
|
||
await bot.api.send_markdown_message(room.room_id, "No karma entries found in this room.")
|
||
return
|
||
|
||
# !karma rank <user>
|
||
if subcommand == "rank" and len(args) >= 2:
|
||
display_name = ' '.join(args[1:])
|
||
user_id = resolve_display_name(room_id, display_name, bot)
|
||
|
||
if not user_id:
|
||
await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'")
|
||
return
|
||
|
||
rank, total = get_rank(room_id, user_id)
|
||
if rank:
|
||
points = get_karma(room_id, user_id)
|
||
percentile = round((1 - rank/total) * 100, 1) if total > 0 else 0
|
||
display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id)
|
||
response = f"📊 **{display_name_resolved}** is ranked #{rank} out of {total} (top {percentile}%)\n💗 Karma: {points} {pluralize_points(points)}"
|
||
await bot.api.send_markdown_message(room.room_id, response)
|
||
else:
|
||
display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id)
|
||
await bot.api.send_markdown_message(room.room_id, f"❌ {display_name_resolved} has no karma yet in this room.")
|
||
return
|
||
|
||
# !karma history <user>
|
||
if subcommand == "history" and len(args) >= 2:
|
||
display_name = ' '.join(args[1:])
|
||
user_id = resolve_display_name(room_id, display_name, bot)
|
||
|
||
if not user_id:
|
||
await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'")
|
||
return
|
||
|
||
history = get_recent_history(room_id, user_id, bot, 5)
|
||
|
||
if history:
|
||
display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id)
|
||
response = f"📜 **Recent Karma History for {display_name_resolved}**\n\n"
|
||
for h in history:
|
||
arrow = "⬆️" if h['change'] > 0 else "⬇️"
|
||
try:
|
||
voted_at = datetime.fromisoformat(h['voted_at'])
|
||
time_str = voted_at.strftime("%Y-%m-%d %H:%M")
|
||
except:
|
||
time_str = "recently"
|
||
response += f"{arrow} {h['change']:+d} by **{h['voter']}** at {time_str}\n"
|
||
await bot.api.send_markdown_message(room.room_id, response)
|
||
else:
|
||
display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id)
|
||
await bot.api.send_markdown_message(room.room_id, f"No karma history for {display_name_resolved} in this room.")
|
||
return
|
||
|
||
# !karma stats
|
||
if subcommand == "stats":
|
||
stats = get_stats(room_id)
|
||
response = f"📊 **Karma System Statistics for this Room**\n\n"
|
||
response += f"📝 Total users tracked: {stats['total_users']}\n"
|
||
response += f"⭐ Total karma points: {stats['total_points']}\n"
|
||
response += f"📈 Average karma: {stats['avg_points']}\n"
|
||
response += f"🏆 Highest karma: {stats['max_points']}\n"
|
||
response += f"📉 Lowest karma: {stats['min_points']}\n"
|
||
response += f"🗳️ Total votes cast: {stats['total_votes']}\n"
|
||
await bot.api.send_markdown_message(room.room_id, response)
|
||
return
|
||
|
||
# !karma <user> (show karma for specific user)
|
||
display_name = ' '.join(args)
|
||
user_id = resolve_display_name(room_id, display_name, bot)
|
||
|
||
if not user_id:
|
||
await bot.api.send_markdown_message(room.room_id, f"❌ Could not find user '{display_name}'")
|
||
return
|
||
|
||
points = get_karma(room_id, user_id)
|
||
display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id)
|
||
response = format_karma_display(display_name_resolved, points)
|
||
await bot.api.send_markdown_message(room.room_id, response)
|
||
return
|
||
|
||
|
||
async def handle_inline_karma(room, message, bot):
|
||
"""Handle inline karma expressions like 'user++' or 'user--'."""
|
||
body = message.body if hasattr(message, 'body') else str(message)
|
||
sender = str(message.sender)
|
||
room_id = room.room_id
|
||
|
||
# Refresh display name cache
|
||
await refresh_display_name_cache(bot, room_id)
|
||
|
||
# Pattern to match text followed by ++ or -- at end of word/string
|
||
pattern = r'(.+?)(\+\+|--)(?:\s|$)'
|
||
matches = re.findall(pattern, body)
|
||
|
||
if not matches:
|
||
return
|
||
|
||
logging.info(f"Found inline karma matches: {matches}")
|
||
|
||
responses = []
|
||
for display_name, operator in matches:
|
||
display_name = display_name.strip()
|
||
change = 1 if operator == '++' else -1
|
||
|
||
# Resolve display name to user ID
|
||
user_id = resolve_display_name(room_id, display_name, bot)
|
||
|
||
if not user_id:
|
||
logging.debug(f"Could not resolve display name: '{display_name}'")
|
||
continue
|
||
|
||
# Skip self-modification
|
||
if user_id == sender:
|
||
logging.debug(f"Skipping self-modification: {sender} -> {display_name}")
|
||
continue
|
||
|
||
# Check cooldown
|
||
if is_on_cooldown(room_id, user_id, sender):
|
||
logging.debug(f"Cooldown active for {sender} -> {user_id}")
|
||
continue
|
||
|
||
# Update karma
|
||
try:
|
||
new_points = update_karma(room_id, user_id, change, sender)
|
||
update_cooldown(room_id, user_id, sender)
|
||
|
||
# Format response
|
||
display_name_resolved = get_display_name_from_user_id(bot, room_id, user_id)
|
||
arrow = "⬆️" if change > 0 else "⬇️"
|
||
responses.append(f"{arrow} **{display_name_resolved}** → {new_points}")
|
||
except Exception as e:
|
||
logging.error(f"Error updating inline karma: {e}")
|
||
|
||
await asyncio.sleep(0.3)
|
||
|
||
if responses:
|
||
response_text = " | ".join(responses[:3])
|
||
if len(responses) > 3:
|
||
response_text += f" (and {len(responses)-3} more)"
|
||
await bot.api.send_markdown_message(room.room_id, f"💗 {response_text}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Plugin Setup
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def setup(bot):
|
||
"""Initialize the karma plugin."""
|
||
logging.info("=" * 50)
|
||
logging.info("LOADING KARMA PLUGIN")
|
||
logging.info("=" * 50)
|
||
|
||
init_db()
|
||
|
||
logging.info("Advanced Karma plugin loaded successfully")
|
||
logging.info("Supports display names (e.g., 'Nexilva' or '🍄 HB🍄') and Matrix IDs")
|
||
logging.info("Commands: !karma, !karma++, !karma--, !++, !--, and inline ++/--")
|
||
logging.info("=" * 50)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Plugin Metadata
|
||
# ---------------------------------------------------------------------------
|
||
|
||
__version__ = "1.0.0"
|
||
__author__ = "Funguy Bot"
|
||
__description__ = "Room karma tracking system"
|
||
__help__ = """
|
||
<details>
|
||
<summary><strong>!karma</strong> – Karma system</summary>
|
||
<ul>
|
||
<li><code>!karma <user></code> – Show points</li>
|
||
<li><code>!karma++ <user></code> / <code>!-- <user></code> – Modify karma</li>
|
||
<li><code>!karma top [n]</code> / <code>!karma bottom [n]</code> – Leaderboard</li>
|
||
<li><code>!karma rank <user></code> – Position</li>
|
||
<li><code>!karma stats</code> – Room statistics</li>
|
||
<li><code>!karma history <user></code> – Recent votes</li>
|
||
</ul>
|
||
<p>Shortcuts: <code>!++ user</code>, <code>!-- user</code>, and inline <code>user++</code> / <code>user--</code>.</p>
|
||
</details>
|
||
"""
|