#!/usr/bin/env python3
"""
Wikipedia plugin for FunguyBot – uses MediaWiki APIs exclusively.

No HTML scraping, no BeautifulSoup, no regex on article HTML.

Commands:
    !wp <term>  - Fetch summary + main image
    !wp help    - Show usage

Title resolution (strategies in order):
    1. Action API `opensearch` – fast, returns the most likely title directly.
    2. Action API `list=search` – full-text search, good fallback.
    3. Direct hyphenated title (spaces → hyphens) – works for many drugs/chemicals.

All errors are logged. No API key required.
"""
import html
import logging
import re  # noqa: F401 – unused here, kept so any external users of this module are unaffected
import os
import tempfile
from urllib.parse import quote

import aiohttp
import simplematrixbotlib as botlib


# ----------------------------------------------------------------------
# Plugin lifecycle
# ----------------------------------------------------------------------
def setup(bot):
    """Called once by FunguyBot at load time; registers nothing, just logs."""
    logging.info("Wikipedia plugin (API‑only) loaded. !wp command ready.")


# ----------------------------------------------------------------------
# Constants
# ----------------------------------------------------------------------
# Wikimedia asks API clients to send an identifying User-Agent.
HEADERS = {'User-Agent': 'FunguyBot/2.0 (Matrix bot; educational; contact: your-email@example.com)'}
# Kept for backward compatibility (not used internally; may be referenced by other plugins).
BASE = 'https://en.wikipedia.org'


# ----------------------------------------------------------------------
# REST API: summary (the cleanest way to get extract + image)
# ----------------------------------------------------------------------
async def _fetch_summary(session: aiohttp.ClientSession, title: str, lang: str = 'en') -> dict | None:
    """
    GET /api/rest_v1/page/summary/<title>

    Wikipedia follows redirects automatically (e.g. 4-aco-dmt -> 4-AcO-DMT).

    Returns the parsed JSON dict on HTTP 200, otherwise None.
    Network/parse errors are logged and swallowed (best-effort lookup).
    """
    encoded = quote(title.replace(' ', '_'))
    url = f'https://{lang}.wikipedia.org/api/rest_v1/page/summary/{encoded}'
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=12)) as resp:
            logging.info(f'Summary [{resp.status}] for {title!r}')
            if resp.status == 200:
                # content_type=None: the REST API may serve a non-standard
                # content type; skip aiohttp's strict JSON MIME check.
                return await resp.json(content_type=None)
            return None
    except Exception as e:
        logging.warning(f'Summary fetch error for {title!r}: {e}')
        return None


# ----------------------------------------------------------------------
# Strategy 1: OpenSearch – Wikipedia's "suggest" API, returns exact title
# ----------------------------------------------------------------------
async def _strategy_opensearch(session: aiohttp.ClientSession, query: str, lang: str) -> str | None:
    """
    GET /w/api.php?action=opensearch&search=<query>&namespace=0&limit=1

    The API returns a list: [query, [title1,...], [desc1,...], [url1,...]]
    The first title is usually the best match.

    Returns the matched title, or None on miss/error.
    """
    url = f'https://{lang}.wikipedia.org/w/api.php'
    params = {
        'action': 'opensearch',
        'search': query,
        'namespace': 0,
        'limit': 1,
        'format': 'json',
    }
    try:
        # ClientTimeout instead of a bare int: ints are deprecated in aiohttp
        # and the rest of this module already uses ClientTimeout.
        async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            logging.info(f'OpenSearch [{resp.status}] for {query!r}')
            if resp.status == 200:
                data = await resp.json()
                # data = [query, [title], [description], [url]]
                if len(data) >= 2 and data[1]:
                    title = data[1][0]
                    logging.info(f'OpenSearch found: {title!r}')
                    return title
    except Exception as e:
        logging.warning(f'OpenSearch error: {e}')
    return None


