""" DNS reconnaissance plugin – queries A, AAAA, MX, NS, TXT, CNAME, SOA, SRV, PTR records. Outputs a formatted code block with emojis and perfectly aligned columns. """ import logging import asyncio import dns.resolver import dns.reversename import simplematrixbotlib as botlib import re from plugins.common import is_public_destination, html_escape, collapsible_summary, code_block RECORD_TYPES = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA', 'PTR', 'SRV'] def is_valid_domain(domain): pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' return re.match(pattern, domain) is not None async def query_dns_records(domain): loop = asyncio.get_running_loop() def _resolve(): results = {} resolver = dns.resolver.Resolver() resolver.timeout = 5 resolver.lifetime = 5 for record_type in RECORD_TYPES: try: answers = resolver.resolve(domain, record_type) records = [] for rdata in answers: if record_type == 'MX': records.append(f"{rdata.preference} {rdata.exchange}") elif record_type == 'SOA': records.append(f"{rdata.mname} {rdata.rname}") elif record_type == 'SRV': records.append(f"{rdata.priority} {rdata.weight} {rdata.port} {rdata.target}") elif record_type == 'TXT': txt_data = ' '.join([s.decode() if isinstance(s, bytes) else str(s) for s in rdata.strings]) records.append(txt_data) else: records.append(str(rdata)) if records: results[record_type] = records except dns.resolver.NoAnswer: continue except dns.resolver.NXDOMAIN: return None except dns.resolver.Timeout: continue except Exception as e: logging.error(f"Error querying {record_type} for {domain}: {e}") continue return results return await loop.run_in_executor(None, _resolve) RECORD_META = { 'A': ('🌐', 'A (IPv4)'), 'AAAA': ('🌐', 'AAAA (IPv6)'), 'MX': ('📧', 'MX (Mail)'), 'NS': ('🌐', 'NS (Nameserver)'), 'TXT': ('📄', 'TXT'), 'CNAME': ('🔀', 'CNAME'), 'SOA': ('📋', 'SOA'), 'PTR': ('↩️', 'PTR'), 'SRV': ('🔌', 'SRV'), } async def handle_command(room, message, bot, prefix, config): match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("dns"): args = match.args() if len(args) != 1: await bot.api.send_text_message(room.room_id, "Usage: !dns \nExample: !dns example.com") return domain = args[0].lower().strip() domain = domain.replace('http://', '').replace('https://', '').rstrip('/') if not is_valid_domain(domain): await bot.api.send_text_message(room.room_id, f"Invalid domain name: {html_escape(domain)}") return if not is_public_destination(domain): await bot.api.send_text_message(room.room_id, "❌ DNS queries for private/internal domains are not allowed.") return await bot.api.send_text_message(room.room_id, f"🔍 Performing DNS reconnaissance on {html_escape(domain)}...") try: results = await query_dns_records(domain) if results is None: await bot.api.send_text_message(room.room_id, f"Domain {html_escape(domain)} does not exist (NXDOMAIN)") return if not results: await bot.api.send_text_message(room.room_id, f"No DNS records found for {html_escape(domain)}") return a_records = results.get('A', []) aaaa_records = results.get('AAAA', []) all_ips = a_records + aaaa_records if all_ips and not any(is_public_destination(ip) for ip in all_ips): await bot.api.send_text_message(room.room_id, "❌ This domain resolves exclusively to private/internal IPs.") return rows = [] preferred = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SOA', 'SRV', 'PTR'] for rtype in preferred: if rtype in results: emoji, label = RECORD_META.get(rtype, ('❓', rtype)) for rec in results[rtype]: rows.append((emoji, label, rec)) emoji = "" label = "" for rtype in results: if rtype not in preferred: emoji, label = RECORD_META.get(rtype, ('❓', rtype)) for rec in results[rtype]: rows.append((emoji, label, rec)) emoji = "" label = "" if not rows: await bot.api.send_text_message(room.room_id, f"No displayable records for {html_escape(domain)}") return sections = [{"title": "", "rows": rows}] block = code_block(f"🔍 DNS Records for {domain}", sections) output = collapsible_summary(f"🔍 DNS: {html_escape(domain)}", block) await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Sent DNS records for {domain}") except Exception as e: await bot.api.send_text_message(room.room_id, f"An error occurred while performing DNS lookup: {str(e)}") logging.error(f"Error in DNS plugin for {domain}: {e}", exc_info=True) # --------------------------------------------------------------------------- # Plugin Metadata # --------------------------------------------------------------------------- __version__ = "1.1.1" __author__ = "Funguy Bot" __description__ = "DNS reconnaissance (SSRF‑safe)" __help__ = """
!dns – DNS reconnaissance

!dns <domain> – Queries A, AAAA, MX, NS, TXT, CNAME, SOA, SRV, PTR records and displays them in a clean, aligned table.

"""