imdb plugin added

This commit is contained in:
2026-04-30 22:07:19 -05:00
parent 868b2b7e32
commit c24893e141
4 changed files with 437 additions and 4 deletions
+3
View File
@@ -11,3 +11,6 @@ store
funguybot.service
stats.db
cron.db
__pycache__/
lastfm.db
venv.bak/
+390
View File
@@ -0,0 +1,390 @@
#!/usr/bin/env python3
"""
Fully Featured OMDb Plugin for Funguy Bot
Provides complete OMDb API functionality:
!imdb <title> - Get movie/series details
!imdb id <imdb_id> - Lookup by IMDb ID (e.g. tt0133093)
!imdb search <query> - Search for titles (returns list)
!imdb episode <series> -s N -e N - Get episode details
!imdb help - Show this help
Optional flags (works with title and id):
-y <year> - Year of release
-t <movie|series|episode> - Type filter
--short-plot - Show short plot (default is full)
Requires: OMDB_API_KEY in environment variables.
"""
import logging
import os
import aiohttp
import asyncio
import tempfile
import argparse
import shlex
from urllib.parse import quote_plus
# ----------------------------------------------------------------------
# Plugin setup (called by FunguyBot after bot instance is created)
# ----------------------------------------------------------------------
def setup(bot):
"""Called by the bot after the bot instance is created."""
api_key = os.getenv("OMDB_API_KEY")
if not api_key:
logging.warning("OMDB_API_KEY not set in environment. !imdb will not work.")
else:
logging.info("OMDb plugin loaded. API key present.")
# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
async def fetch_omdb(session, params):
"""Perform async GET request to OMDb API, return JSON."""
api_key = os.getenv("OMDB_API_KEY")
if not api_key:
return {"Error": "OMDB_API_KEY missing", "Response": "False"}
params["apikey"] = api_key
url = "https://www.omdbapi.com/"
try:
async with session.get(url, params=params, timeout=15) as resp:
if resp.status == 200:
return await resp.json()
else:
return {"Error": f"HTTP {resp.status}", "Response": "False"}
except asyncio.TimeoutError:
return {"Error": "Request timed out", "Response": "False"}
except Exception as e:
return {"Error": str(e), "Response": "False"}
async def download_poster(session, poster_url):
"""Download poster to a temp file, return path or None."""
if not poster_url or poster_url == "N/A":
return None
try:
async with session.get(poster_url, timeout=10) as resp:
if resp.status == 200:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp.write(await resp.read())
return tmp.name
else:
logging.debug(f"Poster download failed: HTTP {resp.status}")
return None
except Exception as e:
logging.debug(f"Poster download error: {e}")
return None
def format_movie_html(data, poster_path=None):
"""Format OMDb movie/series data into HTML for Matrix."""
if not data or data.get("Response") == "False":
error = data.get("Error", "No information found.")
return f"<em>❌ {error}</em>"
title = data.get("Title", "Unknown")
year = data.get("Year", "N/A")
rated = data.get("Rated", "N/A")
runtime = data.get("Runtime", "N/A")
genre = data.get("Genre", "N/A")
director = data.get("Director", "N/A")
writer = data.get("Writer", "N/A")
actors = data.get("Actors", "N/A")
plot = data.get("Plot", "N/A")
language = data.get("Language", "N/A")
country = data.get("Country", "N/A")
awards = data.get("Awards", "N/A")
poster_url = data.get("Poster", "")
imdb_rating = data.get("imdbRating", "N/A")
imdb_votes = data.get("imdbVotes", "N/A")
imdb_id = data.get("imdbID", "")
type_ = data.get("Type", "movie")
box_office = data.get("BoxOffice", "N/A")
production = data.get("Production", "N/A")
website = data.get("Website", "N/A")
# Rotten Tomatoes ratings
ratings = data.get("Ratings", [])
rt_rating = next((r["Value"] for r in ratings if r["Source"] == "Rotten Tomatoes"), "N/A")
mc_rating = next((r["Value"] for r in ratings if r["Source"] == "Metacritic"), "N/A")
# Build HTML
imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id and imdb_id != "N/A" else ""
title_line = f"<strong>🎬 {title}</strong> ({year})"
if imdb_link:
title_line += f" <a href='{imdb_link}'>IMDb</a>"
lines = [title_line]
lines.append(f"⭐ <strong>IMDb:</strong> {imdb_rating}/10 ({imdb_votes} votes) | 🍅 <strong>Rotten:</strong> {rt_rating} | 📊 <strong>Metacritic:</strong> {mc_rating}")
lines.append(f"📅 <strong>Rated:</strong> {rated} | ⏱ <strong>Runtime:</strong> {runtime} | 🎭 <strong>Genre:</strong> {genre} | 🎬 <strong>Type:</strong> {type_.capitalize()}")
if director != "N/A":
lines.append(f"🎬 <strong>Director:</strong> {director}")
if writer != "N/A":
lines.append(f"✍️ <strong>Writer:</strong> {writer}")
if actors != "N/A":
lines.append(f"🎭 <strong>Actors:</strong> {actors}")
if plot != "N/A":
lines.append(f"📝 <strong>Plot:</strong> {plot}")
extra = []
if language != "N/A":
extra.append(f"🗣 {language}")
if country != "N/A":
extra.append(f"🌍 {country}")
if awards != "N/A" and awards.lower() != "n/a":
extra.append(f"🏆 {awards}")
if box_office != "N/A":
extra.append(f"💰 {box_office}")
if production != "N/A":
extra.append(f"🏢 {production}")
if website != "N/A" and website.lower() != "n/a":
extra.append(f"🌐 <a href='{website}'>Website</a>")
if extra:
lines.append("<strong>️ More:</strong> " + " | ".join(extra))
if poster_path:
lines.append("<em>📸 A poster image has been sent separately.</em>")
elif poster_url and poster_url != "N/A":
lines.append(f"📸 <a href='{poster_url}'>View Poster</a>")
html = "<br>".join(lines)
return f"<details><summary><strong>🎬 OMDb: {title} ({year})</strong></summary>{html}</details>"
def format_search_results(results):
"""Format search results (list of movies) into HTML."""
if not results or results.get("Response") == "False":
error = results.get("Error", "No results found.")
return f"<em>❌ {error}</em>"
search_list = results.get("Search", [])
if not search_list:
return "<em>❌ No matching titles found.</em>"
lines = ["<strong>🔍 Search Results:</strong><br><ul>"]
for item in search_list[:10]: # OMDb returns up to 10 per page
title = item.get("Title", "Unknown")
year = item.get("Year", "N/A")
imdb_id = item.get("imdbID", "")
type_ = item.get("Type", "movie")
link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else ""
if link:
lines.append(f"<li><strong>{title}</strong> ({year}) {type_.capitalize()} <a href='{link}'>IMDb</a></li>")
else:
lines.append(f"<li><strong>{title}</strong> ({year}) {type_.capitalize()}</li>")
lines.append("</ul>")
lines.append("<em>Use `!imdb <title>` or `!imdb id <tt...>` to get full details.</em>")
return "<br>".join(lines)
def format_episode_html(data, poster_path=None):
"""Format episode data specially."""
if not data or data.get("Response") == "False":
error = data.get("Error", "Episode not found.")
return f"<em>❌ {error}</em>"
title = data.get("Title", "Unknown Episode")
series = data.get("SeriesTitle", data.get("Title", "")) # Some APIs return SeriesTitle
season = data.get("Season", "?")
episode = data.get("Episode", "?")
year = data.get("Year", "N/A")
plot = data.get("Plot", "N/A")
imdb_rating = data.get("imdbRating", "N/A")
imdb_id = data.get("imdbID", "")
imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else ""
header = f"<strong>📺 {series} S{season}E{episode}: {title}</strong> ({year})"
if imdb_link:
header += f" <a href='{imdb_link}'>IMDb</a>"
lines = [header]
lines.append(f"⭐ <strong>IMDb Rating:</strong> {imdb_rating}/10")
if plot != "N/A":
lines.append(f"📝 <strong>Plot:</strong> {plot}")
if poster_path:
lines.append("<em>📸 Episode poster attached.</em>")
html = "<br>".join(lines)
return f"<details><summary><strong>📺 Episode Info</strong></summary>{html}</details>"
# ----------------------------------------------------------------------
# Command parsing and dispatching
# ----------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
"""Main entry point for !imdb commands."""
import simplematrixbotlib as botlib
match = botlib.MessageMatch(room, message, bot, prefix)
if not (match.is_not_from_this_bot() and match.prefix() and match.command("imdb")):
return
# Full raw command text (including arguments)
raw_args = message.body.split()[1:] # skip "!imdb"
if not raw_args:
await send_help(room, bot)
return
# First argument determines subcommand
subcmd = raw_args[0].lower()
if subcmd == "help":
await send_help(room, bot)
return
elif subcmd == "search":
# !imdb search <query>
if len(raw_args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !imdb search <movie title>")
return
query = " ".join(raw_args[1:]).strip()
await handle_search(room, bot, query)
elif subcmd == "id":
# !imdb id <tt1234567>
if len(raw_args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !imdb id <IMDb ID> (e.g. tt0133093)")
return
imdb_id = raw_args[1].strip()
await handle_lookup_by_id(room, bot, imdb_id)
elif subcmd == "episode":
# !imdb episode <series> -s <season> -e <episode> [--short-plot]
await handle_episode(room, bot, raw_args[1:])
else:
# Assume it's a title lookup with optional flags
# Reconstruct command line: "!imdb <title> -y 1999 -t movie --short-plot"
# We'll parse using argparse on the whole raw_args (which is everything after !imdb)
await handle_title_lookup(room, bot, raw_args)
# ----------------------------------------------------------------------
# Subcommand implementations
# ----------------------------------------------------------------------
async def send_help(room, bot):
help_text = """
<details><summary><strong>🎬 OMDb Plugin Complete Commands</strong></summary>
<p>
<strong>!imdb &lt;title&gt;</strong><br>
Get full details for a movie or series.<br>
Flags: <code>-y YEAR</code> <code>-t movie|series|episode</code> <code>--short-plot</code><br>
Example: <code>!imdb "The Matrix" -y 1999 -t movie</code><br><br>
<strong>!imdb id &lt;tt1234567&gt;</strong><br>
Lookup by IMDb ID.<br>
Example: <code>!imdb id tt0133093</code><br><br>
<strong>!imdb search &lt;query&gt;</strong><br>
Returns a list of matching titles (up to 10).<br>
Example: <code>!imdb search Lord of the Rings</code><br><br>
<strong>!imdb episode &lt;series&gt; -s N -e N</strong><br>
Get details for a specific episode.<br>
Example: <code>!imdb episode "Breaking Bad" -s 1 -e 1</code><br><br>
<strong>!imdb help</strong><br>
Show this help.<br><br>
<em>Requires OMDB_API_KEY environment variable.</em>
</p>
</details>
"""
await bot.api.send_markdown_message(room.room_id, help_text)
async def handle_search(room, bot, query):
"""Perform a search (s=) and return formatted list."""
params = {"s": query}
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"{data.get('Error', 'Search failed')}")
return
html = format_search_results(data)
await bot.api.send_markdown_message(room.room_id, html)
async def handle_lookup_by_id(room, bot, imdb_id):
"""Lookup by IMDb ID (i=)."""
params = {"i": imdb_id, "plot": "full"}
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"{data.get('Error', 'ID not found')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_movie_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
async def handle_title_lookup(room, bot, args):
"""Parse arguments for title lookup (supports -y, -t, --short-plot)."""
parser = argparse.ArgumentParser(prog="!imdb", add_help=False)
parser.add_argument("title", nargs="+", help="Movie or series title")
parser.add_argument("-y", "--year", type=str, help="Year of release")
parser.add_argument("-t", "--type", choices=["movie", "series", "episode"], help="Type filter")
parser.add_argument("--short-plot", action="store_true", help="Use short plot (default is full)")
try:
# argparse expects a list of strings; we already have args as list
parsed = parser.parse_args(args)
title = " ".join(parsed.title)
params = {"t": title, "plot": "short" if parsed.short_plot else "full"}
if parsed.year:
params["y"] = parsed.year
if parsed.type:
params["type"] = parsed.type
except SystemExit:
# argparse would exit on error; we catch and show usage
await bot.api.send_markdown_message(room.room_id,
"Usage: `!imdb <title> [-y YEAR] [-t movie|series|episode] [--short-plot]`\n"
"Example: `!imdb 'The Matrix' -y 1999 -t movie`")
return
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error parsing arguments: {e}")
return
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"{data.get('Error', 'Not found')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_movie_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
async def handle_episode(room, bot, args):
"""Handle episode lookup: !imdb episode <series> -s N -e N"""
parser = argparse.ArgumentParser(prog="!imdb episode", add_help=False)
parser.add_argument("series", nargs="+", help="Series title")
parser.add_argument("-s", "--season", type=int, required=True, help="Season number")
parser.add_argument("-e", "--episode", type=int, required=True, help="Episode number")
parser.add_argument("--short-plot", action="store_true", help="Short plot")
try:
parsed = parser.parse_args(args)
series = " ".join(parsed.series)
params = {
"t": series,
"season": parsed.season,
"episode": parsed.episode,
"plot": "short" if parsed.short_plot else "full"
}
except SystemExit:
await bot.api.send_markdown_message(room.room_id,
"Usage: `!imdb episode <series> -s <season> -e <episode>`\n"
"Example: `!imdb episode 'Breaking Bad' -s 1 -e 1`")
return
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error: {e}")
return
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"❌ Episode not found: {data.get('Error', '')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_episode_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
+42 -3
View File
@@ -4,11 +4,13 @@ Plugin for welcoming new users to the room.
Features:
* Automatically greets users when they join the target room.
* !welcome command manually trigger the welcome message for yourself.
* Per-user cooldown to prevent welcome spam on repeated part/join cycles.
Restricted to room: !NXdVjDXPxXowPkrJJY:matrix.org
"""
import logging
import time
import simplematrixbotlib as botlib
import nio
@@ -18,6 +20,32 @@ import nio
ALLOWED_ROOM_ID = "!NXdVjDXPxXowPkrJJY:matrix.org"
# How many seconds must pass before a user can receive another auto-welcome.
# Default: 10 minutes. Raise this if you still see abuse.
WELCOME_COOLDOWN_SECONDS = 600
# ---------------------------------------------------------------------------
# Cooldown state
# Maps Matrix user ID → monotonic timestamp of last auto-welcome sent.
# Resets on bot restart, which is fine — a fresh start is a clean slate.
# ---------------------------------------------------------------------------
_last_welcomed: dict[str, float] = {}
def _is_on_cooldown(user_id: str) -> bool:
"""Return True if the user was welcomed recently and should be skipped."""
last = _last_welcomed.get(user_id)
if last is None:
return False
return (time.monotonic() - last) < WELCOME_COOLDOWN_SECONDS
def _record_welcome(user_id: str) -> None:
"""Mark that this user was just welcomed."""
_last_welcomed[user_id] = time.monotonic()
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
@@ -46,8 +74,7 @@ def setup(bot):
Register the member-join listener.
funguy.py must call plugin_module.setup(self.bot) for each loaded plugin
(after self.bot is created) for this listener to fire. See the note in
funguy.py's run() method.
(after self.bot is created) for this listener to fire.
"""
@bot.listener.on_custom_event(nio.RoomMemberEvent)
@@ -70,6 +97,17 @@ def setup(bot):
logging.debug("Ignoring bot's own join event")
return
# --- Cooldown check ---
if _is_on_cooldown(event.sender):
remaining = WELCOME_COOLDOWN_SECONDS - (
time.monotonic() - _last_welcomed[event.sender]
)
logging.info(
"Skipping welcome for %s cooldown active (%.0fs remaining)",
event.sender, remaining,
)
return
# Derive a friendly display name from the Matrix ID (@name:server → name)
display_name = event.sender.split(":")[0].lstrip("@")
@@ -80,6 +118,7 @@ def setup(bot):
await bot.api.send_markdown_message(
room.room_id, _build_welcome_message(display_name)
)
_record_welcome(event.sender) # Only recorded after a successful send
logging.info("Welcome message sent successfully to %s", display_name)
except Exception as exc:
logging.error("Failed to send welcome message: %s", exc)
@@ -92,7 +131,7 @@ def setup(bot):
# ---------------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
"""Handle the !welcome command."""
"""Handle the !welcome command (not subject to the auto-welcome cooldown)."""
match = botlib.MessageMatch(room, message, bot, prefix)
+1
View File
@@ -18,3 +18,4 @@ python-whois
aiohttp
aiosqlite
pillow
omdbapi