admin plugin and roomstats plugin added. cron fixed and ddg fixed

This commit is contained in:
2026-05-07 15:28:50 -05:00
parent 4b10c13b29
commit 10a6028037
7 changed files with 1673 additions and 529 deletions
BIN
View File
Binary file not shown.
+538
View File
@@ -0,0 +1,538 @@
#!/usr/bin/env python3
"""
plugins/admin.py Full room moderation commands.
Supports multiword display names, standalone commands (!op, !kick, etc.)
"""
import time
import logging
import simplematrixbotlib as botlib
logger = logging.getLogger("admin")
# ------------------------------------------------------------------
# Displayname resolution cache
# ------------------------------------------------------------------
_pending_resolution = {} # room_id → {"matches": [...], "expires": timestamp}
_name_cache = {} # room_id → {display_name.lower(): mxid}
RESOLUTION_TIMEOUT = 60
def _cleanup_resolutions():
now = time.time()
expired = [r for r, v in _pending_resolution.items() if v["expires"] < now]
for r in expired:
del _pending_resolution[r]
class UserResolutionError(Exception):
def __init__(self, matches):
self.matches = matches # list of {"mxid": ..., "display_name": ...}
async def _populate_name_cache(bot, room_id):
"""Fetch the full member list and cache display names."""
if room_id in _name_cache:
return
try:
resp = await bot.async_client.joined_members(room_id)
if resp.members is None:
return
cache = {}
for member in resp.members:
display = (member.display_name or "").strip().lower()
if display:
# If duplicate display name, store None to indicate ambiguity
if display in cache:
cache[display] = None
else:
cache[display] = member.user_id
_name_cache[room_id] = cache
logger.info(f"Cached {len(cache)} display names for room {room_id}")
except Exception as e:
logger.error(f"Could not cache members: {e}")
async def _resolve_multiword(bot, room_id, tokens):
"""
Given a list of word tokens, try to find a matching display name
by testing progressively longer prefixes of the token list.
Returns (mxid, display_name) or raises ValueError if no match.
"""
await _populate_name_cache(bot, room_id)
cache = _name_cache.get(room_id, {})
# Build candidates from 1 token up to all tokens
for end in range(len(tokens), 0, -1):
candidate = " ".join(tokens[:end]).strip().lower()
if candidate in cache:
mxid = cache[candidate]
if mxid is not None:
return mxid, candidate
else:
# Duplicate display name → fall through to ambiguity handling
# We'll fetch the real members for this candidate and raise UserResolutionError
resp = await bot.async_client.joined_members(room_id)
matches = []
for member in resp.members:
if (member.display_name or "").strip().lower() == candidate:
matches.append({"mxid": member.user_id, "display_name": member.display_name})
if len(matches) == 1:
return matches[0]["mxid"], matches[0]["display_name"]
elif len(matches) > 1:
raise UserResolutionError(matches)
# else: not found (unlikely) → continue
raise ValueError(f"No member with display name '{' '.join(tokens)}' found.")
async def resolve_user_from_target(bot, room_id, target):
"""
Resolve a target string to a Matrix user ID.
Accepts: full MXID (@user:domain), display name (multiword), or number
(referring to a previous ambiguous resolution).
Returns (mxid, display_name_or_None).
Raises ValueError or UserResolutionError.
"""
if target.startswith("@"):
return target, None
_cleanup_resolutions()
# Check for number reference to a previous ambiguous match
if target.isdigit():
idx = int(target) - 1
if room_id in _pending_resolution:
pending = _pending_resolution[room_id]
if 0 <= idx < len(pending["matches"]):
match = pending["matches"][idx]
del _pending_resolution[room_id]
return match["mxid"], match.get("display_name")
else:
raise ValueError(f"Invalid selection {target}. Choose from the list.")
else:
raise ValueError("No pending resolution. Use @user:domain or display name.")
# If we reached here, `target` is a single token, but might be part of a longer name.
# That case is handled by calling _resolve_multiword from handle_command.
# But for completeness, we still attempt a direct cache match.
await _populate_name_cache(bot, room_id)
cache = _name_cache.get(room_id, {})
mxid = cache.get(target.strip().lower())
if mxid:
return mxid, target.strip().lower()
elif mxid is None:
# Ambiguous: fetch and raise
resp = await bot.async_client.joined_members(room_id)
matches = []
for member in resp.members:
if (member.display_name or "").strip().lower() == target.strip().lower():
matches.append({"mxid": member.user_id, "display_name": member.display_name})
if len(matches) == 1:
return matches[0]["mxid"], matches[0]["display_name"]
elif len(matches) > 1:
raise UserResolutionError(matches)
raise ValueError(f"No member with display name '{target}' found.")
# ------------------------------------------------------------------
# Power level helpers
# ------------------------------------------------------------------
async def get_power_level(bot, room_id, user_id):
try:
resp = await bot.async_client.room_get_state_event(
room_id, "m.room.power_levels"
)
if resp.content:
return resp.content.get("users", {}).get(
user_id, resp.content.get("users_default", 0)
)
except Exception:
pass
return 0
async def has_mod_permission(bot, room_id, user_id, config):
if user_id == config.admin_user:
return True
pl = await get_power_level(bot, room_id, user_id)
return pl >= 50
async def fetch_power_levels(bot, room_id):
try:
resp = await bot.async_client.room_get_state_event(
room_id, "m.room.power_levels"
)
return resp.content if resp.content else {}
except Exception:
return {}
async def set_power_level(bot, room_id, user_id, new_pl):
current = await fetch_power_levels(bot, room_id)
if not current:
raise RuntimeError("Could not retrieve power levels.")
users = current.setdefault("users", {})
users[user_id] = new_pl
await bot.async_client.room_put_state(
room_id, "m.room.power_levels", current
)
async def get_banned_users(bot, room_id):
try:
resp = await bot.async_client.room_get_state(room_id)
banned = []
for event in resp.events:
if (
event.get("type") == "m.room.member"
and event.get("content", {}).get("membership") == "ban"
):
banned.append(event["state_key"])
return banned
except Exception as e:
logger.error(f"Failed to fetch bans: {e}")
return []
# ------------------------------------------------------------------
# Main command handler
# ------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
"""Dispatches !admin or standalone moderation commands."""
match = botlib.MessageMatch(room, message, bot, prefix)
if not match.is_not_from_this_bot() or not match.prefix():
return
room_id = room.room_id
sender = message.sender
standalone_actions = {
"kick": "kick",
"ban": "ban",
"unban": "unban",
"invite": "invite",
"whois": "whois",
"op": "op",
"deop": "deop",
"topic": "topic",
"roomname": "roomname",
"avatar": "avatar",
"members": "members",
"bans": "bans",
"modhelp": "help",
"admin": "admin",
}
cmd = match.command()
if cmd not in standalone_actions:
return
# Permission gate (skip for help)
if cmd not in ("modhelp", "help"):
if not await has_mod_permission(bot, room_id, sender, config):
await bot.api.send_text_message(
room_id, "⛔ You don't have permission to use moderator commands."
)
return
args = match.args()
# Determine action and sub_args
if cmd == "admin":
if not args:
await bot.api.send_text_message(room_id, "Usage: !admin <action> [args...]")
return
action = args[0].lower()
sub_args = args[1:] if len(args) > 1 else []
else:
action = cmd
sub_args = args
# ------------------------------------------------------------
# User-targeting actions
# ------------------------------------------------------------
if action in ("kick", "ban", "invite", "whois", "op", "deop"):
if not sub_args:
await bot.api.send_text_message(
room_id, f"Missing user. Usage: !{cmd} <@user|name> [reason...]"
)
return
# For op/deop, the last token might be a power level (number)
# For kick/ban, everything after the name is the reason.
# Strategy: Try progressively longer multi-word names from the start of sub_args.
# For op: if the very last token is a pure integer, it's a power level; rest is the name.
# For kick/ban: everything is name except we can't know the boundary, so just try the whole thing.
if action in ("op", "deop"):
# Try to parse last token as power level
potential_pl = sub_args[-1]
try:
power = int(potential_pl)
# Success: power level found, name is sub_args[:-1]
name_tokens = sub_args[:-1]
if not name_tokens:
await bot.api.send_text_message(room_id, "Missing user name.")
return
except ValueError:
# No numeric power, whole sub_args is the name
name_tokens = sub_args
power = None
else:
# kick, ban, invite, whois
name_tokens = sub_args # entire args is the name
power = None
# Resolve the multi-word name
try:
target_mxid, target_display = await _resolve_multiword(bot, room_id, name_tokens)
except UserResolutionError as e:
lines = [
"Multiple users found. Reissue the command with a number:"
]
for i, m in enumerate(e.matches, 1):
lines.append(f"{i}. {m['mxid']} ({m['display_name']})")
await bot.api.send_text_message(room_id, "\n".join(lines))
return
except ValueError as e:
# Fallback: also try the old way with just the first token (maybe they used @user)
target_str = sub_args[0]
try:
target_mxid, target_display = await resolve_user_from_target(bot, room_id, target_str)
except Exception as e2:
await bot.api.send_text_message(room_id, str(e2))
return
# Determine reason and power level for op/deop
if action in ("op", "deop"):
if action == "op":
requested_pl = power if power is not None else 50
if requested_pl > 50:
await bot.api.send_text_message(
room_id,
"Maximum power level for op is 50 (moderator). Setting to 50."
)
new_pl = 50
else:
new_pl = requested_pl
else:
new_pl = 0
sender_pl = await get_power_level(bot, room_id, sender)
target_pl = await get_power_level(bot, room_id, target_mxid)
if sender != config.admin_user and sender_pl <= target_pl:
await bot.api.send_text_message(
room_id,
"⛔ You can only modify users with a lower power level than yours.",
)
return
try:
await set_power_level(bot, room_id, target_mxid, new_pl)
verb = "Promoted" if action == "op" else "Demoted"
await bot.api.send_text_message(
room_id, f"{verb} {target_mxid} to power level {new_pl}."
)
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to set power: {e}")
else:
# For kick/ban/invite: reason is everything after the name tokens
reason = " ".join(sub_args[len(name_tokens):]) if len(sub_args) > len(name_tokens) else ""
if action == "kick":
try:
await bot.async_client.room_kick(room_id, target_mxid, reason)
await bot.api.send_text_message(room_id, f"👢 Kicked {target_mxid}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to kick: {e}")
elif action == "ban":
try:
await bot.async_client.room_ban(room_id, target_mxid, reason)
await bot.api.send_text_message(room_id, f"🚫 Banned {target_mxid}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to ban: {e}")
elif action == "invite":
try:
await bot.async_client.room_invite(room_id, target_mxid)
await bot.api.send_text_message(room_id, f"📨 Invited {target_mxid}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to invite: {e}")
elif action == "whois":
try:
resp = await bot.async_client.joined_members(room_id)
member_info = None
for m in resp.members:
if m.user_id == target_mxid:
member_info = m
break
pl = await get_power_level(bot, room_id, target_mxid)
display = member_info.display_name if member_info else "Unknown"
msg = (
f"**User:** `{target_mxid}`\n"
f"**Display Name:** {display}\n"
f"**Power Level:** {pl}"
)
await bot.api.send_text_message(room_id, msg)
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to get whois: {e}")
# ------------------------------------------------------------
# UNBAN
# ------------------------------------------------------------
elif action == "unban":
if not sub_args:
await bot.api.send_text_message(room_id, "Usage: !unban <@user:domain>")
return
target_str = sub_args[0]
if not target_str.startswith("@"):
await bot.api.send_text_message(
room_id, "For unban, please provide the full Matrix ID (e.g., @user:domain)."
)
return
try:
await bot.async_client.room_unban(room_id, target_str)
await bot.api.send_text_message(room_id, f"🔓 Unbanned {target_str}.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to unban: {e}")
# ------------------------------------------------------------
# TOPIC
# ------------------------------------------------------------
elif action == "topic":
if not sub_args:
try:
resp = await bot.async_client.room_get_state_event(room_id, "m.room.topic")
topic = resp.content.get("topic", "(none)") if resp.content else "(none)"
await bot.api.send_text_message(room_id, f"📝 Topic: {topic}")
except Exception:
await bot.api.send_text_message(room_id, "Could not retrieve topic.")
else:
new_topic = " ".join(sub_args)
try:
await bot.async_client.room_put_state(room_id, "m.room.topic", {"topic": new_topic})
await bot.api.send_text_message(room_id, "📝 Topic updated.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed: {e}")
# ------------------------------------------------------------
# ROOM NAME
# ------------------------------------------------------------
elif action == "roomname":
if not sub_args:
try:
resp = await bot.async_client.room_get_state_event(room_id, "m.room.name")
name = resp.content.get("name", "(none)") if resp.content else "(none)"
await bot.api.send_text_message(room_id, f"🏠 Room name: {name}")
except Exception:
await bot.api.send_text_message(room_id, "Could not retrieve room name.")
else:
new_name = " ".join(sub_args)
try:
await bot.async_client.room_put_state(room_id, "m.room.name", {"name": new_name})
await bot.api.send_text_message(room_id, "🏠 Room name updated.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed: {e}")
# ------------------------------------------------------------
# AVATAR
# ------------------------------------------------------------
elif action == "avatar":
if not sub_args:
try:
resp = await bot.async_client.room_get_state_event(room_id, "m.room.avatar")
url = resp.content.get("url", "not set") if resp.content else "not set"
await bot.api.send_text_message(room_id, f"🖼️ Avatar URL: {url}")
except Exception:
await bot.api.send_text_message(room_id, "Could not retrieve avatar.")
else:
mxc_url = sub_args[0]
if not mxc_url.startswith("mxc://"):
await bot.api.send_text_message(room_id, "Invalid avatar URL. Must start with mxc://.")
return
try:
await bot.async_client.room_put_state(room_id, "m.room.avatar", {"url": mxc_url})
await bot.api.send_text_message(room_id, "🖼️ Avatar set.")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed: {e}")
# ------------------------------------------------------------
# MEMBERS
# ------------------------------------------------------------
elif action == "members":
try:
resp = await bot.async_client.joined_members(room_id)
if not resp.members:
await bot.api.send_text_message(room_id, "No members found.")
return
lines = ["**Members:**"]
for member in resp.members:
display = member.display_name or member.user_id
pl = await get_power_level(bot, room_id, member.user_id)
lines.append(f"{display} (`{member.user_id}`) [PL: {pl}]")
msg = "\n".join(lines)
while len(msg) > 4000:
part = msg[:4000]
msg = msg[4000:]
await bot.api.send_text_message(room_id, part)
if msg:
await bot.api.send_text_message(room_id, msg)
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to list members: {e}")
# ------------------------------------------------------------
# BANS
# ------------------------------------------------------------
elif action == "bans":
try:
banned = await get_banned_users(bot, room_id)
if not banned:
await bot.api.send_text_message(room_id, "No banned users.")
else:
ban_list = "\n".join(f"• `{u}`" for u in banned)
await bot.api.send_text_message(room_id, f"**Banned users:**\n{ban_list}")
except Exception as e:
await bot.api.send_text_message(room_id, f"❌ Failed to fetch bans: {e}")
# ------------------------------------------------------------
# HELP
# ------------------------------------------------------------
elif action in ("help", "modhelp"):
help_text = """
**Moderator Commands (standalone or via !admin):**
- `!kick <@user|name> [reason]` Kick a user
- `!ban <@user|name> [reason]` Ban a user
- `!unban <@user:domain>` Unban (full MXID required)
- `!invite <@user|name>` Invite a user
- `!whois <@user|name>` Show user details & power level
- `!op <@user|name> [pl=50]` Promote user (max 50, moderator)
- `!deop <@user|name>` Demote user to power level 0
- `!topic [new topic]` Show / set room topic
- `!roomname [new name]` Show / set room name
- `!avatar [mxc://...]` Show / set room avatar
- `!members` List all joined members with power levels
- `!bans` List all banned users
- `!modhelp` Show this help
Names may be **multiword**; the bot will automatically detect them.
If the name is ambiguous you'll be asked to choose from a numbered list.
"""
await bot.api.send_text_message(room_id, help_text.strip())
else:
await bot.api.send_text_message(
room_id, f"Unknown action: {action}. Use `!modhelp`."
)
# ------------------------------------------------------------------
# Plugin metadata
# ------------------------------------------------------------------
__version__ = "1.1.0"
__author__ = "Funguy Admin"
__description__ = "Full room moderation multiword name support"
__help__ = """
<details>
<summary><strong>Admin / Moderator Commands</strong></summary>
<ul>
<li><code>!kick</code>, <code>!ban</code>, <code>!unban</code>, <code>!invite</code></li>
<li><code>!op</code> (max PL 50), <code>!deop</code>, <code>!whois</code></li>
<li><code>!topic</code>, <code>!roomname</code>, <code>!avatar</code></li>
<li><code>!members</code>, <code>!bans</code></li>
<li><code>!admin &lt;action&gt;</code> also works as a parent command</li>
</ul>
<p>Power level ≥ 50 required (or global admin).</p>
<p>Multiword display names are automatically recognized.</p>
</details>
"""
+398 -63
View File
@@ -1,81 +1,416 @@
# plugins/cron.py
#!/usr/bin/env python3
"""
plugins/cron.py Inprocess cron scheduler (no system crontab).
Room ID is derived automatically from the command context.
"""
import logging
from typing import Optional
import sqlite3
from crontab import CronTab
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz
import simplematrixbotlib as botlib
# Database connection and cursor
conn = sqlite3.connect('cron.db')
cursor = conn.cursor()
logger = logging.getLogger("cron")
# Create table if not exists
cursor.execute('''CREATE TABLE IF NOT EXISTS cron (
room_id TEXT,
cron_entry TEXT,
command TEXT
)''')
# ------------------------------------------------------------------
# Database
# ------------------------------------------------------------------
DB_PATH = "cron_jobs.db"
def init_db():
with sqlite3.connect(DB_PATH) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS cron_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room_id TEXT NOT NULL,
cron_expr TEXT NOT NULL,
command TEXT NOT NULL,
timezone TEXT DEFAULT 'UTC',
enabled INTEGER DEFAULT 1,
added_by TEXT DEFAULT ''
)
""")
conn.commit()
async def handle_command(room, message, bot, prefix, config):
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("cron"):
args = match.args()
if len(args) >= 4:
action = args[0]
room_id = args[1]
cron_entry = ' '.join(args[2:-1])
command = args[-1]
if action == "add":
add_cron(room_id, cron_entry, command)
await bot.api.send_text_message(room.room_id, f"Cron added successfully")
elif action == "remove":
remove_cron(room_id, command)
await bot.api.send_text_message(room.room_id, f"Cron removed successfully")
# ------------------------------------------------------------------
# Fake objects for command injection
# ------------------------------------------------------------------
class FakeRoom:
def __init__(self, room_id):
self.room_id = room_id
class FakeMessage:
def __init__(self, body):
self.sender = "@cron:system"
self.body = body
# ------------------------------------------------------------------
# Scheduler
# ------------------------------------------------------------------
scheduler = AsyncIOScheduler()
def load_jobs(bot):
"""Load all enabled jobs from DB into scheduler."""
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT id, room_id, cron_expr, command, timezone FROM cron_jobs WHERE enabled=1"
).fetchall()
for row in rows:
job_id = str(row["id"])
trigger = CronTrigger.from_crontab(row["cron_expr"], timezone=pytz.timezone(row["timezone"]))
scheduler.add_job(
fire_job,
trigger=trigger,
args=[bot, row["room_id"], row["command"]],
id=job_id,
replace_existing=True,
)
logger.info(f"Loaded cron job {job_id}: {row['cron_expr']} -> {row['command']}")
async def fire_job(bot, room_id: str, command: str):
"""Execute a scheduled command by injecting it into the bot's dispatcher."""
room = FakeRoom(room_id)
message = FakeMessage(command)
# Prefer the main bot instance (set by funguy.py) for full dispatch
if hasattr(bot, "main_bot"):
await bot.main_bot.handle_commands(room, message)
else:
await bot.api.send_text_message(room.room_id, "Usage: !cron add|remove room_id cron_entry command")
def add_cron(room_id, cron_entry, command):
# Check if the cron entry already exists in the database for the given room_id and command
cursor.execute('SELECT * FROM cron WHERE room_id=? AND command=? AND cron_entry=?', (room_id, command, cron_entry))
existing_entry = cursor.fetchone()
if existing_entry:
return # Cron entry already exists, do not add duplicate
# Insert the cron entry into the database
cursor.execute('INSERT INTO cron (room_id, cron_entry, command) VALUES (?, ?, ?)', (room_id, cron_entry, command))
conn.commit()
def remove_cron(room_id, command):
cursor.execute('DELETE FROM cron WHERE room_id=? AND command=?', (room_id, command))
conn.commit()
async def run_cron_jobs(bot):
cron = CronTab()
for job in cron:
cron_entry = str(job)
for row in cursor.execute('SELECT * FROM cron WHERE cron_entry=?', (cron_entry,)):
room_id, _, command = row
room = await bot.api.get_room_by_id(room_id)
if room:
plugin_name = command.split()[0].replace(prefix, '') # Extract plugin name
# Fallback: direct plugin call
prefix = bot.config.prefix
if command.startswith(prefix):
body = command[len(prefix):].strip()
if " " in body:
plugin_name, _ = body.split(" ", 1)
else:
plugin_name = body
plugin_module = bot.plugins.get(plugin_name)
if plugin_module:
await plugin_module.handle_command(room, None, bot, prefix, config)
logger.info(f"Cron executing via {plugin_name}: {command}")
await plugin_module.handle_command(room, message, bot, prefix, bot.config)
else:
logger.warning(f"No plugin found for cron command: {command}")
await bot.api.send_text_message(room_id, f"[cron] Unknown command: {command}")
else:
# Nonbot command: just post as plain text
await bot.api.send_text_message(room_id, command)
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
# ------------------------------------------------------------------
# Setup (called by FunguyBot after bot is created)
# ------------------------------------------------------------------
def setup(bot):
init_db()
load_jobs(bot)
if not scheduler.running:
scheduler.start()
logger.info("APScheduler started")
__version__ = "1.0.0"
__author__ = "Funguy Bot"
__description__ = "Cron job scheduler"
# ------------------------------------------------------------------
# Command handler autodetects room_id from the current room
# ------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
match = botlib.MessageMatch(room, message, bot, prefix)
if not (match.is_not_from_this_bot() and match.prefix() and match.command("cron")):
return
# Admin only
if str(message.sender) != config.admin_user:
await bot.api.send_text_message(room.room_id, "⛔ You must be admin to use cron.")
return
args = match.args()
if not args:
await bot.api.send_text_message(room.room_id,
"📋 Usage: !cron <add|remove|list|enable|disable|clear> [arguments]")
return
action = args[0].lower()
current_room = room.room_id # ← automatically derived
# ---------------------------------------------------------------
# ADD: !cron add <cron_expr> <command> [tz=Timezone]
# ---------------------------------------------------------------
if action == "add":
if len(args) < 3: # at least: add, cron_expr, command
await bot.api.send_text_message(room.room_id,
"Usage: `!cron add <cron_expr> <command> [tz=IANA]`\n"
"Example: `!cron add 0 8 * * * !weather london tz=Europe/London`")
return
cron_parts = []
command_parts = []
timezone = "UTC"
# The cron expression is everything between 'add' and the last part that
# starts with '!', or the whole remaining if no '!'. Simple heuristic:
# We'll assume the command starts with the bot's prefix (or a word that is a plugin command).
# Better: separate at the first argument that looks like a command (starts with prefix or no spaces?).
# We'll use: after "add", take all tokens until we encounter a token that is likely a command
# (i.e., starts with the prefix or contains a space? Actually let's just take everything after the add
# as command except the trailing tz=).
all_remaining = args[1:] # everything after "add"
# Find possible tz= at the end
if all_remaining and all_remaining[-1].startswith("tz="):
timezone_str = all_remaining[-1]
timezone = timezone_str.split("=", 1)[1]
all_remaining = all_remaining[:-1] # remove tz part
# Now all_remaining is the cron expression + command all in one list.
# The last element of all_remaining might be the full command if it was quoted?
# But MessageMatch.args splits by spaces, so multi-word commands are broken.
# To keep it simple, we'll require the user to wrap the command in quotes if it contains spaces.
# That is: !cron add "* * * * *" "!echo Hello World" tz=...
# However, Matrix messages typically don't preserve quotes.
# So we'll instead define: the cron expression consists of exactly 5 fields (minute, hour, dom, month, dow).
# So take the first 5 tokens after "add" as cron_expr. The rest is the command.
# If the command needs to be multiple words, they must be the remaining tokens.
if len(all_remaining) < 6:
await bot.api.send_text_message(room.room_id,
"Invalid syntax. Cron expression requires 5 fields (min hour dom month dow).\n"
"Example: `!cron add 0 8 * * * !weather london`")
return
cron_expr_tokens = all_remaining[:5]
command_tokens = all_remaining[5:]
cron_expr = " ".join(cron_expr_tokens)
command = " ".join(command_tokens) # may be multi-word, e.g., "!weather london"
# Validate timezone
if timezone not in pytz.all_timezones:
await bot.api.send_text_message(room.room_id,
f"❌ Unknown timezone: `{timezone}`. Use IANA names like UTC, Europe/London.")
return
# Validate cron expression
try:
CronTrigger.from_crontab(cron_expr, timezone=pytz.timezone(timezone))
except Exception as e:
await bot.api.send_text_message(room.room_id,
f"❌ Invalid cron expression: `{cron_expr}` {e}")
return
# Store in DB using current_room
with sqlite3.connect(DB_PATH) as conn:
cur = conn.execute(
"INSERT INTO cron_jobs (room_id, cron_expr, command, timezone, added_by) VALUES (?,?,?,?,?)",
(current_room, cron_expr, command, timezone, str(message.sender))
)
job_id = cur.lastrowid
conn.commit()
# Add to scheduler
trigger = CronTrigger.from_crontab(cron_expr, timezone=pytz.timezone(timezone))
scheduler.add_job(
fire_job,
trigger=trigger,
args=[bot, current_room, command],
id=str(job_id),
replace_existing=True,
)
await bot.api.send_text_message(room.room_id,
f"✅ Cron job **#{job_id}** added in this room:\n"
f"Schedule: `{cron_expr}` ({timezone})\n"
f"Command: `{command}`")
# ---------------------------------------------------------------
# REMOVE: !cron remove <job_id> (only from current room)
# ---------------------------------------------------------------
elif action == "remove":
if len(args) != 2:
await bot.api.send_text_message(room.room_id, "Usage: `!cron remove <job_id>`")
return
try:
job_id = int(args[1])
except ValueError:
await bot.api.send_text_message(room.room_id, "❌ Job ID must be a number.")
return
with sqlite3.connect(DB_PATH) as conn:
cur = conn.execute(
"DELETE FROM cron_jobs WHERE id=? AND room_id=?",
(job_id, current_room)
)
if cur.rowcount == 0:
await bot.api.send_text_message(room.room_id,
f"❌ No job #{job_id} found in this room.")
return
conn.commit()
if scheduler.get_job(str(job_id)):
scheduler.remove_job(str(job_id))
await bot.api.send_text_message(room.room_id, f"🗑️ Job #{job_id} removed from this room.")
# ---------------------------------------------------------------
# LIST: !cron list [*] (default: current room, * for all rooms)
# ---------------------------------------------------------------
elif action == "list":
show_all = False
if len(args) > 1:
if args[1] == "*":
show_all = True
else:
await bot.api.send_text_message(room.room_id,
"Usage: `!cron list` (this room) or `!cron list *` (all rooms)")
return
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
if show_all:
rows = conn.execute(
"SELECT id, room_id, cron_expr, command, timezone, enabled FROM cron_jobs ORDER BY id"
).fetchall()
else:
rows = conn.execute(
"SELECT id, cron_expr, command, timezone, enabled FROM cron_jobs WHERE room_id=? ORDER BY id",
(current_room,)
).fetchall()
if not rows:
scope = "all rooms" if show_all else "this room"
await bot.api.send_text_message(room.room_id, f"📭 No cron jobs in {scope}.")
return
lines = [f"**Cron jobs in {'all rooms' if show_all else 'this room'}:**\n"]
for r in rows:
status = "🟢" if r["enabled"] else "🔴"
if show_all:
lines.append(
f"{status} **#{r['id']}** in `{r['room_id']}` → "
f"`{r['cron_expr']}` ({r['timezone']})\n"
f" Cmd: `{r['command']}`"
)
else:
lines.append(
f"{status} **#{r['id']}** → `{r['cron_expr']}` ({r['timezone']})\n"
f" Cmd: `{r['command']}`"
)
message_text = "\n".join(lines)
# Chunk if needed
while len(message_text) > 2000:
split_at = message_text.rfind("\n", 0, 2000)
if split_at == -1:
split_at = 2000
chunk = message_text[:split_at]
message_text = message_text[split_at:].lstrip()
await bot.api.send_text_message(room.room_id, chunk)
if message_text:
await bot.api.send_text_message(room.room_id, message_text)
# ---------------------------------------------------------------
# ENABLE: !cron enable <job_id> (only if in current room)
# ---------------------------------------------------------------
elif action == "enable":
if len(args) != 2:
await bot.api.send_text_message(room.room_id, "Usage: `!cron enable <job_id>`")
return
try:
job_id = int(args[1])
except ValueError:
await bot.api.send_text_message(room.room_id, "❌ Job ID must be a number.")
return
with sqlite3.connect(DB_PATH) as conn:
cur = conn.execute(
"UPDATE cron_jobs SET enabled=1 WHERE id=? AND room_id=?",
(job_id, current_room)
)
if cur.rowcount == 0:
await bot.api.send_text_message(room.room_id, f"❌ Job #{job_id} not found in this room.")
return
conn.commit()
# Readd to scheduler
row = conn.execute(
"SELECT cron_expr, command, timezone FROM cron_jobs WHERE id=?",
(job_id,)
).fetchone()
if row:
trigger = CronTrigger.from_crontab(row[0], timezone=pytz.timezone(row[2]))
scheduler.add_job(
fire_job,
trigger=trigger,
args=[bot, current_room, row[1]],
id=str(job_id),
replace_existing=True,
)
await bot.api.send_text_message(room.room_id, f"✅ Job #{job_id} enabled.")
# ---------------------------------------------------------------
# DISABLE: !cron disable <job_id> (only if in current room)
# ---------------------------------------------------------------
elif action == "disable":
if len(args) != 2:
await bot.api.send_text_message(room.room_id, "Usage: `!cron disable <job_id>`")
return
try:
job_id = int(args[1])
except ValueError:
await bot.api.send_text_message(room.room_id, "❌ Job ID must be a number.")
return
with sqlite3.connect(DB_PATH) as conn:
cur = conn.execute(
"UPDATE cron_jobs SET enabled=0 WHERE id=? AND room_id=?",
(job_id, current_room)
)
if cur.rowcount == 0:
await bot.api.send_text_message(room.room_id, f"❌ Job #{job_id} not found in this room.")
return
conn.commit()
if scheduler.get_job(str(job_id)):
scheduler.remove_job(str(job_id))
await bot.api.send_text_message(room.room_id, f"⏸️ Job #{job_id} disabled.")
# ---------------------------------------------------------------
# CLEAR: !cron clear (all jobs in current room)
# ---------------------------------------------------------------
elif action == "clear":
with sqlite3.connect(DB_PATH) as conn:
cur = conn.execute("SELECT id FROM cron_jobs WHERE room_id=?", (current_room,))
job_ids = [str(row[0]) for row in cur.fetchall()]
conn.execute("DELETE FROM cron_jobs WHERE room_id=?", (current_room,))
conn.commit()
for jid in job_ids:
if scheduler.get_job(jid):
scheduler.remove_job(jid)
await bot.api.send_text_message(room.room_id,
f"🧹 All cron jobs cleared from this room.")
else:
await bot.api.send_text_message(room.room_id,
"❓ Unknown action. Use: add, remove, list, enable, disable, clear.")
# ------------------------------------------------------------------
# Plugin metadata
# ------------------------------------------------------------------
__version__ = "2.1.0"
__author__ = "Funguy Cron Team"
__description__ = "Inprocess cron scheduler (roomaware, no system crontab)"
__help__ = """
<details>
<summary><strong>!cron</strong> Schedule commands via cron syntax</summary>
<summary><strong>!cron</strong> Schedule commands (roomcontext aware)</summary>
<ul>
<li><code>!cron add &lt;room_id&gt; &lt;cron_entry&gt; &lt;command&gt;</code> Add job</li>
<li><code>!cron remove &lt;room_id&gt; &lt;command&gt;</code> Remove job</li>
<li><code>!cron add &lt;cron_expr&gt; &lt;command&gt; [tz=IANA]</code> Add job to current room</li>
<li><code>!cron remove &lt;job_id&gt;</code> Remove a job</li>
<li><code>!cron list</code> List jobs in current room</li>
<li><code>!cron list *</code> List jobs in all rooms (admin)</li>
<li><code>!cron enable &lt;job_id&gt;</code> Reenable a disabled job</li>
<li><code>!cron disable &lt;job_id&gt;</code> Disable a job</li>
<li><code>!cron clear</code> Remove all jobs from current room</li>
</ul>
<p>Admin only.</p>
<p>Admin only. Timezone defaults to UTC; use <code>tz=Europe/London</code> at end.</p>
<p>Cron expression: 5 fields (<em>min hour dom month dow</em>), e.g. <code>0 8 * * *</code></p>
</details>
"""
+283 -448
View File
@@ -1,510 +1,345 @@
#!/usr/bin/env python3
"""
This plugin provides DuckDuckGo search functionality using the DuckDuckGo Instant Answer API.
DuckDuckGo search plugin (ddgs library). Results are shown inside collapsible details boxes.
"""
import asyncio
import logging
import requests
import json
from html import escape
import simplematrixbotlib as botlib
from urllib.parse import quote, urlencode
import html
from ddgs import DDGS
DDG_API_URL = "https://api.duckduckgo.com/"
DDG_SEARCH_URL = "https://html.duckduckgo.com/html/"
logger = logging.getLogger("ddg")
# ---------------------------------------------------------------------------
# Async search wrapper
# ---------------------------------------------------------------------------
async def _async_search(func, *args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
# ---------------------------------------------------------------------------
# Command handler
# ---------------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle DuckDuckGo search commands.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("ddg"):
logging.info("Received !ddg command")
if not (match.is_not_from_this_bot() and match.prefix() and match.command("ddg")):
return
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
if not args:
await send_help(room, bot)
return
subcommand = args[0].lower()
if subcommand == "search":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg search <query>")
return
query = ' '.join(args[1:])
await ddg_search(room, bot, query)
elif subcommand == "instant":
if len(args) < 2:
# ---- Instant answer (default) ----
if subcommand in ("instant", "i"):
query = " ".join(args[1:]) if len(args) > 1 else ""
if not query:
await bot.api.send_text_message(room.room_id, "Usage: !ddg instant <query>")
return
query = ' '.join(args[1:])
await ddg_instant_answer(room, bot, query)
await instant_answer(room, bot, query)
# ---- Web search ----
elif subcommand == "search":
query = " ".join(args[1:]) if len(args) > 1 else ""
if not query:
await bot.api.send_text_message(room.room_id, "Usage: !ddg search <query>")
return
await web_search(room, bot, query)
# ---- Image search ----
elif subcommand == "image":
if len(args) < 2:
query = " ".join(args[1:]) if len(args) > 1 else ""
if not query:
await bot.api.send_text_message(room.room_id, "Usage: !ddg image <query>")
return
query = ' '.join(args[1:])
await ddg_image_search(room, bot, query)
await image_search(room, bot, query)
# ---- News search ----
elif subcommand == "news":
if len(args) < 2:
query = " ".join(args[1:]) if len(args) > 1 else ""
if not query:
await bot.api.send_text_message(room.room_id, "Usage: !ddg news <query>")
return
query = ' '.join(args[1:])
await ddg_news_search(room, bot, query)
await news_search(room, bot, query)
# ---- Video search ----
elif subcommand == "video":
if len(args) < 2:
query = " ".join(args[1:]) if len(args) > 1 else ""
if not query:
await bot.api.send_text_message(room.room_id, "Usage: !ddg video <query>")
return
query = ' '.join(args[1:])
await ddg_video_search(room, bot, query)
await video_search(room, bot, query)
# ---- Bang search ----
elif subcommand == "bang":
if len(args) < 2:
await show_bang_help(room, bot)
bang_query = " ".join(args[1:]) if len(args) > 1 else ""
if not bang_query:
await bang_help(room, bot)
return
bang_query = ' '.join(args[1:])
await ddg_bang_search(room, bot, bang_query)
await bang_search(room, bot, bang_query)
# ---- Definitions ----
elif subcommand == "define":
if len(args) < 2:
word = " ".join(args[1:]) if len(args) > 1 else ""
if not word:
await bot.api.send_text_message(room.room_id, "Usage: !ddg define <word>")
return
word = ' '.join(args[1:])
await ddg_definition(room, bot, word)
await definition(room, bot, word)
# ---- Calculator ----
elif subcommand == "calc":
if len(args) < 2:
expr = " ".join(args[1:]) if len(args) > 1 else ""
if not expr:
await bot.api.send_text_message(room.room_id, "Usage: !ddg calc <expression>")
return
expression = ' '.join(args[1:])
await ddg_calculator(room, bot, expression)
await calculator(room, bot, expr)
# ---- Weather ----
elif subcommand == "weather":
location = ' '.join(args[1:]) if len(args) > 1 else ""
await ddg_weather(room, bot, location)
elif subcommand == "help":
await show_usage(room, bot)
else:
# Default to instant answer search
query = ' '.join(args)
await ddg_instant_answer(room, bot, query)
async def show_usage(room, bot):
"""Display DuckDuckGo command usage."""
usage = """
<strong>🦆 DuckDuckGo Search Commands</strong>
<strong>!ddg &lt;query&gt;</strong> - Instant answer search (default)
<strong>!ddg search &lt;query&gt;</strong> - Web search with results
<strong>!ddg instant &lt;query&gt;</strong> - Instant answer with detailed info
<strong>!ddg image &lt;query&gt;</strong> - Image search
<strong>!ddg news &lt;query&gt;</strong> - News search
<strong>!ddg video &lt;query&gt;</strong> - Video search
<strong>!ddg bang &lt;!bang query&gt;</strong> - Use DuckDuckGo bangs
<strong>!ddg define &lt;word&gt;</strong> - Word definitions
<strong>!ddg calc &lt;expression&gt;</strong> - Calculator
<strong>!ddg weather [location]</strong> - Weather information
<strong>!ddg help</strong> - Show this help
<strong>Examples:</strong>
• <code>!ddg python programming</code>
• <code>!ddg search matrix protocol</code>
• <code>!ddg image cute cats</code>
• <code>!ddg bang !w matrix</code>
• <code>!ddg define serendipity</code>
• <code>!ddg calc 2+2*5</code>
• <code>!ddg weather London</code>
<strong>Popular Bangs:</strong>
• <code>!w</code> - Wikipedia
• <code>!g</code> - Google
• <code>!yt</code> - YouTube
• <code>!aw</code> - ArchWiki
• <code>!gh</code> - GitHub
• <code>!so</code> - Stack Overflow
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def show_bang_help(room, bot):
"""Display DuckDuckGo bang help."""
bang_help = """
<strong>🦆 DuckDuckGo Bangs</strong>
<strong>Usage:</strong> <code>!ddg bang &lt;!bang query&gt;</code>
<strong>Popular Bangs:</strong>
• <code>!ddg bang !w matrix</code> - Search Wikipedia
• <code>!ddg bang !g python</code> - Search Google
• <code>!ddg bang !yt music</code> - Search YouTube
• <code>!ddg bang !aw arch</code> - Search ArchWiki
• <code>!ddg bang !gh repository</code> - Search GitHub
• <code>!ddg bang !so error</code> - Search Stack Overflow
• <code>!ddg bang !amazon book</code> - Search Amazon
• <code>!ddg bang !imdb movie</code> - Search IMDb
• <code>!ddg bang !reddit topic</code> - Search Reddit
• <code>!ddg bang !tw tweet</code> - Search Twitter
<strong>More Bangs:</strong>
• <code>!ddg</code> - DuckDuckGo
• <code>!bing</code> - Bing
• <code>!ddgimages</code> - DuckDuckGo Images
• <code>!npm</code> - npm packages
• <code>!cpp</code> - C++ reference
• <code>!python</code> - Python docs
• <code>!rust</code> - Rust docs
• <code>!mdn</code> - MDN Web Docs
<em>Thousands of bangs available! See: https://duckduckgo.com/bangs</em>
"""
await bot.api.send_markdown_message(room.room_id, bang_help)
async def ddg_instant_answer(room, bot, query):
"""Get DuckDuckGo instant answer."""
try:
params = {
'q': query,
'format': 'json',
'no_html': '1',
'skip_disambig': '1',
'no_redirect': '1'
}
logging.info(f"Fetching DuckDuckGo instant answer for: {query}")
response = requests.get(DDG_API_URL, params=params, timeout=10)
if response.status_code != 200:
# If API fails, provide direct search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🦆 DuckDuckGo: {html.escape(query)}</strong><br><br>"
f"API temporarily unavailable. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
return
data = response.json()
output = f"<strong>🦆 DuckDuckGo: {html.escape(query)}</strong><br><br>"
# Handle different answer types
if data.get('AbstractText'):
# Wikipedia-style answer
output += f"<strong>📚 {data.get('Heading', 'Definition')}</strong><br>"
output += f"{html.escape(data['AbstractText'])}<br>"
if data.get('AbstractURL'):
output += f"<a href='{data['AbstractURL']}'>Read more on {data.get('AbstractSource', 'Wikipedia')}</a><br>"
elif data.get('Answer'):
# Direct answer
output += f"<strong>💡 Answer</strong><br>"
output += f"{html.escape(data['Answer'])}<br>"
elif data.get('Definition'):
# Definition
output += f"<strong>📖 Definition</strong><br>"
output += f"{html.escape(data['Definition'])}<br>"
if data.get('DefinitionSource'):
output += f"<em>Source: {data['DefinitionSource']}</em><br>"
elif data.get('Results'):
# List of results
output += f"<strong>🔍 Results</strong><br>"
for result in data['Results'][:3]:
output += f"• <a href='{result.get('FirstURL', '#')}'>{html.escape(result.get('Text', 'Result'))}</a><br>"
elif data.get('RelatedTopics'):
# Related topics
output += f"<strong>🔗 Related Topics</strong><br>"
for topic in data['RelatedTopics'][:3]:
if isinstance(topic, dict) and topic.get('FirstURL'):
output += f"• <a href='{topic['FirstURL']}'>{html.escape(topic.get('Text', 'Topic'))}</a><br>"
elif isinstance(topic, dict) and topic.get('Name'):
output += f"{html.escape(topic['Name'])}<br>"
else:
# No instant answer found, show search results
output += "<strong>🔍 No instant answer found.</strong><br>"
# Add search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
output += f"<br><a href='{search_url}'>View all results on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
# Fallback to direct search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🦆 DuckDuckGo: {html.escape(query)}</strong><br><br>"
f"Error accessing API. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
logging.error(f"Error in ddg_instant_answer: {e}")
async def ddg_search(room, bot, query):
"""Perform web search with multiple results."""
try:
await ddg_web_search(room, bot, query, limit=5)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing search: {str(e)}")
async def ddg_web_search(room, bot, query, limit=5):
"""Perform web search and return results."""
try:
params = {
'q': query,
'format': 'json'
}
response = requests.get(DDG_API_URL, params=params, timeout=10)
if response.status_code != 200:
# Fallback to direct search
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🔍 DuckDuckGo Search: {html.escape(query)}</strong><br><br>"
f"API temporarily unavailable. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
return
data = response.json()
output = f"<strong>🔍 DuckDuckGo Search: {html.escape(query)}</strong><br><br>"
results_shown = 0
# Show instant answer if available
if data.get('AbstractText') and results_shown < limit:
output += f"<strong>💡 {data.get('Heading', 'Instant Answer')}</strong><br>"
abstract = data['AbstractText'][:200] + "..." if len(data['AbstractText']) > 200 else data['AbstractText']
output += f"{html.escape(abstract)}<br>"
if data.get('AbstractURL'):
output += f"<a href='{data['AbstractURL']}'>Read more</a><br>"
output += "<br>"
results_shown += 1
# Show web results
if data.get('Results') and results_shown < limit:
output += "<strong>🌐 Web Results</strong><br>"
for result in data['Results'][:limit - results_shown]:
output += f"• <a href='{result.get('FirstURL', '#')}'>{html.escape(result.get('Text', 'Result'))}</a><br>"
results_shown += 1
# Show related topics
if data.get('RelatedTopics') and results_shown < limit:
output += "<strong>🔗 Related Topics</strong><br>"
for topic in data['RelatedTopics'][:limit - results_shown]:
if isinstance(topic, dict) and topic.get('FirstURL'):
output += f"• <a href='{topic['FirstURL']}'>{html.escape(topic.get('Text', 'Topic'))}</a><br>"
results_shown += 1
# Add search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
output += f"<br><a href='{search_url}'>View all results on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
# Fallback to direct search
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🔍 DuckDuckGo Search: {html.escape(query)}</strong><br><br>"
f"Error accessing API. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
logging.error(f"Error in ddg_web_search: {e}")
async def ddg_image_search(room, bot, query):
"""Perform image search."""
try:
params = {
'q': query,
'format': 'json',
'iax': 'images',
'ia': 'images'
}
response = requests.get(DDG_API_URL, params=params, timeout=10)
if response.status_code != 200:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🖼️ DuckDuckGo Images: {html.escape(query)}</strong><br><br>"
f"API temporarily unavailable. <a href='{search_url}'>Search images on DuckDuckGo</a>"
)
return
data = response.json()
output = f"<strong>🖼️ DuckDuckGo Images: {html.escape(query)}</strong><br><br>"
if data.get('Results'):
output += "<strong>📸 Image Results</strong><br>"
for image in data['Results'][:3]:
output += f"• <a href='{image.get('Image', '#')}'>{html.escape(image.get('Title', 'Image'))}</a><br>"
if image.get('Width') and image.get('Height'):
output += f" Size: {image['Width']}×{image['Height']}<br>"
else:
output += "No image results found.<br>"
# Add search link
search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images"
output += f"<br><a href='{search_url}'>View all images on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🖼️ DuckDuckGo Images: {html.escape(query)}</strong><br><br>"
f"Error accessing API. <a href='{search_url}'>Search images on DuckDuckGo</a>"
)
async def ddg_news_search(room, bot, query):
"""Perform news search."""
try:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iar=news"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>📰 DuckDuckGo News: {html.escape(query)}</strong><br><br>"
f"<a href='{search_url}'>View news on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing news search: {str(e)}")
async def ddg_video_search(room, bot, query):
"""Perform video search."""
try:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iar=videos"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🎬 DuckDuckGo Videos: {html.escape(query)}</strong><br><br>"
f"<a href='{search_url}'>View videos on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing video search: {str(e)}")
async def ddg_bang_search(room, bot, bang_query):
"""Perform search using DuckDuckGo bangs."""
try:
# Create search URL directly - this is more reliable than API for bangs
search_url = f"https://duckduckgo.com/?q={quote(bang_query)}"
# Common bangs with descriptions
bang_descriptions = {
'!w': 'Wikipedia',
'!g': 'Google',
'!yt': 'YouTube',
'!aw': 'ArchWiki',
'!gh': 'GitHub',
'!so': 'Stack Overflow',
'!amazon': 'Amazon',
'!imdb': 'IMDb',
'!reddit': 'Reddit',
'!tw': 'Twitter'
}
# Extract bang for description
bang = bang_query.split(' ')[0] if ' ' in bang_query else bang_query
description = bang_descriptions.get(bang, 'Site-specific search')
output = f"<strong>🎯 DuckDuckGo Bang: {html.escape(bang)}</strong><br>"
output += f"<strong>Description:</strong> {description}<br>"
if ' ' in bang_query:
output += f"<strong>Query:</strong> {html.escape(bang_query.split(' ', 1)[1])}<br><br>"
output += f"<a href='{search_url}'>Search with {html.escape(bang)} on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error with bang search: {str(e)}")
async def ddg_definition(room, bot, word):
"""Get word definition."""
try:
search_url = f"https://duckduckgo.com/?q=define+{quote(word)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>📖 Definition: {html.escape(word)}</strong><br><br>"
f"<a href='{search_url}'>Get definition on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error getting definition: {str(e)}")
async def ddg_calculator(room, bot, expression):
"""Use DuckDuckGo as a calculator."""
try:
search_url = f"https://duckduckgo.com/?q={quote(expression)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🧮 Calculator: {html.escape(expression)}</strong><br><br>"
f"<a href='{search_url}'>Calculate on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error with calculator: {str(e)}")
async def ddg_weather(room, bot, location):
"""Get weather information."""
try:
location = " ".join(args[1:]) if len(args) > 1 else ""
if not location:
location = "current location"
await weather(room, bot, location)
search_url = f"https://duckduckgo.com/?q=weather+{quote(location)}"
# ---- Help ----
elif subcommand == "help":
await send_help(room, bot)
# ---- Default: treat as instant answer ----
else:
query = " ".join(args)
await instant_answer(room, bot, query)
# ==============================
# Result functions (all wrapped in <details>)
# ==============================
async def instant_answer(room, bot, query):
"""Top web result wrapped in a collapsible box."""
try:
with DDGS() as ddgs:
results = await _async_search(ddgs.text, query, max_results=1)
except Exception as e:
logger.error(f"DDG instant answer error: {e}")
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🌤️ Weather: {html.escape(location)}</strong><br><br>"
f"<a href='{search_url}'>Get weather on DuckDuckGo</a>"
f"🦆 <strong>DuckDuckGo: {escape(query)}</strong><br><br>Error fetching results. Try again later."
)
return
content = ""
if results:
r = results[0]
title = escape(r.get("title", "Result"))
body = escape(r.get("body", ""))
content = f"💡 <strong>{title}</strong><br>{body[:300]}…<br><a href='{r['href']}'>Read more</a>"
else:
search_url = f"https://duckduckgo.com/?q={escape(query)}"
content = f"No results found.<br>🔍 <a href='{search_url}'>Search on DuckDuckGo</a>"
msg = f"""<details>
<summary>🦆 DuckDuckGo: {escape(query)}</summary>
{content}
</details>"""
await bot.api.send_markdown_message(room.room_id, msg)
async def web_search(room, bot, query):
try:
with DDGS() as ddgs:
results = await _async_search(ddgs.text, query, max_results=5)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error getting weather: {str(e)}")
logger.error(f"DDG web search error: {e}")
await bot.api.send_text_message(room.room_id, f"Error: {e}")
return
if not results:
await bot.api.send_text_message(room.room_id, f"No results for '{query}'.")
return
items = ""
for r in results:
title = escape(r.get("title", "Result"))
body = escape(r.get("body", ""))
items += f"• <a href='{r['href']}'>{title}</a><br> {body[:200]}…<br><br>"
msg = f"""<details>
<summary>🔍 Search: {escape(query)}</summary>
{items}
</details>"""
await bot.api.send_markdown_message(room.room_id, msg)
async def image_search(room, bot, query):
try:
with DDGS() as ddgs:
results = await _async_search(ddgs.images, query, max_results=3)
except Exception as e:
logger.error(f"DDG image error: {e}")
await bot.api.send_text_message(room.room_id, f"Error: {e}")
return
if not results:
await bot.api.send_text_message(room.room_id, f"No images for '{query}'.")
return
items = ""
for img in results:
title = escape(img.get("title", "Image"))
items += f"• <a href='{img['image']}'>{title}</a>"
if img.get("width") and img.get("height"):
items += f" ({img['width']}×{img['height']})"
items += "<br>"
search_url = f"https://duckduckgo.com/?q={escape(query)}&iax=images&ia=images"
items += f"<br>🔍 <a href='{search_url}'>View all images</a>"
msg = f"""<details>
<summary>🖼️ Images: {escape(query)}</summary>
{items}
</details>"""
await bot.api.send_markdown_message(room.room_id, msg)
async def news_search(room, bot, query):
try:
with DDGS() as ddgs:
results = await _async_search(ddgs.news, query, max_results=3)
except Exception as e:
logger.error(f"DDG news error: {e}")
await bot.api.send_text_message(room.room_id, f"Error: {e}")
return
if not results:
await bot.api.send_text_message(room.room_id, f"No news for '{query}'.")
return
items = ""
for n in results:
title = escape(n.get("title", "Article"))
body = escape(n.get("body", ""))
items += f"• <a href='{n['url']}'>{title}</a><br> {body[:200]}…<br><br>"
msg = f"""<details>
<summary>📰 News: {escape(query)}</summary>
{items}
</details>"""
await bot.api.send_markdown_message(room.room_id, msg)
async def video_search(room, bot, query):
try:
with DDGS() as ddgs:
results = await _async_search(ddgs.videos, query, max_results=3)
except Exception as e:
logger.error(f"DDG video error: {e}")
await bot.api.send_text_message(room.room_id, f"Error: {e}")
return
if not results:
await bot.api.send_text_message(room.room_id, f"No videos for '{query}'.")
return
items = ""
for v in results:
title = escape(v.get("title", "Video"))
items += f"• <a href='{v['content']}'>{title}</a><br>"
search_url = f"https://duckduckgo.com/?q={escape(query)}&iar=videos"
items += f"<br>🔍 <a href='{search_url}'>View all videos</a>"
msg = f"""<details>
<summary>🎬 Videos: {escape(query)}</summary>
{items}
</details>"""
await bot.api.send_markdown_message(room.room_id, msg)
async def bang_search(room, bot, bang_query):
search_url = f"https://duckduckgo.com/?q={escape(bang_query)}"
content = f"🔗 <a href='{search_url}'>Search with {escape(bang_query)} on DuckDuckGo</a>"
msg = f"""<details>
<summary>🎯 Bang: {escape(bang_query)}</summary>
{content}
</details>"""
await bot.api.send_markdown_message(room.room_id, msg)
async def definition(room, bot, word):
await instant_answer(room, bot, f"define {word}")
async def calculator(room, bot, expr):
await instant_answer(room, bot, expr)
async def weather(room, bot, location):
await instant_answer(room, bot, f"weather {location}")
# ---------------------------------------------------------------------------
# Plugin Metadata
# Help messages (no details wrapper kept readable)
# ---------------------------------------------------------------------------
async def bang_help(room, bot):
msg = """
<strong>🎯 DuckDuckGo Bangs</strong><br>
Usage: <code>!ddg bang !bang query</code><br><br>
<strong>Popular bangs:</strong><br>
• <code>!w</code> Wikipedia
• <code>!g</code> Google
• <code>!yt</code> YouTube
• <code>!aw</code> ArchWiki
• <code>!gh</code> GitHub
• <code>!so</code> Stack Overflow
• <code>!reddit</code> Reddit
<br>
<a href="https://duckduckgo.com/bangs">Full list here</a>
"""
await bot.api.send_markdown_message(room.room_id, msg)
__version__ = "1.0.0"
async def send_help(room, bot):
help_msg = """
<strong>🦆 DuckDuckGo Commands</strong><br>
<code>!ddg &lt;query&gt;</code> Top result (collapsible)<br>
<code>!ddg search &lt;query&gt;</code> 5 web results<br>
<code>!ddg image &lt;query&gt;</code> 3 images<br>
<code>!ddg news &lt;query&gt;</code> 3 news articles<br>
<code>!ddg video &lt;query&gt;</code> 3 videos<br>
<code>!ddg bang &lt;!bang query&gt;</code> Bang redirect<br>
<code>!ddg define &lt;word&gt;</code> Definition<br>
<code>!ddg calc &lt;expr&gt;</code> Calculator<br>
<code>!ddg weather [city]</code> Weather<br>
<code>!ddg help</code> This help
"""
await bot.api.send_markdown_message(room.room_id, help_msg)
# ---------------------------------------------------------------------------
# Plugin metadata
# ---------------------------------------------------------------------------
__version__ = "2.1.0"
__author__ = "Funguy Bot"
__description__ = "DuckDuckGo search"
__description__ = "DuckDuckGo search collapsible results (ddgs library, no API key)"
__help__ = """
<details>
<summary><strong>!ddg</strong> DuckDuckGo search and instant answers</summary>
<summary><strong>!ddg</strong> DuckDuckGo search (web, images, news, etc.)</summary>
<ul>
<li><code>!ddg &lt;query&gt;</code> Instant answer (default)</li>
<li><code>!ddg search &lt;query&gt;</code> Web search results</li>
<li><code>!ddg instant &lt;query&gt;</code> Detailed instant answer</li>
<li><code>!ddg image &lt;query&gt;</code> Image search</li>
<li><code>!ddg news &lt;query&gt;</code> News search</li>
<li><code>!ddg video &lt;query&gt;</code> Video search</li>
<li><code>!ddg bang &lt;!bang query&gt;</code> Use DuckDuckGo bangs</li>
<li><code>!ddg define &lt;word&gt;</code> Word definition</li>
<li><code>!ddg &lt;query&gt;</code> Top web result snippet (collapsible)</li>
<li><code>!ddg search &lt;query&gt;</code> 5 web results</li>
<li><code>!ddg image &lt;query&gt;</code> 3 images</li>
<li><code>!ddg news &lt;query&gt;</code> 3 news articles</li>
<li><code>!ddg video &lt;query&gt;</code> 3 videos</li>
<li><code>!ddg bang &lt;!bang query&gt;</code> Bang redirect</li>
<li><code>!ddg define &lt;word&gt;</code> Definition</li>
<li><code>!ddg calc &lt;expression&gt;</code> Calculator</li>
<li><code>!ddg weather [location]</code> Weather information</li>
<li><code>!ddg help</code> Show detailed help</li>
<li><code>!ddg weather [location]</code> Weather</li>
</ul>
<p>No API key required.</p>
<p>Uses <code>ddgs</code> library. No API key required.</p>
</details>
"""
+434
View File
@@ -0,0 +1,434 @@
#!/usr/bin/env python3
"""
plugins/roomstats.py — peruser room statistics (Limnoriastyle).
Commands: !roomstats, !rank, !stats
"""
import time
import re
import sqlite3
import logging
import nio
import simplematrixbotlib as botlib
logger = logging.getLogger("roomstats")
DB_PATH = "roomstats.db"
# ------------------------------------------------------------------
# Emoji / smiley regex (Unicode blocks)
# ------------------------------------------------------------------
EMOJI_RE = re.compile(
"["
"\U0001F600-\U0001F64F" # Emoticons
"\U0001F300-\U0001F5FF" # Symbols & pictographs
"\U0001F680-\U0001F6FF" # Transport & map
"\U0001F1E0-\U0001F1FF" # Flags
"\U00002702-\U000027B0" # Dingbats
"\U000024C2-\U0001F251" # Misc
"]+", re.UNICODE)
def count_smileys(text):
"""Return number of emoji occurrences."""
return len(EMOJI_RE.findall(text))
# ------------------------------------------------------------------
# Database init
# ------------------------------------------------------------------
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS user_room_stats (
room_id TEXT,
user_id TEXT,
msgs INTEGER DEFAULT 0,
chars INTEGER DEFAULT 0,
words INTEGER DEFAULT 0,
smileys INTEGER DEFAULT 0,
actions INTEGER DEFAULT 0,
joins INTEGER DEFAULT 0,
parts INTEGER DEFAULT 0,
kicks_given INTEGER DEFAULT 0,
kicked_received INTEGER DEFAULT 0,
topics_set INTEGER DEFAULT 0,
last_updated INTEGER,
PRIMARY KEY (room_id, user_id)
)
""")
conn.commit()
conn.close()
# ------------------------------------------------------------------
# Multiword user resolution helper
# ------------------------------------------------------------------
async def resolve_user_from_tokens(bot, room_id, tokens):
"""
Given a list of word tokens, find a matching display name.
Returns (mxid, display_name) or raises ValueError.
"""
# Build cache of (lowered display name → user_id) from joined members
resp = await bot.async_client.joined_members(room_id)
if resp.members is None:
raise ValueError("Could not fetch member list.")
# Create a dict: lower_display → (mxid, display_name)
# If duplicate display name, store None to signal ambiguity.
cache = {}
for member in resp.members:
display = (member.display_name or "").strip()
if not display:
continue
key = display.lower()
if key in cache:
cache[key] = None
else:
cache[key] = (member.user_id, display)
# Try progressively longer prefixes of the tokens
for end in range(len(tokens), 0, -1):
candidate = " ".join(tokens[:end]).strip().lower()
if candidate in cache:
entry = cache[candidate]
if entry is not None:
return entry # (mxid, display_name)
else:
# Ambiguous we need to fetch and check exactly
matches = []
for member in resp.members:
if (member.display_name or "").strip().lower() == candidate:
matches.append((member.user_id, member.display_name or member.user_id))
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
raise ValueError(
f"Multiple users have display name '{candidate}'. Use an MXID instead."
)
# if none, continue
raise ValueError(f"No member found for '{' '.join(tokens)}'.")
async def resolve_user(bot, room_id, name_or_tokens):
"""
Accept either a single string (MXID or single-token display name)
or a list of tokens. Returns (mxid, display_name).
"""
if isinstance(name_or_tokens, str):
if name_or_tokens.startswith("@"):
return name_or_tokens, None
# Single token try direct cache match or fallback to multiword
tokens = [name_or_tokens]
else:
tokens = name_or_tokens
return await resolve_user_from_tokens(bot, room_id, tokens)
# ------------------------------------------------------------------
# Setup: register custom event listeners for membership & topics
# ------------------------------------------------------------------
def setup(bot):
init_db()
@bot.listener.on_custom_event(nio.RoomMemberEvent)
async def member_event(room, event):
room_id = room.room_id
membership = event.content.get("membership")
state_key = event.state_key
sender = event.sender
# Ignore the bot's own membership changes
if state_key == bot.async_client.user_id:
return
if membership == "join":
_incr(room_id, state_key, "joins")
elif membership == "leave":
if sender != state_key: # kick
_incr(room_id, sender, "kicks_given")
_incr(room_id, state_key, "kicked_received")
else: # part
_incr(room_id, state_key, "parts")
@bot.listener.on_custom_event(nio.RoomTopicEvent)
async def topic_event(room, event):
room_id = room.room_id
sender = event.sender
_incr(room_id, sender, "topics_set")
def _incr(room_id, user_id, column):
"""Increment a stat column by 1, creating row if needed."""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
"INSERT OR IGNORE INTO user_room_stats (room_id, user_id) VALUES (?, ?)",
(room_id, user_id)
)
c.execute(
f"UPDATE user_room_stats SET {column} = {column} + 1, last_updated = ? WHERE room_id = ? AND user_id = ?",
(int(time.time()), room_id, user_id)
)
conn.commit()
conn.close()
# ------------------------------------------------------------------
# Message handler silently records stats, and handles commands
# ------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
room_id = room.room_id
sender = message.sender
# ----- silently record stats for any nonbot message -----
if sender != bot.async_client.user_id: # <-- FIXED
body = message.body or ""
words = len(body.split())
chars = len(body)
smileys = count_smileys(body)
is_action = getattr(message, "msgtype", None) == "m.emote"
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("INSERT OR IGNORE INTO user_room_stats (room_id, user_id) VALUES (?, ?)", (room_id, sender))
c.execute(
"""UPDATE user_room_stats
SET msgs = msgs + 1,
chars = chars + ?,
words = words + ?,
smileys = smileys + ?,
actions = actions + ?,
last_updated = ?
WHERE room_id = ? AND user_id = ?""",
(chars, words, smileys, 1 if is_action else 0, int(time.time()), room_id, sender)
)
conn.commit()
conn.close()
# ----- command matching -----
match = botlib.MessageMatch(room, message, bot, prefix)
if not match.is_not_from_this_bot() or not match.prefix():
return
cmd = match.command()
args = match.args()
# ===============================
# !roomstats
# ===============================
if cmd == "roomstats":
await _handle_roomstats(bot, room_id)
# ===============================
# !rank <expr>
# ===============================
elif cmd == "rank":
if not args:
await bot.api.send_text_message(
room_id,
"Usage: !rank <stat>\n"
"Stats: msgs, chars, words, smileys, actions, joins, parts, "
"kicks_given, kicked_received, topics_set"
)
return
col = args[0].lower()
await _handle_rank(bot, room_id, col)
# ===============================
# !stats [<name>]
# ===============================
elif cmd == "stats":
if args:
# Use all tokens as the display name (multiword)
try:
target_mxid, _ = await resolve_user_from_tokens(bot, room_id, args)
except ValueError as e:
await bot.api.send_text_message(room_id, str(e))
return
else:
target_mxid = sender
await _handle_user_stats(bot, room_id, target_mxid, sender)
# ------------------------------------------------------------------
# Command implementations
# ------------------------------------------------------------------
VALID_STATS = {
"msgs": "Messages",
"chars": "Characters",
"words": "Words",
"smileys": "Smileys",
"actions": "Actions",
"joins": "Joins",
"parts": "Parts",
"kicks_given": "Kicks given",
"kicked_received": "Times kicked",
"topics_set": "Topics set",
}
async def _get_aggregate(room_id):
"""Return dict of aggregate stats for a room."""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""SELECT
COALESCE(SUM(msgs),0), COALESCE(SUM(chars),0),
COALESCE(SUM(words),0), COALESCE(SUM(smileys),0),
COALESCE(SUM(actions),0), COALESCE(SUM(joins),0),
COALESCE(SUM(parts),0), COALESCE(SUM(kicks_given),0),
COALESCE(SUM(kicked_received),0), COALESCE(SUM(topics_set),0)
FROM user_room_stats WHERE room_id=?""", (room_id,))
row = c.fetchone()
conn.close()
if not row or all(v == 0 for v in row):
return None
return {
"msgs": row[0], "chars": row[1], "words": row[2], "smileys": row[3],
"actions": row[4], "joins": row[5], "parts": row[6],
"kicks_given": row[7], "kicked_received": row[8], "topics_set": row[9]
}
async def _handle_roomstats(bot, room_id):
agg = await _get_aggregate(room_id)
if not agg:
await bot.api.send_text_message(room_id, "No stats collected yet.")
return
# Get top 10 by msgs
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""SELECT user_id, msgs FROM user_room_stats
WHERE room_id=? ORDER BY msgs DESC LIMIT 10""", (room_id,))
top = c.fetchall()
conn.close()
# Resolve display names for top users
top_lines = []
resp = await bot.async_client.joined_members(room_id)
for uid, cnt in top:
disp = uid
if resp.members:
for m in resp.members:
if m.user_id == uid:
disp = m.display_name or uid
break
top_lines.append(f"<li><code>{disp}</code> — {cnt} msgs</li>")
msg = f"""<details>
<summary><strong>Room Statistics</strong></summary>
<ul>
<li>📩 Messages: {agg['msgs']}</li>
<li>🔤 Characters: {agg['chars']}</li>
<li>📝 Words: {agg['words']}</li>
<li>😀 Smileys: {agg['smileys']}</li>
<li>🎭 Actions: {agg['actions']}</li>
<li>🚪 Joins: {agg['joins']}</li>
<li>👋 Parts: {agg['parts']}</li>
<li>👢 Kicks given: {agg['kicks_given']}</li>
<li>🥾 Times kicked: {agg['kicked_received']}</li>
<li>📌 Topics set: {agg['topics_set']}</li>
</ul>
<p><strong>Top 10 by messages:</strong></p>
<ol>
{''.join(top_lines)}
</ol>
</details>"""
await bot.api.send_markdown_message(room_id, msg)
async def _handle_rank(bot, room_id, col):
# Validate column
if col not in VALID_STATS:
await bot.api.send_text_message(room_id, f"Unknown stat: {col}. Allowed: {', '.join(VALID_STATS.keys())}")
return
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Safe to use f-string because col is validated against a hardcoded set
c.execute(f"""SELECT user_id, {col} FROM user_room_stats
WHERE room_id=? AND {col} > 0 ORDER BY {col} DESC LIMIT 10""", (room_id,))
rows = c.fetchall()
conn.close()
if not rows:
await bot.api.send_text_message(room_id, f"No users with {VALID_STATS[col]} > 0.")
return
resp = await bot.async_client.joined_members(room_id)
items = []
for i, (uid, val) in enumerate(rows, 1):
disp = uid
if resp.members:
for m in resp.members:
if m.user_id == uid:
disp = m.display_name or uid
break
items.append(f"<li>{i}. <code>{disp}</code> — {val}</li>")
msg = f"""<details>
<summary><strong>Ranking by {VALID_STATS[col]}</strong></summary>
<ol>
{''.join(items)}
</ol>
</details>"""
await bot.api.send_markdown_message(room_id, msg)
async def _handle_user_stats(bot, room_id, user_id, sender):
# Fetch stats
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""SELECT msgs, chars, words, smileys, actions, joins, parts,
kicks_given, kicked_received, topics_set
FROM user_room_stats WHERE room_id=? AND user_id=?""", (room_id, user_id))
row = c.fetchone()
conn.close()
if not row or all(v == 0 for v in row):
# No stats, maybe just joined get display name for the message
disp = user_id
resp = await bot.async_client.joined_members(room_id)
if resp.members:
for m in resp.members:
if m.user_id == user_id:
disp = m.display_name or user_id
break
await bot.api.send_text_message(room_id, f"No stats recorded for {disp}.")
return
# Get display name
disp = user_id
resp = await bot.async_client.joined_members(room_id)
if resp.members:
for m in resp.members:
if m.user_id == user_id:
disp = m.display_name or user_id
break
msg = f"""<details>
<summary><strong>Stats for {disp}</strong></summary>
<ul>
<li>📩 Messages: {row[0]}</li>
<li>🔤 Characters: {row[1]}</li>
<li>📝 Words: {row[2]}</li>
<li>😀 Smileys: {row[3]}</li>
<li>🎭 Actions: {row[4]}</li>
<li>🚪 Joins: {row[5]}</li>
<li>👋 Parts: {row[6]}</li>
<li>👢 Kicks given: {row[7]}</li>
<li>🥾 Times kicked: {row[8]}</li>
<li>📌 Topics set: {row[9]}</li>
</ul>
</details>"""
await bot.api.send_markdown_message(room_id, msg)
# ------------------------------------------------------------------
# Plugin metadata
# ------------------------------------------------------------------
__version__ = "1.0.1"
__author__ = "Funguy Roomstats"
__description__ = "Peruser room statistics (Limnoriastyle), with multiword name support"
__help__ = """
<details>
<summary><strong>Room Statistics Commands</strong></summary>
<ul>
<li><code>!roomstats</code> Aggregate room stats + top 10 users</li>
<li><code>!rank &lt;stat&gt;</code> Top 10 by a specific stat (msgs, words, smileys, actions, joins, parts, kicks_given, kicked_received, topics_set)</li>
<li><code>!stats [name]</code> Show stats for a user (supports multiword names)</li>
</ul>
<p>All commands work in the current room; display names are automatically resolved.</p>
</details>
"""
+3 -1
View File
@@ -1,6 +1,5 @@
python-dotenv
requests
duckduckgo_search
nio
markdown2
watchdog
@@ -19,3 +18,6 @@ aiohttp
aiosqlite
pillow
omdbapi
apscheduler
pytz
ddgs
BIN
View File
Binary file not shown.