275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""
|
||
Weather plugin – primary: OpenWeatherMap, fallback: Open‑Meteo.
|
||
|
||
Uses OpenWeatherMap when a valid API key is present and the request succeeds.
|
||
Falls back to Open‑Meteo (no key required) otherwise.
|
||
|
||
Commands:
|
||
!weather <location> e.g. !weather London or !weather "New York,US"
|
||
"""
|
||
|
||
import logging
|
||
import os
|
||
import aiohttp
|
||
import simplematrixbotlib as botlib
|
||
from dotenv import load_dotenv
|
||
from urllib.parse import quote
|
||
|
||
# ---------------------------------------------------------------------------
# Load .env (for OPENWEATHER_API_KEY)
# ---------------------------------------------------------------------------
# The .env file is looked up one directory above the directory holding this
# plugin file.
plugin_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(plugin_dir)
dotenv_path = os.path.join(parent_dir, ".env")
load_dotenv(dotenv_path)

# An empty string means no key is configured; the OpenWeatherMap helper
# checks this value for truthiness before issuing any request.
OPENWEATHER_API_KEY = os.environ.get("OPENWEATHER_API_KEY", "")
|
||
|
||
# ---------------------------------------------------------------------------
# WMO codes → description + emoji (for Open‑Meteo)
# ---------------------------------------------------------------------------
# Maps a numeric weather code to a (description, emoji) pair.
WMO_CODES: dict[int, tuple[str, str]] = {
    # Sky cover
    0: ("Clear sky", "☀️"),
    1: ("Mainly clear", "🌤️"),
    2: ("Partly cloudy", "⛅"),
    3: ("Overcast", "☁️"),
    # Fog
    45: ("Fog", "🌫️"),
    48: ("Depositing rime fog", "🌫️"),
    # Drizzle
    51: ("Light drizzle", "🌦️"),
    53: ("Moderate drizzle", "🌦️"),
    55: ("Dense drizzle", "🌧️"),
    56: ("Light freezing drizzle", "🌧️"),
    57: ("Dense freezing drizzle", "🌧️"),
    # Rain
    61: ("Slight rain", "🌧️"),
    63: ("Moderate rain", "🌧️"),
    65: ("Heavy rain", "🌧️"),
    66: ("Light freezing rain", "🌧️"),
    67: ("Heavy freezing rain", "🌧️"),
    # Snow
    71: ("Slight snow fall", "❄️"),
    73: ("Moderate snow fall", "❄️"),
    75: ("Heavy snow fall", "❄️"),
    77: ("Snow grains", "❄️"),
    # Showers
    80: ("Slight rain showers", "🌦️"),
    81: ("Moderate rain showers", "🌧️"),
    82: ("Violent rain showers", "🌧️"),
    85: ("Slight snow showers", "🌨️"),
    86: ("Heavy snow showers", "🌨️"),
    # Thunderstorms
    95: ("Thunderstorm", "⛈️"),
    96: ("Thunderstorm with slight hail", "⛈️"),
    99: ("Thunderstorm with heavy hail", "⛈️"),
}
|
||
|
||
# ---------------------------------------------------------------------------
# Primary: OpenWeatherMap
# ---------------------------------------------------------------------------
|
||
async def openweathermap_get(session: aiohttp.ClientSession, location: str) -> dict | None:
    """Fetch current weather for *location* from OpenWeatherMap.

    Args:
        session: aiohttp session used to issue the request.
        location: Place query, sent as the ``q`` request parameter.

    Returns:
        The decoded JSON body when the response status is 200. ``None`` when
        no API key is configured, the status is not 200, or the request or
        decoding raises.
    """
    if not OPENWEATHER_API_KEY:
        logging.info("OpenWeatherMap key missing, skipping primary")
        return None

    url = "https://api.openweathermap.org/data/2.5/weather"
    params = {
        "q": location,
        "appid": OPENWEATHER_API_KEY,
        "units": "metric",  # Celsius
    }
    # Explicit ClientTimeout instead of a bare number: same 10 s total budget,
    # expressed with the type aiohttp documents for this parameter.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with session.get(url, params=params, timeout=timeout) as resp:
            if resp.status == 200:
                return await resp.json()
            logging.info("OpenWeatherMap HTTP %s, falling back", resp.status)
    except Exception as e:
        # Deliberately broad: any failure here just means "use the fallback".
        logging.warning("OpenWeatherMap request error: %s", e)
    return None
|
||
|
||
|
||
# OpenWeatherMap "main" condition value → emoji. Built once at import time
# rather than on every call.
_OWM_EMOJI = {
    "Clear": "☀️", "Clouds": "☁️", "Rain": "🌧️", "Drizzle": "🌦️",
    "Thunderstorm": "⛈️", "Snow": "❄️", "Mist": "🌫️", "Fog": "🌫️",
    "Haze": "🌫️", "Smoke": "🌫️", "Dust": "🌫️", "Sand": "🌫️",
    "Ash": "🌫️", "Squall": "💨", "Tornado": "🌪️",
}
_OWM_DEFAULT_EMOJI = "🌡️"


def format_openweathermap(data: dict) -> str:
    """Build the one-line weather message from OpenWeatherMap data.

    Args:
        data: Decoded response body. The keys read are ``name``,
            ``sys.country``, ``main.temp``, ``main.humidity``,
            ``weather[0].main``, ``weather[0].description`` and
            ``wind.speed``; every one of them is optional.

    Returns:
        An HTML-formatted single line with condition, temperature in °C and
        °F, humidity and wind speed. Missing text fields read "Unknown";
        missing numeric fields are shown as 0.
    """
    city = data.get("name") or "Unknown"
    country = (data.get("sys") or {}).get("country") or ""
    # Only append the country when there is one, so the header never ends
    # in a dangling ", ".
    place = f"{city}, {country}" if country else city

    main_data = data.get("main") or {}
    temp_c = main_data.get("temp", 0)
    temp_f = round(temp_c * 9 / 5 + 32, 1)
    humidity = main_data.get("humidity", 0)

    weather_list = data.get("weather") or []
    first = weather_list[0] if weather_list else {}
    # Read "description" as tolerantly as "main": a partial entry must not
    # raise KeyError.
    description = (first.get("description") or "Unknown").capitalize()
    emoji = _OWM_EMOJI.get(first.get("main", ""), _OWM_DEFAULT_EMOJI)

    wind = (data.get("wind") or {}).get("speed", 0)

    return (
        f"<strong>[{emoji} Weather for {place}]</strong>: "
        f"<strong>Condition:</strong> {description} | "
        f"<strong>Temperature:</strong> {temp_c:.1f}°C ({temp_f:.1f}°F) | "
        f"<strong>Humidity:</strong> {humidity}% | "
        f"<strong>Wind Speed:</strong> {wind} m/s"
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Fallback: Open‑Meteo (no key, free)
# ---------------------------------------------------------------------------
|
||
async def meteo_geocode(session: aiohttp.ClientSession, location: str) -> dict | None:
    """Geocode a place name via the Open‑Meteo geocoding endpoint.

    Args:
        session: aiohttp session used to issue the request.
        location: Place name, sent as the ``name`` request parameter.

    Returns:
        A dict with ``name``, ``latitude``, ``longitude``, ``country``,
        ``state`` and ``timezone`` taken from the first result. ``None`` when
        the status is not 200, there are no results, or the request or
        parsing raises.
    """
    url = "https://geocoding-api.open-meteo.com/v1/search"
    params = {"name": location, "count": 1, "language": "en"}
    # Explicit ClientTimeout instead of a bare number (same 10 s total).
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with session.get(url, params=params, timeout=timeout) as resp:
            if resp.status != 200:
                # Log the status like the OpenWeatherMap helper does, so a
                # provider error is not silently reported as "no result".
                logging.info("Open‑Meteo geocode HTTP %s", resp.status)
                return None
            data = await resp.json()
            results = data.get("results", [])
            if not results:
                return None
            top = results[0]
            # Built inside the try on purpose: a result lacking a required
            # key is handled like any other lookup failure.
            return {
                "name": top["name"],
                "latitude": top["latitude"],
                "longitude": top["longitude"],
                "country": top.get("country", ""),
                "state": top.get("admin1", ""),
                "timezone": top.get("timezone", "UTC"),
            }
    except Exception as e:
        # Deliberately broad: any failure means "location not resolved".
        logging.warning("Open‑Meteo geocode error: %s", e)
    return None
|
||
|
||
|
||
async def meteo_weather(
    session: aiohttp.ClientSession,
    lat: float,
    lon: float,
    timezone: str = "auto",
) -> dict | None:
    """Fetch current weather from the Open‑Meteo forecast endpoint.

    The request asks for temperature in Fahrenheit and wind speed in mph.

    Args:
        session: aiohttp session used to issue the request.
        lat: Latitude, sent as ``latitude``.
        lon: Longitude, sent as ``longitude``.
        timezone: Value for the ``timezone`` request parameter.

    Returns:
        The decoded JSON body when the response status is 200. ``None`` when
        the status is not 200 or the request or decoding raises.
    """
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": lat,
        "longitude": lon,
        "current_weather": "true",
        "temperature_unit": "fahrenheit",
        "windspeed_unit": "mph",
        "timezone": timezone,
    }
    # Explicit ClientTimeout instead of a bare number (same 10 s total).
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with session.get(url, params=params, timeout=timeout) as resp:
            if resp.status == 200:
                return await resp.json()
            # Previously dropped silently; log it like the other helpers.
            logging.info("Open‑Meteo weather HTTP %s", resp.status)
    except Exception as e:
        # Deliberately broad: the caller only needs "data or None".
        logging.warning("Open‑Meteo weather error: %s", e)
    return None
|
||
|
||
|
||
def format_meteo(loc_info: dict, weather_data: dict) -> str:
    """Render an Open‑Meteo result as the plugin's one-line HTML message.

    Args:
        loc_info: Location dict; ``name`` is required, ``state`` and
            ``country`` are optional.
        weather_data: Response body whose ``current_weather`` entry supplies
            ``weathercode``, ``temperature`` and ``windspeed``.

    Returns:
        A single line with condition, temperature (°C derived from the °F
        reading) and wind speed in mph.
    """
    current = weather_data["current_weather"]
    condition, icon = WMO_CODES.get(current["weathercode"], ("Unknown", "🌡️"))

    place = loc_info["name"]
    nation = loc_info.get("country", "")
    region = loc_info.get("state", "")

    # The place name always leads; the region is skipped when it merely
    # repeats the place name, and empty fields are left out.
    extras = (region if region != place else "", nation)
    where = ", ".join([place, *(part for part in extras if part)])

    fahrenheit = current["temperature"]
    celsius = round((fahrenheit - 32) * 5 / 9, 1)
    wind_mph = current["windspeed"]

    header = f"<strong>[{icon} Weather for {where}]</strong>: "
    fields = [
        f"<strong>Condition:</strong> {condition}",
        f"<strong>Temperature:</strong> {celsius}°C ({fahrenheit}°F)",
        f"<strong>Wind Speed:</strong> {wind_mph} mph",
    ]
    return header + " | ".join(fields)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Plugin entry point
# ---------------------------------------------------------------------------
|
||
async def handle_command(room, message, bot, prefix, config):
    """Handle the ``weather`` command and reply with current conditions.

    OpenWeatherMap is tried first. When it yields no usable result, the
    location is geocoded and looked up through Open‑Meteo instead.

    Args:
        room: Room the message arrived in; replies go to ``room.room_id``.
        message: Incoming message, matched via ``botlib.MessageMatch``.
        bot: Bot whose ``api`` is used to send the replies.
        prefix: Command prefix handed to ``botlib.MessageMatch``.
        config: Not used by this plugin.
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("weather")):
        return

    # The documented form `!weather "New York,US"` may arrive with the quote
    # characters still attached to the words; strip them so the providers
    # receive the bare place name.
    args = match.args()
    location = " ".join(args or []).strip().strip("\"'").strip()
    if not location:
        await bot.api.send_text_message(
            room.room_id,
            "Usage: !weather <location>\nExample: !weather London or !weather New York,US"
        )
        return

    logging.info("Received !weather command for '%s'", location)

    async with aiohttp.ClientSession() as session:
        # 1. Try OpenWeatherMap
        owm_data = await openweathermap_get(session, location)
        if owm_data:
            # Compare as text so a status of 200 is accepted whether the
            # body carries it as an int or as a string.
            if str(owm_data.get("cod")) == "200":
                msg = format_openweathermap(owm_data)
                await bot.api.send_markdown_message(room.room_id, msg)
                logging.info("Sent weather via OpenWeatherMap")
                return
            # OpenWeatherMap returned an error status inside JSON (e.g., 401, 404)
            logging.info("OpenWeatherMap returned error code %s, falling back", owm_data.get("cod"))

        # 2. Fallback: Open‑Meteo
        logging.info("Falling back to Open‑Meteo")
        loc_info = await meteo_geocode(session, location)
        if not loc_info:
            await bot.api.send_text_message(
                room.room_id,
                f"Location '{location}' not found."
            )
            return

        wdata = await meteo_weather(
            session,
            loc_info["latitude"],
            loc_info["longitude"],
            loc_info.get("timezone", "auto"),
        )
        if not wdata:
            await bot.api.send_text_message(
                room.room_id,
                "Could not fetch weather data from any provider."
            )
            return

        msg = format_meteo(loc_info, wdata)
        await bot.api.send_markdown_message(room.room_id, msg)
        logging.info("Sent weather via Open‑Meteo (fallback)")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Plugin setup
# ---------------------------------------------------------------------------
|
||
def setup(bot):
|
||
logging.info("Weather plugin loaded (OpenWeatherMap + Open‑Meteo fallback)")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
__version__ = "1.0.0"
__author__ = "Funguy Bot"
__description__ = "Weather forecast (OWM primary, Open‑Meteo fallback)"
# HTML help snippet. The <location> placeholder is entity-escaped so it is
# displayed literally instead of being parsed as an unknown HTML tag.
__help__ = """
<details>
<summary><strong>!weather</strong> – Current weather</summary>
<p><code>!weather &lt;location&gt;</code> – Shows temperature, conditions, humidity, wind.<br>
Uses OpenWeatherMap if a valid API key is present; falls back to free Open‑Meteo otherwise.</p>
</details>
"""
|