"""
This plugin provides WHOIS lookup functionality for domains, IPs, and related network information.
"""
import asyncio
import html
import ipaddress
import logging
import re

import simplematrixbotlib as botlib
import whois
# Either a dotted name whose labels start/end with an alphanumeric character
# and whose TLD is alphabetic, or a single bare label of up to 63 characters.
_DOMAIN_PATTERN = re.compile(
    r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}'
    r'|[a-zA-Z0-9-]{1,63}'
)

# Longest textual form of a DNS name.
_MAX_DOMAIN_LENGTH = 253


def is_valid_domain(domain):
    """
    Validate if the provided string is a valid domain name.

    The whole string must match: unlike ``re.match`` with a ``$`` anchor,
    a trailing newline is not tolerated.

    Args:
        domain (str): The domain to validate.
    Returns:
        bool: True if valid, False otherwise (including for non-string input).
    """
    if not isinstance(domain, str):
        return False
    if len(domain) > _MAX_DOMAIN_LENGTH:
        return False
    return _DOMAIN_PATTERN.fullmatch(domain) is not None
def is_valid_ip(ip):
    """
    Validate if the provided string is a valid IPv4 or IPv6 address.

    Args:
        ip (str): The IP address to validate.
    Returns:
        bool: True if valid, False otherwise (including for non-string input).
    """
    # ipaddress.ip_address() also accepts integers and packed bytes; this
    # helper is about textual addresses only, so reject anything else.
    if not isinstance(ip, str):
        return False
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return False
    return True
def format_whois_data(domain, data):
    """
    Format WHOIS data into a readable, collapsible HTML report.

    Fields that are missing, ``None`` or empty are skipped instead of being
    rendered as the text "None". The query and every value read from the
    WHOIS record are HTML-escaped before being placed in the message.

    Args:
        domain (str): The queried domain/IP.
        data (whois domain object): The WHOIS data object; its fields are
            read as attributes.
    Returns:
        str: Formatted HTML message.
    """
    def field(name):
        """Return attribute *name* of data, or None when absent or empty."""
        value = getattr(data, name, None)
        if isinstance(value, (list, tuple, set)):
            value = [item for item in value if item not in (None, "")]
        return value or None

    def text(value):
        """Return *value* as HTML-escaped text."""
        return html.escape(str(value))

    def first(value):
        """Return the first entry of a list value, or the value itself."""
        return value[0] if isinstance(value, list) else value

    def labelled(pairs):
        """Render (label, value) pairs as 'label: value', skipping None."""
        return [
            f"{label}: {text(value)}"
            for label, value in pairs
            if value is not None
        ]

    sections = []

    # Registrar Information
    registrar_items = labelled([
        ("Registrar", field("registrar")),
        ("WHOIS Server", field("whois_server")),
    ])
    if registrar_items:
        sections.append("<br>".join(registrar_items))

    # Dates (a field may hold several values; only the first is shown)
    date_items = labelled([
        ("Created", first(field("creation_date"))),
        ("Updated", first(field("updated_date"))),
        ("Expires", first(field("expiration_date"))),
    ])
    if date_items:
        sections.append("<br>".join(date_items))

    # Status
    status = field("status")
    if status is not None:
        if isinstance(status, list):
            # Limit to first 3 status entries
            status_text = "<br>".join(text(entry) for entry in status[:3])
        else:
            status_text = text(status)
        sections.append(f"Status:<br>{status_text}")

    # Name Servers
    name_servers = field("name_servers")
    if name_servers is not None:
        if isinstance(name_servers, list):
            ordered = sorted(str(server) for server in name_servers)
            lines = [text(server) for server in ordered[:5]]
            if len(ordered) > 5:
                lines.append(f"...(+{len(ordered) - 5} more)")
            name_servers_list = "<br>".join(lines)
        else:
            name_servers_list = text(name_servers)
        sections.append(f"Name Servers:<br>{name_servers_list}")

    # Contact Information
    contact_items = labelled([
        ("Organization", field("org")),
        ("Country", field("country")),
        ("State", field("state")),
        ("City", field("city")),
    ])
    if contact_items:
        sections.append("<br>".join(contact_items))

    # Domain/Query Information: shown first whenever there is anything to
    # report, falling back to the raw query when the record has no name.
    domain_names = field("domain_name")
    if domain_names is not None or sections:
        if domain_names is None:
            shown = text(domain)
        elif isinstance(domain_names, list):
            shown = ", ".join(text(name) for name in domain_names)
        else:
            shown = text(domain_names)
        sections.insert(0, f"🔍 Query: {shown}")

    # Build the final message
    safe_domain = text(domain)
    if sections:
        content = f"🌐 WHOIS Report: {safe_domain}<br><br>"
        content += "<br><br>".join(sections)
    else:
        content = f"🌐 WHOIS Information for {safe_domain}<br>"
        content += "No detailed information available or query returned minimal data."

    # Wrap in collapsible details block for Matrix compatibility
    message = (
        f"<details><summary>🌐 WHOIS Report: {safe_domain} (Click to expand)</summary>"
        f"{content}</details>"
    )
    return message
