"""
This plugin provides WHOIS lookup functionality for domains, IPs, and related network information.
"""
|
|
|
|
import asyncio
import html
import ipaddress
import logging
import re

import simplematrixbotlib as botlib
import whois
|
|
|
|
|
|
# A label is 1-63 characters of letters, digits and hyphens and may not start
# or end with a hyphen. Accepted forms: one or more labels followed by an
# alphabetic TLD of at least two letters, or a single bare label ("localhost").
_DOMAIN_RE = re.compile(
    r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}'
    r'|[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?'
)

# Maximum length of a full domain name in textual form.
_MAX_DOMAIN_LENGTH = 253


def is_valid_domain(domain):
    """
    Validate if the provided string is a valid domain name.

    The whole string must match: unlike ``re.match`` with a ``$`` anchor,
    ``fullmatch`` does not tolerate a trailing newline. Single labels follow
    the same rule as dotted labels, so a leading or trailing hyphen is rejected.

    Args:
        domain (str): The domain to validate.

    Returns:
        bool: True if valid, False otherwise (including for non-string input).
    """
    if not isinstance(domain, str) or len(domain) > _MAX_DOMAIN_LENGTH:
        return False
    return _DOMAIN_RE.fullmatch(domain) is not None
|
|
|
|
|
|
def is_valid_ip(ip):
    """
    Check whether the given text parses as an IPv4 or IPv6 address.

    Args:
        ip (str): The IP address to validate.

    Returns:
        bool: True when ``ipaddress.ip_address`` accepts the value,
        False when it raises ``ValueError``.
    """
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        parsed_ok = False
    else:
        parsed_ok = True
    return parsed_ok
|
|
|
|
|
|
def _whois_field(data, name):
    """Return field *name* of *data*, or None when it is missing or empty."""
    value = getattr(data, name, None)
    if isinstance(value, (list, tuple, set)):
        # Drop placeholder entries so an all-empty collection counts as missing.
        value = [item for item in value if item is not None and item != ""]
    if value is None or value == "" or value == []:
        return None
    return value


def _whois_text(value):
    """Return *value* as HTML-escaped text; list items are comma-separated."""
    if isinstance(value, list):
        return ', '.join(html.escape(str(item), quote=False) for item in value)
    return html.escape(str(value), quote=False)


def _whois_lines(data, fields, first_only=False):
    """Build '<strong>Label:</strong> value' lines for the non-empty *fields*."""
    lines = []
    for attr, label in fields:
        value = _whois_field(data, attr)
        if value is None:
            continue
        if first_only and isinstance(value, list):
            value = value[0]
        lines.append(f"<strong>{label}:</strong> {_whois_text(value)}")
    return lines


def format_whois_data(domain, data):
    """
    Format WHOIS data into a readable format.

    Fields that are absent, None, or empty are skipped rather than rendered
    as the text "None"; hasattr() alone cannot detect absence on objects that
    return None for unknown attributes. All values, and the queried name, are
    HTML-escaped because WHOIS records are externally controlled text.

    Args:
        domain (str): The queried domain/IP.
        data (whois domain object): The WHOIS data object; fields are read
            as attributes.

    Returns:
        str: Formatted HTML message wrapped in a <details> block.
    """
    safe_domain = _whois_text(domain)
    sections = []

    # Domain/Query Information: prefer the record's own domain name(s); fall
    # back to the queried name when only a 'query' field is present.
    domain_names = _whois_field(data, 'domain_name')
    if domain_names is None and _whois_field(data, 'query') is not None:
        domain_names = domain
    if domain_names is not None:
        sections.append(f"<strong>🔍 Query:</strong> {_whois_text(domain_names)}")

    # Registrar Information
    registrar_items = _whois_lines(
        data,
        (('registrar', 'Registrar'), ('whois_server', 'WHOIS Server')),
    )
    if registrar_items:
        sections.append('<br>'.join(registrar_items))

    # Dates: when a field holds several values, only the first is shown.
    date_items = _whois_lines(
        data,
        (
            ('creation_date', 'Created'),
            ('updated_date', 'Updated'),
            ('expiration_date', 'Expires'),
        ),
        first_only=True,
    )
    if date_items:
        sections.append('<br>'.join(date_items))

    # Status
    status = _whois_field(data, 'status')
    if status is not None:
        if isinstance(status, list):
            # Limit to first 3 status entries
            status_text = '<br>'.join(_whois_text(item) for item in status[:3])
        else:
            status_text = _whois_text(status)
        sections.append(f"<strong>Status:</strong><br>{status_text}")

    # Name Servers: sorted, capped at five with a count of the remainder.
    name_servers = _whois_field(data, 'name_servers')
    if name_servers is not None:
        if isinstance(name_servers, list):
            ordered = sorted(str(server) for server in name_servers)
            name_servers_list = '<br>'.join(_whois_text(server) for server in ordered[:5])
            if len(ordered) > 5:
                name_servers_list += f"<br><em>...(+{len(ordered) - 5} more)</em>"
        else:
            name_servers_list = _whois_text(name_servers)
        sections.append(f"<strong>Name Servers:</strong><br>{name_servers_list}")

    # Contact Information
    contact_items = _whois_lines(
        data,
        (
            ('org', 'Organization'),
            ('country', 'Country'),
            ('state', 'State'),
            ('city', 'City'),
        ),
    )
    if contact_items:
        sections.append('<br>'.join(contact_items))

    # Build the final message
    if sections:
        content = f"<strong>🌐 WHOIS Report: {safe_domain}</strong><br><br>"
        content += '<br><br>'.join(sections)
    else:
        content = f"<strong>🌐 WHOIS Information for {safe_domain}</strong><br><br>"
        content += "<em>No detailed information available or query returned minimal data.</em>"

    # Wrap in collapsible details block for Matrix compatibility
    return (
        f"<details><summary><strong>🌐 WHOIS Report: {safe_domain} (Click to expand)</strong></summary>"
        f"{content}</details>"
    )
