148 lines
6.2 KiB
Python
148 lines
6.2 KiB
Python
"""
|
||
DNS reconnaissance plugin – queries A, AAAA, MX, NS, TXT, CNAME, SOA, SRV, PTR records.
|
||
Outputs a formatted code block with emojis and perfectly aligned columns.
|
||
"""
|
||
|
||
import logging
|
||
import asyncio
|
||
import dns.resolver
|
||
import dns.reversename
|
||
import simplematrixbotlib as botlib
|
||
import re
|
||
from plugins.common import is_public_destination, html_escape, collapsible_summary, code_block
|
||
|
||
RECORD_TYPES = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA', 'PTR', 'SRV']
|
||
|
||
def is_valid_domain(domain):
|
||
pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
|
||
return re.match(pattern, domain) is not None
|
||
|
||
async def query_dns_records(domain):
|
||
loop = asyncio.get_running_loop()
|
||
def _resolve():
|
||
results = {}
|
||
resolver = dns.resolver.Resolver()
|
||
resolver.timeout = 5
|
||
resolver.lifetime = 5
|
||
for record_type in RECORD_TYPES:
|
||
try:
|
||
answers = resolver.resolve(domain, record_type)
|
||
records = []
|
||
for rdata in answers:
|
||
if record_type == 'MX':
|
||
records.append(f"{rdata.preference} {rdata.exchange}")
|
||
elif record_type == 'SOA':
|
||
records.append(f"{rdata.mname} {rdata.rname}")
|
||
elif record_type == 'SRV':
|
||
records.append(f"{rdata.priority} {rdata.weight} {rdata.port} {rdata.target}")
|
||
elif record_type == 'TXT':
|
||
txt_data = ' '.join([s.decode() if isinstance(s, bytes) else str(s) for s in rdata.strings])
|
||
records.append(txt_data)
|
||
else:
|
||
records.append(str(rdata))
|
||
if records:
|
||
results[record_type] = records
|
||
except dns.resolver.NoAnswer:
|
||
continue
|
||
except dns.resolver.NXDOMAIN:
|
||
return None
|
||
except dns.resolver.Timeout:
|
||
continue
|
||
except Exception as e:
|
||
logging.error(f"Error querying {record_type} for {domain}: {e}")
|
||
continue
|
||
return results
|
||
return await loop.run_in_executor(None, _resolve)
|
||
|
||
RECORD_META = {
|
||
'A': ('🌐', 'A (IPv4)'),
|
||
'AAAA': ('🌐', 'AAAA (IPv6)'),
|
||
'MX': ('📧', 'MX (Mail)'),
|
||
'NS': ('🌐', 'NS (Nameserver)'),
|
||
'TXT': ('📄', 'TXT'),
|
||
'CNAME': ('🔀', 'CNAME'),
|
||
'SOA': ('📋', 'SOA'),
|
||
'PTR': ('↩️', 'PTR'),
|
||
'SRV': ('🔌', 'SRV'),
|
||
}
|
||
|
||
async def handle_command(room, message, bot, prefix, config):
|
||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||
if match.is_not_from_this_bot() and match.prefix() and match.command("dns"):
|
||
args = match.args()
|
||
if len(args) != 1:
|
||
await bot.api.send_text_message(room.room_id, "Usage: !dns <domain>\nExample: !dns example.com")
|
||
return
|
||
domain = args[0].lower().strip()
|
||
domain = domain.replace('http://', '').replace('https://', '').rstrip('/')
|
||
|
||
if not is_valid_domain(domain):
|
||
await bot.api.send_text_message(room.room_id, f"Invalid domain name: {html_escape(domain)}")
|
||
return
|
||
|
||
if not is_public_destination(domain):
|
||
await bot.api.send_text_message(room.room_id, "❌ DNS queries for private/internal domains are not allowed.")
|
||
return
|
||
|
||
await bot.api.send_text_message(room.room_id, f"🔍 Performing DNS reconnaissance on {html_escape(domain)}...")
|
||
|
||
try:
|
||
results = await query_dns_records(domain)
|
||
if results is None:
|
||
await bot.api.send_text_message(room.room_id, f"Domain {html_escape(domain)} does not exist (NXDOMAIN)")
|
||
return
|
||
if not results:
|
||
await bot.api.send_text_message(room.room_id, f"No DNS records found for {html_escape(domain)}")
|
||
return
|
||
|
||
a_records = results.get('A', [])
|
||
aaaa_records = results.get('AAAA', [])
|
||
all_ips = a_records + aaaa_records
|
||
if all_ips and not any(is_public_destination(ip) for ip in all_ips):
|
||
await bot.api.send_text_message(room.room_id, "❌ This domain resolves exclusively to private/internal IPs.")
|
||
return
|
||
|
||
rows = []
|
||
preferred = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SOA', 'SRV', 'PTR']
|
||
for rtype in preferred:
|
||
if rtype in results:
|
||
emoji, label = RECORD_META.get(rtype, ('❓', rtype))
|
||
for rec in results[rtype]:
|
||
rows.append((emoji, label, rec))
|
||
emoji = ""
|
||
label = ""
|
||
for rtype in results:
|
||
if rtype not in preferred:
|
||
emoji, label = RECORD_META.get(rtype, ('❓', rtype))
|
||
for rec in results[rtype]:
|
||
rows.append((emoji, label, rec))
|
||
emoji = ""
|
||
label = ""
|
||
|
||
if not rows:
|
||
await bot.api.send_text_message(room.room_id, f"No displayable records for {html_escape(domain)}")
|
||
return
|
||
|
||
sections = [{"title": "", "rows": rows}]
|
||
block = code_block(f"🔍 DNS Records for {domain}", sections)
|
||
output = collapsible_summary(f"🔍 DNS: {html_escape(domain)}", block)
|
||
await bot.api.send_markdown_message(room.room_id, output)
|
||
logging.info(f"Sent DNS records for {domain}")
|
||
|
||
except Exception as e:
|
||
await bot.api.send_text_message(room.room_id, f"An error occurred while performing DNS lookup: {str(e)}")
|
||
logging.error(f"Error in DNS plugin for {domain}: {e}", exc_info=True)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Plugin Metadata
|
||
# ---------------------------------------------------------------------------
|
||
__version__ = "1.1.1"
|
||
__author__ = "Funguy Bot"
|
||
__description__ = "DNS reconnaissance (SSRF‑safe)"
|
||
__help__ = """
|
||
<details>
|
||
<summary><strong>!dns</strong> – DNS reconnaissance</summary>
|
||
<p><code>!dns <domain></code> – Queries A, AAAA, MX, NS, TXT, CNAME, SOA, SRV, PTR records and displays them in a clean, aligned table.</p>
|
||
</details>
|
||
"""
|