""" This plugin provides IP geolocation functionality using free APIs. It uses ip-api.com as the primary API with a fallback to ipapi.co. """ import logging import aiohttp import simplematrixbotlib as botlib import socket import re async def is_valid_ip(ip): """ Check if the provided string is a valid IP address. Args: ip (str): The IP address to validate. Returns: bool: True if valid IP, False otherwise. """ try: # Check for IPv4 socket.inet_pton(socket.AF_INET, ip) return True except socket.error: try: # Check for IPv6 socket.inet_pton(socket.AF_INET6, ip) return True except socket.error: return False def is_domain(domain): """ Check if the provided string is a domain name. Args: domain (str): The string to check. Returns: bool: True if it's a domain, False otherwise. """ domain_pattern = re.compile( r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' ) return bool(domain_pattern.match(domain)) async def resolve_domain(domain): """ Resolve a domain name to an IP address. Args: domain (str): The domain to resolve. Returns: str: The resolved IP address or None. """ try: return socket.gethostbyname(domain) except socket.gaierror: return None async def query_ip_api_com(ip): """ Query ip-api.com for geolocation information. Args: ip (str): The IP address to geolocate. Returns: dict: Geolocation data or None if error. """ url = f"http://ip-api.com/json/{ip}" try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: data = await response.json() return data else: logging.error(f"ip-api.com returned status {response.status}") return None except Exception as e: logging.error(f"Error querying ip-api.com: {e}") return None async def query_ipapi_co(ip): """ Query ipapi.co for geolocation information (fallback). Args: ip (str): The IP address to geolocate. Returns: dict: Geolocation data or None if error. """ url = f"https://ipapi.co/{ip}/json/" try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: data = await response.json() return data else: logging.error(f"ipapi.co returned status {response.status}") return None except Exception as e: logging.error(f"Error querying ipapi.co: {e}") return None async def query_geolocation(ip): """ Query geolocation information using primary and fallback APIs. Args: ip (str): The IP address to geolocate. Returns: dict: Geolocation data or None if error. """ # Try primary API first data = await query_ip_api_com(ip) # If primary API fails, try fallback API if not data or data.get('status') == 'fail': logging.info("Primary API failed, trying fallback API") data = await query_ipapi_co(ip) return data async def format_geolocation_results(ip, data): """ Format geolocation results into a readable message. Args: ip (str): The queried IP address data (dict): Geolocation data Returns: str: Formatted message """ if not data: return f"🔍 No geolocation data found for {ip}." # Check if data is from ip-api.com or ipapi.co and format accordingly if 'status' in data and data.get('status') == 'fail': return f"🔍 No geolocation data found for {ip}." # Extract relevant information based on API used if 'country' in data: # ip-api.com format country = data.get('country', 'N/A') country_code = data.get('countryCode', 'N/A') region = data.get('regionName', data.get('region', 'N/A')) city = data.get('city', 'N/A') postal = data.get('zip', 'N/A') latitude = data.get('lat', 'N/A') longitude = data.get('lon', 'N/A') timezone = data.get('timezone', 'N/A') isp = data.get('isp', 'N/A') org = data.get('org', 'N/A') asn = data.get('as', 'N/A') else: # ipapi.co format country = data.get('country_name', data.get('country', 'N/A')) country_code = data.get('country_code', data.get('countryCode', 'N/A')) region = data.get('region', 'N/A') city = data.get('city', 'N/A') postal = data.get('postal', 'N/A') latitude = data.get('latitude', 'N/A') longitude = data.get('longitude', 'N/A') timezone = data.get('timezone', 'N/A') isp = data.get('org', 'N/A') org = data.get('org', 'N/A') asn = data.get('asn', 'N/A') # Create collapsible content content = f"🔍 IP Geolocation Results for {ip}

" content += f"Country: {country} ({country_code})
" content += f"Region: {region}
" content += f"City: {city}
" content += f"Postal Code: {postal}
" content += f"Coordinates: {latitude}, {longitude}
" content += f"Timezone: {timezone}
" content += f"ISP/Organization: {isp}
" content += f"ASN: {asn}
" # Wrap in details tag for Matrix compatibility message = f"
🔍 Geolocation: {ip}{content}
" return message async def handle_command(room, message, bot, prefix, config): """ Function to handle the !geo command. Args: room (Room): The Matrix room where the command was invoked. message (RoomMessage): The message object containing the command. bot (Bot): The bot object. prefix (str): The command prefix. config (dict): Configuration parameters. Returns: None """ match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("geo"): args = match.args() if len(args) < 1: await bot.api.send_text_message( room.room_id, "Usage: !geo \nExample: !geo 8.8.8.8\nExample: !geo example.com" ) return query = args[0].strip() logging.info(f"Received !geo command for: {query}") try: # Determine if input is IP or domain ip = query if is_domain(query): # Resolve domain to IP first await bot.api.send_text_message( room.room_id, f"🔍 Resolving domain {query} to IP address..." ) ip = await resolve_domain(query) if not ip: await bot.api.send_text_message( room.room_id, f"Failed to resolve domain {query} to IP address." ) return await bot.api.send_text_message( room.room_id, f"Domain {query} resolved to IP {ip}" ) elif not await is_valid_ip(query): await bot.api.send_text_message( room.room_id, f"Invalid IP address or domain format: {query}" ) return # Notify user that we're starting the lookup await bot.api.send_text_message( room.room_id, f"🔍 Looking up geolocation for {ip}..." ) # Query geolocation data with fallback geo_data = await query_geolocation(ip) # Format and send results result_message = await format_geolocation_results(ip, geo_data) await bot.api.send_markdown_message(room.room_id, result_message) logging.info(f"Successfully sent geolocation results for {ip}") except Exception as e: await bot.api.send_text_message( room.room_id, f"An error occurred during geolocation lookup for {query}. Please try again later." ) logging.error(f"Error in geo plugin for {query}: {e}", exc_info=True)