Files
FunguyBot/plugins/dns.py
2025-10-16 11:55:31 -05:00

210 lines
6.9 KiB
Python

"""
This plugin provides a command to perform DNS reconnaissance on a domain.
"""
import logging
import dns.resolver
import dns.reversename
import simplematrixbotlib as botlib
import re
# Common DNS record types to query
RECORD_TYPES = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA', 'PTR', 'SRV']
def is_valid_domain(domain):
"""
Validate if the provided string is a valid domain name.
Args:
domain (str): The domain to validate.
Returns:
bool: True if valid, False otherwise.
"""
# Basic domain validation regex
pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
return re.match(pattern, domain) is not None
def format_dns_record(record_type, records):
"""
Format DNS records for display.
Args:
record_type (str): The type of DNS record.
records (list): List of DNS record values.
Returns:
str: Formatted HTML string.
"""
if not records:
return ""
output = f"<strong>{record_type} Records:</strong><br>"
for record in records:
output += f"{record}<br>"
return output
async def query_dns_records(domain):
"""
Query all common DNS record types for a domain.
Args:
domain (str): The domain to query.
Returns:
dict: Dictionary with record types as keys and lists of records as values.
"""
results = {}
resolver = dns.resolver.Resolver()
resolver.timeout = 5
resolver.lifetime = 5
for record_type in RECORD_TYPES:
try:
logging.info(f"Querying {record_type} records for {domain}")
answers = resolver.resolve(domain, record_type)
records = []
for rdata in answers:
if record_type == 'MX':
# MX records have preference and exchange
records.append(f"{rdata.preference} {rdata.exchange}")
elif record_type == 'SOA':
# SOA records have multiple fields
records.append(f"{rdata.mname} {rdata.rname}")
elif record_type == 'SRV':
# SRV records have priority, weight, port, and target
records.append(f"{rdata.priority} {rdata.weight} {rdata.port} {rdata.target}")
elif record_type == 'TXT':
# TXT records can have multiple strings
txt_data = ' '.join([s.decode() if isinstance(s, bytes) else str(s) for s in rdata.strings])
records.append(txt_data)
else:
records.append(str(rdata))
if records:
results[record_type] = records
logging.info(f"Found {len(records)} {record_type} record(s)")
except dns.resolver.NoAnswer:
logging.debug(f"No {record_type} records found for {domain}")
continue
except dns.resolver.NXDOMAIN:
logging.warning(f"Domain {domain} does not exist")
return None
except dns.resolver.Timeout:
logging.warning(f"Timeout querying {record_type} for {domain}")
continue
except Exception as e:
logging.error(f"Error querying {record_type} for {domain}: {e}")
continue
return results
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !dns command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("dns"):
logging.info("Received !dns command")
args = match.args()
if len(args) != 1:
await bot.api.send_text_message(
room.room_id,
"Usage: !dns <domain>\nExample: !dns example.com"
)
logging.info("Sent usage message for !dns")
return
domain = args[0].lower().strip()
# Remove protocol if present
domain = domain.replace('http://', '').replace('https://', '')
# Remove trailing slash if present
domain = domain.rstrip('/')
# Remove www. prefix if present (optional - you can keep it if you want)
# domain = domain.replace('www.', '')
# Validate domain
if not is_valid_domain(domain):
await bot.api.send_text_message(
room.room_id,
f"Invalid domain name: {domain}"
)
logging.warning(f"Invalid domain provided: {domain}")
return
try:
logging.info(f"Starting DNS reconnaissance for {domain}")
# Send "working on it" message for longer queries
await bot.api.send_text_message(
room.room_id,
f"🔍 Performing DNS reconnaissance on {domain}..."
)
# Query DNS records
results = await query_dns_records(domain)
if results is None:
await bot.api.send_text_message(
room.room_id,
f"Domain {domain} does not exist (NXDOMAIN)"
)
return
if not results:
await bot.api.send_text_message(
room.room_id,
f"No DNS records found for {domain}"
)
return
# Format the output
output = f"<strong>🔍 DNS Records for {domain}</strong><br><br>"
# Order the records in a logical way
preferred_order = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SOA', 'SRV', 'PTR']
for record_type in preferred_order:
if record_type in results:
output += format_dns_record(record_type, results[record_type])
output += "<br>"
# Add any remaining record types not in preferred order
for record_type in results:
if record_type not in preferred_order:
output += format_dns_record(record_type, results[record_type])
output += "<br>"
# Wrap in collapsible details if output is large
if output.count('<br>') > 15:
output = f"<details><summary><strong>🔍 DNS Records for {domain}</strong></summary>{output}</details>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent DNS records for {domain}")
except Exception as e:
await bot.api.send_text_message(
room.room_id,
f"An error occurred while performing DNS lookup: {str(e)}"
)
logging.error(f"Error in DNS plugin for {domain}: {e}", exc_info=True)