# ----------------------------------------------------------------------
# Strategy 2: Action API full‑text search (list=search)
# ----------------------------------------------------------------------
async def _strategy_action_search(session: aiohttp.ClientSession, query: str, lang: str) -> str | None:
    """
    Full-text search via action=query&list=search; slower than OpenSearch but
    tolerant of typos and partial phrases. Returns the top title or None.
    """
    url = f'https://{lang}.wikipedia.org/w/api.php'
    params = {
        'action': 'query',
        'list': 'search',
        'srsearch': query,
        'srlimit': 1,
        'srnamespace': 0,
        'format': 'json',
    }
    try:
        async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            logging.info(f'Action search [{resp.status}] for {query!r}')
            if resp.status == 200:
                data = await resp.json(content_type=None)
                results = data.get('query', {}).get('search', [])
                if results:
                    title = results[0].get('title')
                    logging.info(f'Action search found: {title!r}')
                    return title
    except Exception as e:
        logging.warning(f'Action search error: {e}')
    return None


# ----------------------------------------------------------------------
# Strategy 3: Direct hyphenated title (spaces → hyphens)
# ----------------------------------------------------------------------
async def _strategy_direct_hyphenated(session: aiohttp.ClientSession, query: str, lang: str) -> dict | None:
    """
    Replace spaces with hyphens and call the summary endpoint directly.
    Wikipedia redirects mean "4-aco-dmt" -> "4-AcO-DMT" transparently.

    Returns the full summary dict if it resolves to a standard article
    with an extract, otherwise None. No-op for single-word queries.
    """
    if ' ' not in query:
        return None
    hyphenated = query.replace(' ', '-')
    data = await _fetch_summary(session, hyphenated, lang)
    if data and data.get('type') == 'standard' and data.get('extract'):
        logging.info(f'Direct hyphenated hit: {hyphenated!r} -> {data.get("title")!r}')
        return data
    return None


# ----------------------------------------------------------------------
# Main fetch (orchestration)
# ----------------------------------------------------------------------
async def fetch_wikipedia(query: str, lang: str = 'en') -> tuple:
    """
    Resolve *query* to an article and fetch its summary and main image.

    Returns (title, extract, article_url, image_path | None, error_msg | None).
    Exactly one of (title, ...) or error_msg is populated.
    """
    async with aiohttp.ClientSession(headers=HEADERS) as session:
        summary_data = None
        tried_title = None

        # Strategy 1: OpenSearch (fastest, most accurate)
        title = await _strategy_opensearch(session, query, lang)
        if title:
            tried_title = title
            summary_data = await _fetch_summary(session, title, lang)

        # Strategy 2: Full‑text search (skip the refetch if it returns the
        # same title strategy 1 already failed on)
        if not summary_data:
            title = await _strategy_action_search(session, query, lang)
            if title and title != tried_title:
                summary_data = await _fetch_summary(session, title, lang)

        # Strategy 3: Hyphenated fallback (for compounds like "4-aco-dmt")
        if not summary_data:
            summary_data = await _strategy_direct_hyphenated(session, query, lang)

        if not summary_data:
            logging.error(f'All strategies exhausted for {query!r}')
            return None, None, None, None, f'No Wikipedia article found for "{query}".'

        page_type = summary_data.get('type', 'standard')
        if page_type == 'disambiguation':
            url = summary_data.get('content_urls', {}).get('desktop', {}).get('page', '')
            return (
                None, None, None, None,
                f'"{query}" is a disambiguation page – try a more specific term.'
                + (f' See: {url}' if url else '')
            )
        if page_type == 'no-extract' or not summary_data.get('extract'):
            return None, None, None, None, f'No summary available for "{query}".'

        title = summary_data.get('title', query)
        extract = summary_data.get('extract', '').strip()
        url = summary_data.get('content_urls', {}).get('desktop', {}).get('page', '')

        # Trim long extracts (Matrix messages have ~4KB limit, but keep reasonable).
        # rsplit avoids cutting the final word in half.
        if len(extract) > 1200:
            extract = extract[:1200].rsplit(' ', 1)[0] + '…'

        # Image: prefer originalimage, fallback to thumbnail
        image_url = (
            summary_data.get('originalimage', {}).get('source')
            or summary_data.get('thumbnail', {}).get('source')
        )
        image_path = await _download_image(session, image_url) if image_url else None

        return title, extract, url, image_path, None


