Security fixes

This commit is contained in:
2026-05-07 17:37:20 -05:00
parent e71a145dea
commit f8e8ae9533
9 changed files with 162 additions and 386 deletions
+32 -132
View File
@@ -9,70 +9,37 @@ import simplematrixbotlib as botlib
import socket
import re
from plugins.utils import is_public_destination
async def is_valid_ip(ip):
"""
Check if the provided string is a valid IP address.
Args:
ip (str): The IP address to validate.
Returns:
bool: True if valid IP, False otherwise.
"""
"""Check if the provided string is a valid IP address."""
try:
# Check for IPv4
socket.inet_pton(socket.AF_INET, ip)
return True
except socket.error:
try:
# Check for IPv6
socket.inet_pton(socket.AF_INET6, ip)
return True
except socket.error:
return False
def is_domain(domain):
"""
Check if the provided string is a domain name.
Args:
domain (str): The string to check.
Returns:
bool: True if it's a domain, False otherwise.
"""
"""Check if the provided string is a domain name."""
domain_pattern = re.compile(
r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
)
return bool(domain_pattern.match(domain))
async def resolve_domain(domain):
"""
Resolve a domain name to an IP address.
Args:
domain (str): The domain to resolve.
Returns:
str: The resolved IP address or None.
"""
"""Resolve a domain name to an IP address."""
try:
return socket.gethostbyname(domain)
except socket.gaierror:
return None
async def query_ip_api_com(ip):
"""
Query ip-api.com for geolocation information.
Args:
ip (str): The IP address to geolocate.
Returns:
dict: Geolocation data or None if error.
"""
"""Query ip-api.com for geolocation information."""
url = f"http://ip-api.com/json/{ip}"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
@@ -87,17 +54,8 @@ async def query_ip_api_com(ip):
return None
async def query_ipapi_co(ip):
"""
Query ipapi.co for geolocation information (fallback).
Args:
ip (str): The IP address to geolocate.
Returns:
dict: Geolocation data or None if error.
"""
"""Query ipapi.co for geolocation information (fallback)."""
url = f"https://ipapi.co/{ip}/json/"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
@@ -112,45 +70,20 @@ async def query_ipapi_co(ip):
return None
async def query_geolocation(ip):
"""
Query geolocation information using primary and fallback APIs.
Args:
ip (str): The IP address to geolocate.
Returns:
dict: Geolocation data or None if error.
"""
# Try primary API first
"""Query geolocation information using primary and fallback APIs."""
data = await query_ip_api_com(ip)
# If primary API fails, try fallback API
if not data or data.get('status') == 'fail':
logging.info("Primary API failed, trying fallback API")
data = await query_ipapi_co(ip)
return data
async def format_geolocation_results(ip, data):
"""
Format geolocation results into a readable message.
Args:
ip (str): The queried IP address
data (dict): Geolocation data
Returns:
str: Formatted message
"""
"""Format geolocation results into a readable message."""
if not data:
return f"🔍 No geolocation data found for {ip}."
# Check if data is from ip-api.com or ipapi.co and format accordingly
if 'status' in data and data.get('status') == 'fail':
return f"🔍 No geolocation data found for {ip}."
# Extract relevant information based on API used
if 'country' in data: # ip-api.com format
if 'country' in data:
country = data.get('country', 'N/A')
country_code = data.get('countryCode', 'N/A')
region = data.get('regionName', data.get('region', 'N/A'))
@@ -162,7 +95,7 @@ async def format_geolocation_results(ip, data):
isp = data.get('isp', 'N/A')
org = data.get('org', 'N/A')
asn = data.get('as', 'N/A')
else: # ipapi.co format
else:
country = data.get('country_name', data.get('country', 'N/A'))
country_code = data.get('country_code', data.get('countryCode', 'N/A'))
region = data.get('region', 'N/A')
@@ -174,8 +107,6 @@ async def format_geolocation_results(ip, data):
isp = data.get('org', 'N/A')
org = data.get('org', 'N/A')
asn = data.get('asn', 'N/A')
# Create collapsible content
content = f"<strong>🔍 IP Geolocation Results for {ip}</strong><br><br>"
content += f"<strong>Country:</strong> {country} ({country_code})<br>"
content += f"<strong>Region:</strong> {region}<br>"
@@ -185,95 +116,64 @@ async def format_geolocation_results(ip, data):
content += f"<strong>Timezone:</strong> {timezone}<br>"
content += f"<strong>ISP/Organization:</strong> {isp}<br>"
content += f"<strong>ASN:</strong> {asn}<br>"
# Wrap in details tag for Matrix compatibility
message = f"<details><summary><strong>🔍 Geolocation: {ip}</strong></summary>{content}</details>"
return message
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !geo command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
"""Handle the !geo command."""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("geo"):
args = match.args()
if len(args) < 1:
await bot.api.send_text_message(
room.room_id,
"Usage: !geo <ip_address/domain>\nExample: !geo 8.8.8.8\nExample: !geo example.com"
)
return
query = args[0].strip()
logging.info(f"Received !geo command for: {query}")
try:
# Determine if input is IP or domain
ip = query
if is_domain(query):
# Resolve domain to IP first
await bot.api.send_text_message(
room.room_id,
f"🔍 Resolving domain {query} to IP address..."
)
ip = await resolve_domain(query)
if not ip:
await bot.api.send_text_message(
room.room_id,
f"Failed to resolve domain {query} to IP address."
)
await bot.api.send_text_message(room.room_id,
f"Failed to resolve domain {query} to IP address.")
return
await bot.api.send_text_message(
room.room_id,
f"Domain {query} resolved to IP {ip}"
)
if not is_public_destination(ip):
await bot.api.send_text_message(room.room_id,
"❌ That domain resolves to a private/internal IP, geo not allowed.")
return
await bot.api.send_text_message(room.room_id,
f"Domain {query} resolved to IP {ip}")
elif not await is_valid_ip(query):
await bot.api.send_text_message(
room.room_id,
f"Invalid IP address or domain format: {query}"
)
await bot.api.send_text_message(room.room_id,
f"Invalid IP address or domain format: {query}")
return
# Notify user that we're starting the lookup
await bot.api.send_text_message(
room.room_id,
f"🔍 Looking up geolocation for {ip}..."
)
# Query geolocation data with fallback
else:
if not is_public_destination(ip):
await bot.api.send_text_message(room.room_id,
"❌ Geolocation of private IP addresses is not allowed.")
return
await bot.api.send_text_message(room.room_id,
f"🔍 Looking up geolocation for {ip}...")
geo_data = await query_geolocation(ip)
# Format and send results
result_message = await format_geolocation_results(ip, geo_data)
await bot.api.send_markdown_message(room.room_id, result_message)
logging.info(f"Successfully sent geolocation results for {ip}")
except Exception as e:
await bot.api.send_text_message(
room.room_id,
f"An error occurred during geolocation lookup for {query}. Please try again later."
)
await bot.api.send_text_message(room.room_id,
f"An error occurred during geolocation lookup for {query}. Please try again later.")
logging.error(f"Error in geo plugin for {query}: {e}", exc_info=True)
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
__version__ = "1.0.0"
__version__ = "1.0.1"
__author__ = "Funguy Bot"
__description__ = "IP geolocation lookup"
__help__ = """