Files
FunguyBot/plugins/geo.py
T

289 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
This plugin provides IP geolocation functionality using free APIs.
It uses ip-api.com as the primary API with a fallback to ipapi.co.
"""
import logging
import aiohttp
import simplematrixbotlib as botlib
import socket
import re
async def is_valid_ip(ip):
    """
    Tell whether a string is a literal IPv4 or IPv6 address.

    Args:
        ip (str): Candidate address text.

    Returns:
        bool: True when the text parses as IPv4 or IPv6, False otherwise.
    """
    # Try IPv4 first, then IPv6; the first family that parses wins.
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, ip)
        except socket.error:
            continue
        return True
    return False
def is_domain(domain):
    """
    Tell whether a string looks like a dotted domain name.

    Args:
        domain (str): Candidate text.

    Returns:
        bool: True when the text is one or more dot-terminated labels
        followed by an alphabetic TLD of at least two letters.
    """
    # Each label: alphanumeric at both ends, hyphens allowed inside, max 63 chars.
    pattern = r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
    return re.match(pattern, domain) is not None
async def resolve_domain(domain):
    """
    Resolve a domain name to an IPv4 address without blocking the event loop.

    Args:
        domain (str): The domain to resolve.

    Returns:
        str: The resolved IPv4 address, or None if resolution fails.
    """
    # Local import so this function stays self-contained.
    import asyncio

    # socket.gethostbyname() is a blocking call; running it directly inside a
    # coroutine would stall the event loop while DNS resolves. Hand it to the
    # loop's default executor instead.
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(None, socket.gethostbyname, domain)
    except socket.gaierror:
        return None
async def query_ip_api_com(ip):
    """
    Query ip-api.com for geolocation information.

    Args:
        ip (str): The IP address to geolocate.

    Returns:
        dict: The decoded JSON response on HTTP 200, or None on a non-200
        status, a timeout, or any other error (all errors are logged).
    """
    url = f"http://ip-api.com/json/{ip}"
    # Bound the whole request: without an explicit timeout a hung endpoint
    # would keep the command waiting for aiohttp's (long) default.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                if response.status != 200:
                    logging.error(f"ip-api.com returned status {response.status}")
                    return None
                return await response.json()
    except Exception as e:
        # %r rather than %s: timeout errors have an empty message.
        logging.error("Error querying ip-api.com: %r", e)
        return None
async def query_ipapi_co(ip):
    """
    Query ipapi.co for geolocation information (fallback).

    Args:
        ip (str): The IP address to geolocate.

    Returns:
        dict: The decoded JSON response on success, or None on a non-200
        status, an error payload, a timeout, or any other error (all errors
        are logged).
    """
    url = f"https://ipapi.co/{ip}/json/"
    # Bound the whole request: without an explicit timeout a hung endpoint
    # would keep the command waiting for aiohttp's (long) default.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                if response.status != 200:
                    logging.error(f"ipapi.co returned status {response.status}")
                    return None
                data = await response.json()
                # ipapi.co can report failures (e.g. reserved addresses) in
                # the body as {"error": true, "reason": ...}; treat that as
                # "no data" instead of handing it on as a valid result.
                if isinstance(data, dict) and data.get('error'):
                    logging.error(
                        "ipapi.co returned an error payload: %s",
                        data.get('reason'),
                    )
                    return None
                return data
    except Exception as e:
        # %r rather than %s: timeout errors have an empty message.
        logging.error("Error querying ipapi.co: %r", e)
        return None
async def query_geolocation(ip):
    """
    Look up geolocation data, preferring the primary API over the fallback.

    Args:
        ip (str): The IP address to geolocate.

    Returns:
        dict: The primary API's response when it is non-empty and not marked
        as failed; otherwise whatever the fallback API returns (may be None).
    """
    primary = await query_ip_api_com(ip)
    # A usable primary answer short-circuits the fallback entirely.
    if primary and primary.get('status') != 'fail':
        return primary

    logging.info("Primary API failed, trying fallback API")
    return await query_ipapi_co(ip)
async def format_geolocation_results(ip, data):
    """
    Format geolocation results into a collapsible HTML message.

    Accepts a response dict from either ip-api.com or ipapi.co and maps the
    provider-specific keys onto one common layout.

    Args:
        ip (str): The queried IP address.
        data (dict): Geolocation data, or None/empty when the lookup failed.

    Returns:
        str: The formatted message, or a "no data" notice when the lookup
        produced nothing usable.
    """
    no_data = f"🔍 No geolocation data found for {ip}."
    if not data:
        return no_data
    # ip-api.com flags failure with status == 'fail'; ipapi.co uses a truthy
    # 'error' field. Either way there is nothing worth rendering.
    if data.get('status') == 'fail' or data.get('error'):
        return no_data

    def pick(*keys):
        """Return the first non-null value among keys, or 'N/A'."""
        for key in keys:
            value = data.get(key)
            if value is not None:
                return value
        return 'N/A'

    # ipapi.co payloads also carry a 'country' key (the ISO code), so its mere
    # presence cannot identify ip-api.com; 'country_name' is ipapi.co-only.
    if 'country' in data and 'country_name' not in data:  # ip-api.com format
        country = pick('country')
        country_code = pick('countryCode')
        region = pick('regionName', 'region')
        city = pick('city')
        postal = pick('zip')
        latitude = pick('lat')
        longitude = pick('lon')
        timezone = pick('timezone')
        isp = pick('isp', 'org')
        asn = pick('as')
    else:  # ipapi.co format
        country = pick('country_name', 'country')
        country_code = pick('country_code', 'countryCode')
        region = pick('region')
        city = pick('city')
        postal = pick('postal')
        latitude = pick('latitude')
        longitude = pick('longitude')
        timezone = pick('timezone')
        isp = pick('org')
        asn = pick('asn')

    # Create collapsible content
    content = "".join([
        f"<strong>🔍 IP Geolocation Results for {ip}</strong><br><br>",
        f"<strong>Country:</strong> {country} ({country_code})<br>",
        f"<strong>Region:</strong> {region}<br>",
        f"<strong>City:</strong> {city}<br>",
        f"<strong>Postal Code:</strong> {postal}<br>",
        f"<strong>Coordinates:</strong> {latitude}, {longitude}<br>",
        f"<strong>Timezone:</strong> {timezone}<br>",
        f"<strong>ISP/Organization:</strong> {isp}<br>",
        f"<strong>ASN:</strong> {asn}<br>",
    ])
    # Wrap in details tag for Matrix compatibility
    return f"<details><summary><strong>🔍 Geolocation: {ip}</strong></summary>{content}</details>"
async def handle_command(room, message, bot, prefix, config):
    """
    Entry point for the !geo command.

    Validates the argument as a domain or IP literal, resolves domains to an
    address, queries the geolocation APIs and posts the formatted result to
    the room. Progress and error notices are sent as plain text messages.

    Args:
        room (Room): The Matrix room where the command was invoked.
        message (RoomMessage): The message object containing the command.
        bot (Bot): The bot object.
        prefix (str): The command prefix.
        config (dict): Configuration parameters (not used here).

    Returns:
        None
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    is_geo_command = (
        match.is_not_from_this_bot() and match.prefix() and match.command("geo")
    )
    if not is_geo_command:
        return

    async def reply(text):
        # Plain-text notice to the room the command came from.
        await bot.api.send_text_message(room.room_id, text)

    args = match.args()
    if not args:
        await reply(
            "Usage: !geo <ip_address/domain>\nExample: !geo 8.8.8.8\nExample: !geo example.com"
        )
        return

    query = args[0].strip()
    logging.info(f"Received !geo command for: {query}")
    try:
        # Work out the address to look up: domains are resolved, IP literals
        # are used as-is, anything else is rejected.
        ip = query
        if is_domain(query):
            await reply(f"🔍 Resolving domain {query} to IP address...")
            ip = await resolve_domain(query)
            if not ip:
                await reply(f"Failed to resolve domain {query} to IP address.")
                return
            await reply(f"Domain {query} resolved to IP {ip}")
        elif not await is_valid_ip(query):
            await reply(f"Invalid IP address or domain format: {query}")
            return

        # Let the user know the lookup has started.
        await reply(f"🔍 Looking up geolocation for {ip}...")

        # Primary API with fallback, then render and post the result.
        geo_data = await query_geolocation(ip)
        result_message = await format_geolocation_results(ip, geo_data)
        await bot.api.send_markdown_message(room.room_id, result_message)
        logging.info(f"Successfully sent geolocation results for {ip}")
    except Exception as e:
        await reply(
            f"An error occurred during geolocation lookup for {query}. Please try again later."
        )
        logging.error(f"Error in geo plugin for {query}: {e}", exc_info=True)
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
# Module-level attributes describing this plugin. Nothing in this file reads
# them; presumably the bot's plugin loader / help command does — verify
# against the loader.
__version__ = "1.0.0"
__author__ = "Funguy Bot"
__description__ = "IP geolocation lookup"
# HTML help snippet for the !geo command, wrapped in a collapsible <details>
# element; the angle brackets around the argument names are HTML-escaped.
__help__ = """
<details>
<summary><strong>!geo</strong> IP / domain geolocation</summary>
<ul>
<li><code>!geo &lt;ip&gt;</code> Locate an IP address</li>
<li><code>!geo &lt;domain&gt;</code> Resolves domain then locates</li>
</ul>
<p>Shows country, region, city, coordinates, ISP, ASN. Uses ip-api.com / ipapi.co.</p>
</details>
"""