async def handle_command(room, message, bot, prefix, config):
    """
    Function to handle the whois command.

    Validates the first argument as a domain or IP address, performs the
    WHOIS lookup without blocking the event loop, and replies in the room
    with either the formatted report or an error message.

    Args:
        room (Room): The Matrix room where the command was invoked.
        message (RoomMessage): The message object containing the command.
        bot (Bot): The bot object.
        prefix (str): The command prefix.
        config (dict): Configuration parameters (not used by this handler).
    Returns:
        None
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("whois")):
        return

    args = match.args()
    if len(args) < 1:
        await bot.api.send_text_message(
            room.room_id,
            f"Usage: {prefix}whois <domain or IP>\n"
            f"Example: {prefix}whois example.com\n"
            f"Example: {prefix}whois 8.8.8.8"
        )
        return

    query = args[0].strip()
    logging.info("Received !whois command for: %s", query)

    # Validate the query
    if not is_valid_domain(query) and not is_valid_ip(query):
        await bot.api.send_text_message(
            room.room_id,
            f"Invalid domain or IP address format: {query}\nPlease provide a valid domain (e.g., example.com) or IP address."
        )
        logging.warning("Invalid WHOIS query format: %s", query)
        return

    try:
        # Perform WHOIS lookup
        logging.info("Performing WHOIS lookup for: %s", query)
        await bot.api.send_text_message(room.room_id, f"🔍 Performing WHOIS lookup for {query}...")

        # whois.whois() is a synchronous network call; run it in the default
        # executor so the bot keeps serving other events while it waits.
        loop = asyncio.get_running_loop()
        whois_data = await loop.run_in_executor(None, whois.whois, query)

        # Format and send the results
        result_message = format_whois_data(query, whois_data)
        await bot.api.send_markdown_message(room.room_id, result_message)
        logging.info("Successfully sent WHOIS results for %s", query)
    except whois.parser.PywhoisError as e:
        error_msg = f"WHOIS lookup failed for {query}.\n"
        error_msg += "Possible reasons:\n- Domain/IP not found\n- WHOIS server unavailable\n- Rate limited by registrar"
        await bot.api.send_text_message(room.room_id, error_msg)
        logging.error("WHOIS lookup error for %s: %s", query, e)
    except Exception as e:
        # Top-level boundary for the plugin: report to the room and log the
        # traceback rather than letting the handler crash.
        await bot.api.send_text_message(
            room.room_id,
            f"An unexpected error occurred during WHOIS lookup for {query}. Please try again later."
        )
        logging.error("Unexpected error in WHOIS plugin for %s: %s", query, e, exc_info=True)