# ----------------------------------------------------------------------
# Image download helper
# ----------------------------------------------------------------------
async def _download_image(session: aiohttp.ClientSession, image_url: str) -> str | None:
    """
    Download *image_url* to a named temp file and return its path.

    The caller owns the file and must unlink it. Returns None on any
    failure (logged, never raised).
    """
    if not image_url:
        return None
    # Protocol-relative URLs ("//upload.wikimedia.org/...") need a scheme.
    if image_url.startswith('//'):
        image_url = 'https:' + image_url
    try:
        async with session.get(image_url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
            if resp.status != 200:
                return None
            ct = resp.headers.get('Content-Type', '')
            # Suffix only affects the temp filename shown to Matrix clients.
            suffix = '.png' if 'png' in ct else '.gif' if 'gif' in ct else '.jpg'
            data = await resp.read()
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                tmp.write(data)
                return tmp.name
    except Exception as e:
        logging.warning(f'Image download failed: {e}')
        return None


# ----------------------------------------------------------------------
# Formatting (HTML for Matrix)
# ----------------------------------------------------------------------
def format_response(title: str, extract: str, url: str) -> str:
    """
    Render the article as a collapsible HTML snippet for Matrix.

    Title, body and URL are HTML-escaped: the REST summary 'extract' is
    plain text, so any '<', '&' or quotes in it are data, not markup.
    Newlines become <br> after escaping.
    """
    safe_title = html.escape(title)
    body = html.escape(extract).replace('\n', '<br>')
    safe_url = html.escape(url, quote=True)
    return (
        f'<details>'
        f'<summary><strong>📖 Wikipedia: {safe_title}</strong></summary>'
        f'<p>{body}</p>'
        f'<p><a href="{safe_url}">🔗 Read full article</a></p>'
        f'</details>'
    )


# ----------------------------------------------------------------------
# Command handler (called by FunguyBot)
# ----------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
    """
    Entry point for the !wp command.

    !wp <term>  – search, then post summary (+ main image if available)
    !wp help    – usage text
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command('wp')):
        return

    args = match.args()
    if not args or args[0].lower() == 'help':
        # '&lt;search term&gt;' must be entity-escaped: a literal
        # '<search term>' is swallowed by HTML renderers as an unknown tag.
        help_text = """
<details>
<summary><strong>📖 Wikipedia Help</strong></summary>
<p>
<strong>!wp &lt;search term&gt;</strong> – Get the lead section and main image from Wikipedia.<br>
<strong>!wp help</strong> – Show this help.<br><br>
<strong>Examples:</strong><br>
<code>!wp 4 aco dmt</code><br>
<code>!wp psilocybin</code><br>
<code>!wp Albert Einstein</code><br>
<code>!wp Python programming language</code>
</p>
</details>
"""
        await bot.api.send_markdown_message(room.room_id, help_text)
        return

    query = ' '.join(args).strip()
    await bot.api.send_text_message(room.room_id, f'🔍 Searching Wikipedia for: {query}…')

    try:
        title, extract, url, image_path, error = await fetch_wikipedia(query)
    except Exception as e:
        logging.exception('Unexpected error in wikipedia plugin')
        await bot.api.send_text_message(room.room_id, f'❌ Unexpected error: {e}')
        return

    if error:
        await bot.api.send_text_message(room.room_id, f'❌ {error}')
        return

    # Cleanup in a finally so the temp image is removed even if the
    # markdown send raises (previously it leaked in that case).
    try:
        await bot.api.send_markdown_message(room.room_id, format_response(title, extract, url))
        if image_path and os.path.exists(image_path):
            try:
                await bot.api.send_image_message(room_id=room.room_id, image_filepath=image_path)
            except Exception as e:
                logging.warning(f'Image send failed: {e}')
    finally:
        if image_path:
            try:
                os.unlink(image_path)
            except OSError:
                pass