|
|
|
|
|
|
async def handle_command(room, message, bot, prefix, config):
    """
    Function to handle the !whois command.

    Replies with a usage hint when no argument is given, rejects input that
    is neither a valid domain nor a valid IP address, and otherwise performs
    the lookup and posts the formatted report to the room.

    Args:
        room (Room): The Matrix room where the command was invoked.
        message (RoomMessage): The message object containing the command.
        bot (Bot): The bot object.
        prefix (str): The command prefix.
        config (dict): Configuration parameters (not used by this handler).

    Returns:
        None
    """
    match = botlib.MessageMatch(room, message, bot, prefix)

    # Guard clause: ignore anything that is not a "whois" command from a user.
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("whois")):
        return

    args = match.args()
    if not args:
        # Show the configured prefix so the hint is correct when it is not "!".
        await bot.api.send_text_message(
            room.room_id,
            f"Usage: {prefix}whois <domain/ip>\n"
            f"Example: {prefix}whois example.com\n"
            f"Example: {prefix}whois 8.8.8.8"
        )
        return

    query = args[0].strip()
    logging.info("Received !whois command for: %s", query)

    # Validate the query
    if not is_valid_domain(query) and not is_valid_ip(query):
        await bot.api.send_text_message(
            room.room_id,
            f"Invalid domain or IP address format: {query}\nPlease provide a valid domain (e.g., example.com) or IP address."
        )
        logging.warning("Invalid WHOIS query format: %s", query)
        return

    try:
        # Perform WHOIS lookup
        logging.info("Performing WHOIS lookup for: %s", query)
        await bot.api.send_text_message(room.room_id, f"🔍 Performing WHOIS lookup for {query}...")

        # whois.whois() is a synchronous call; run it in the default executor
        # so the event loop keeps serving other events during the lookup.
        loop = asyncio.get_running_loop()
        whois_data = await loop.run_in_executor(None, whois.whois, query)

        # Format and send the results
        result_message = format_whois_data(query, whois_data)
        await bot.api.send_markdown_message(room.room_id, result_message)
        logging.info("Successfully sent WHOIS results for %s", query)

    except whois.parser.PywhoisError as e:
        error_msg = f"WHOIS lookup failed for {query}.\n"
        error_msg += "Possible reasons:\n- Domain/IP not found\n- WHOIS server unavailable\n- Rate limited by registrar"
        await bot.api.send_text_message(room.room_id, error_msg)
        logging.error("WHOIS lookup error for %s: %s", query, e)

    except Exception as e:
        # Top-level boundary for the command: report a generic failure to the
        # room and keep the full traceback in the log.
        await bot.api.send_text_message(
            room.room_id,
            f"An unexpected error occurred during WHOIS lookup for {query}. Please try again later."
        )
        logging.error("Unexpected error in WHOIS plugin for %s: %s", query, e, exc_info=True)
|