Compare commits

..

7 Commits

Author SHA1 Message Date
ed62397661 Reverted to 5d746027e2 for funguy.py and loadplugin.py 2025-10-19 15:55:44 -05:00
d61036d5ac HashID and SSLScan plugins created 2025-10-16 18:31:34 -05:00
9da1009c0e Headers plugin added 2025-10-16 16:33:23 -05:00
4306d013eb ExploitDB plugin added 2025-10-16 15:35:30 -05:00
bb6f6c15f6 DNSDumpster plugin added 2025-10-16 14:30:30 -05:00
428d21d884 New plugins help updated 2025-10-16 12:06:33 -05:00
5ace1083f1 New plugins 2025-10-16 11:55:31 -05:00
15 changed files with 3897 additions and 74 deletions

218
README.md
View File

@@ -131,6 +131,224 @@ Fetches current weather information for any location using OpenWeatherMap API.
**📖 !ud [term] [index]**
Fetches definitions from Urban Dictionary. Use without arguments for random definition, or specify term and optional index.
**🔍 !dns [domain]**
Performs comprehensive DNS reconnaissance on a domain. Shows A, AAAA, MX, NS, TXT, CNAME, SOA, and other DNS records.
**💰 !btc**
Fetches the current Bitcoin price in USD from bitcointicker.co API.
### 🔍 Shodan Security Research
**📡 !shodan [command] [query]**
Shodan.io integration for security reconnaissance and threat intelligence.
**Commands:**
- `!shodan ip <ip_address>` - Detailed IP information (services, ports, banners)
- `!shodan search <query>` - Search Shodan database with filters
- `!shodan host <domain>` - Host information and subdomain enumeration
- `!shodan count <query>` - Count results with geographic/organization breakdown
- `!shodan test` - Test API connection and debug queries
**Search Examples:**
```bash
!shodan search apache
!shodan search "port:22 country:US"
!shodan search "product:nginx city:'New York'"
!shodan search "net:192.168.1.0/24"
!shodan search "vuln:cve-2021-44228"
!shodan search "http.title:'phpMyAdmin'"
!shodan search "ssl.cert.subject.cn:'example.com'"
Common Search Filters:
country:US - Filter by country
city:"New York" - Filter by city
port:80,443,8080 - Filter by ports
product:nginx - Filter by service/product
os:Windows - Filter by operating system
org:"Google" - Filter by organization
net:192.168.0.0/16 - Filter by network range
has_ssl:true - Has SSL certificate
http.title:"admin" - HTTP page title contains
```
### 🔍 DNSDumpster Reconnaissance
**🌐 !dnsdumpster [domain]**
Comprehensive DNS reconnaissance and attack surface mapping using DNSDumpster.com API.
**Commands:**
- `!dnsdumpster <domain>` - Complete DNS reconnaissance for any domain
- `!dnsdumpster test` - Test API connection and key validity
**Features:**
- **A Records**: All IPv4 addresses with geographic and ASN information
- **NS Records**: Complete name server information with IP locations
- **MX Records**: All mail servers with geographic data
- **CNAME Records**: Full alias chain mappings
- **TXT Records**: All text records including SPF, DKIM, verification records
- **Additional Records**: AAAA, SRV, SOA, PTR records when available
- **Web Services**: HTTP/HTTPS service detection with banner information
**Examples:**
```bash
!dnsdumpster google.com
!dnsdumpster github.com
!dnsdumpster example.com
!dnsdumpster test
Data Returned:
Total record counts for each type
IP addresses with country and ASN information
Web server banners and technologies
Complete subdomain and host mappings
Geographic distribution of services
Requires DNSDUMPSTER_KEY environment variable in .env file
```
## ExploitDB Plugin
A security plugin that searches Exploit-DB for vulnerabilities and exploits directly from Matrix.
### Features
- Searches the official Exploit-DB CSV database for security exploits
- Provides direct links to exploit details
- Fallback to web search when CSV lookup fails
- Configurable result limits (1-10)
- Formatted output with exploit metadata
### Commands
- `!exploitdb <search term> [max_results]` - Search Exploit-DB for vulnerabilities
### Examples
```
!exploitdb wordpress
!exploitdb apache 3
!exploitdb windows privilege escalation
!exploitdb android 10
```
### Usage Notes
- Maximum results limited to 10 for performance
- Results include: title, EDB-ID, type, platform, author, date, and direct URL
- Includes responsible disclosure reminder
- Automatically falls back to search links if CSV database is unavailable
### 🔒 HTTP Security Headers Analysis
**🛡️ !headers [url]**
Comprehensive HTTP security header analysis with security scoring and recommendations.
**Features:**
- **Security Scoring**: 0-100 rating based on headers configuration
- **Header Validation**: Checks presence and proper configuration of critical security headers
- **Redirect Analysis**: Follows HTTP to HTTPS redirect chain
- **SSL Certificate**: Basic SSL/TLS certificate information
- **Information Disclosure**: Identifies revealing server headers
- **Actionable Recommendations**: Specific guidance for security improvements
**Security Headers Analyzed:**
- `Strict-Transport-Security` (HSTS) - HTTP to HTTPS enforcement
- `Content-Security-Policy` (CSP) - XSS and content injection protection
- `X-Frame-Options` - Clickjacking protection
- `X-Content-Type-Options` - MIME type sniffing prevention
- `Referrer-Policy` - Referrer information control
- `Feature-Policy` / `Permissions-Policy` - Browser feature restrictions
- Information disclosure headers (`Server`, `X-Powered-By`)
**Security Ratings:**
- **🟢 Excellent (80-100)**: Strong security headers configuration
- **🟡 Good (60-79)**: Moderate security, room for improvement
- **🟠 Fair (40-59)**: Basic security, significant improvements needed
- **🔴 Poor (0-39)**: Weak security headers configuration
**Examples:**
```bash
!headers example.com
!headers https://github.com
!headers localhost:8080
!headers subdomain.target.com
```
### 🔐 Hash Identification
**🔄 !hashid [hash]**
Advanced hash type identification with confidence scoring and tool recommendations.
**Features:**
- **Comprehensive Detection**: 100+ hash types including modern, legacy, and exotic algorithms
- **Confidence Scoring**: Color-coded confidence levels (🟢 Very High to 🔴 Low)
- **Tool Integration**: Hashcat mode numbers and John the Ripper format names
- **Context-Aware**: Handles modular crypt formats, LDAP, database, and network hashes
**Supported Hash Categories:**
- **Modern Algorithms**: yescrypt, scrypt, Argon2 (i/d/id), bcrypt variants
- **Unix/Linux**: SHA-512/256 Crypt, MD5 Crypt, Apache MD5 (apr1)
- **Raw Hashes**: MD5, SHA-1/224/256/384/512, SHA-3, Keccak, BLAKE2
- **Windows**: NTLM, LM, NetNTLMv1/v2
- **Databases**: MySQL (4.1+, old), PostgreSQL, Oracle (11g, 12c), MSSQL
- **Web/CMS**: WordPress, phpBB3, Drupal 7+, Django PBKDF2
- **LDAP**: SSHA, SMD5, various LDAP crypt formats
- **Exotic**: Whirlpool, RIPEMD, GOST, Tiger, Haval
**Tool Integration:**
- **Hashcat**: Mode numbers for direct use with `-m` parameter
- **John the Ripper**: Format names for `--format=` parameter
- **Multi-tool Support**: Works with most popular password cracking tools
**Examples:**
```bash
!hashid 5d41402abc4b2a76b9719d911017c592
!hashid aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d
!hashid $6$rounds=5000$salt$hashvalue...
!hashid $y$j9T$... (modern Linux yescrypt)
!hashid 8846f7eaee8fb117ad06bdd830b7586c
!hashid *2470C0C06DEE42FD1618BB99005ADCA2EC9D1E19
```
### 🔐 SSL/TLS Security Scanner
**🔐 !sslscan [domain[:port]]**
Comprehensive SSL/TLS security scanning and analysis with vulnerability detection.
**Features:**
- **Protocol Analysis**: TLS 1.0-1.3 support testing with security scoring
- **Certificate Validation**: Chain validation, expiration, signature algorithms
- **Cipher Suite Testing**: 25+ cipher suites with strength classification
- **Vulnerability Detection**: POODLE, weak ciphers, protocol vulnerabilities
- **Security Scoring**: 0-100 rating with color-coded assessment
- **Compliance Checking**: PCI DSS and modern security standards
**Security Checks:**
- **Protocol Security**: TLS 1.2/1.3 enforcement, insecure protocol detection
- **Certificate Health**: Expiration monitoring, signature strength validation
- **Cipher Security**: RC4, DES, 3DES, NULL cipher detection and classification
- **Modern Standards**: Forward Secrecy, strong encryption, best practices
**Output Features:**
- **Security Score**: Overall rating (🟢 Excellent to 🔴 Poor)
- **Detailed Breakdown**: Protocol support, cipher analysis, certificate info
- **Vulnerability List**: CVE references and severity ratings
- **Actionable Recommendations**: Specific fixes and configuration improvements
- **Quick Assessment**: Executive summary for rapid evaluation
**Examples:**
```bash
!sslscan example.com
!sslscan github.com:443
!sslscan localhost:8443
!sslscan 192.168.1.1:443
```
🟢 Excellent (90-100): Modern TLS configuration with strong security
🟡 Good (80-89): Good security with minor improvements needed
🟠 Fair (60-79): Moderate security, significant improvements recommended
🔴 Poor (0-59): Critical security issues requiring immediate attention
*Note: SSLv2/SSLv3 testing limited by Python security features (intentional security measure)*
### AI & Generation Commands
**🤖 AI Commands (!tech, !music, !eth, etc.)**

View File

@@ -17,11 +17,6 @@ import toml # Library for parsing TOML configuration files
# Importing FunguyConfig class from plugins.config module
from plugins.config import FunguyConfig
# Whitelist of allowed plugins to prevent arbitrary code execution
ALLOWED_PLUGINS = {'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma',
'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion',
'xkcd', 'youtube-preview', 'youtube-search', 'weather', 'urbandictionary'}
class FunguyBot:
"""
A bot class for managing plugins and handling commands in a Matrix chat environment.
@@ -83,22 +78,17 @@ class FunguyBot:
"""
Method to load plugins from the specified directory.
"""
# Iterating through whitelisted plugins only
for plugin_name in ALLOWED_PLUGINS:
plugin_file = os.path.join(self.PLUGINS_DIR, f"{plugin_name}.py")
# Verify that the plugin file exists
if not os.path.isfile(plugin_file):
logging.warning(f"Plugin file not found: {plugin_file}, skipping")
continue
try:
# Importing plugin module dynamically with validated plugin name
module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}")
self.PLUGINS[plugin_name] = module # Storing loaded plugin module
logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading
except Exception as e:
logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails
# Iterating through files in the plugins directory
for plugin_file in os.listdir(self.PLUGINS_DIR):
if plugin_file.endswith(".py"): # Checking if file is a Python file
plugin_name = os.path.splitext(plugin_file)[0] # Extracting plugin name
try:
# Importing plugin module dynamically
module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}")
self.PLUGINS[plugin_name] = module # Storing loaded plugin module
logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading
except Exception as e:
logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails
def reload_plugins(self):
"""
@@ -243,10 +233,3 @@ class FunguyBot:
if __name__ == "__main__":
bot = FunguyBot() # Creating instance of FunguyBot
bot.run() # Running the bot
from plugins import cron # Import your cron plugin
# After bot starts running, periodically check for cron jobs
while True:
asyncio.sleep(60) # Check every minute (adjust as needed)
cron.run_cron_jobs(bot) # Check and execute cron jobs

109
plugins/bitcoin.py Normal file
View File

@@ -0,0 +1,109 @@
"""
This plugin provides a command to fetch the current Bitcoin price.
"""
import logging
import requests
import simplematrixbotlib as botlib
BITCOIN_API_URL = "https://api.bitcointicker.co/trades/bitstamp/btcusd/60/"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !btc command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("btc"):
logging.info("Received !btc command")
try:
# Fetch Bitcoin price data
headers = {
'Accept-Encoding': 'gzip, deflate',
'User-Agent': 'FunguyBot/1.0'
}
logging.info(f"Fetching Bitcoin price from {BITCOIN_API_URL}")
response = requests.get(BITCOIN_API_URL, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if not data or len(data) == 0:
await bot.api.send_text_message(
room.room_id,
"No Bitcoin price data available."
)
logging.warning("No Bitcoin price data returned from API")
return
# Get the most recent trade (last item in the array)
latest_trade = data[-1]
price = latest_trade.get('price')
if price is None:
await bot.api.send_text_message(
room.room_id,
"Could not extract Bitcoin price from API response."
)
logging.error("Price field not found in API response")
return
# Convert to float and format with commas
try:
price_float = float(price)
price_formatted = f"${price_float:,.2f}"
except (ValueError, TypeError):
price_formatted = f"${price}"
# Optional: Get additional info if available
timestamp = latest_trade.get('timestamp', '')
volume = latest_trade.get('volume', '')
# Build the message
message_text = f"<strong>₿ BTC/USD</strong>"
message_text += f"<strong> Current Price:</strong> {price_formatted}"
message_text += ", <em>bitcointicker.co</em>"
await bot.api.send_markdown_message(room.room_id, message_text)
logging.info(f"Sent Bitcoin price: {price_formatted}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(
room.room_id,
"Request timed out. Bitcoin price API may be slow or unavailable."
)
logging.error("Bitcoin API timeout")
except requests.exceptions.RequestException as e:
await bot.api.send_text_message(
room.room_id,
f"Error fetching Bitcoin price: {e}"
)
logging.error(f"Error fetching Bitcoin price: {e}")
except (KeyError, IndexError, ValueError) as e:
await bot.api.send_text_message(
room.room_id,
"Error parsing Bitcoin price data."
)
logging.error(f"Error parsing Bitcoin API response: {e}", exc_info=True)
except Exception as e:
await bot.api.send_text_message(
room.room_id,
"An unexpected error occurred while fetching Bitcoin price."
)
logging.error(f"Unexpected error in Bitcoin plugin: {e}", exc_info=True)

482
plugins/ddg.py Normal file
View File

@@ -0,0 +1,482 @@
"""
This plugin provides DuckDuckGo search functionality using the DuckDuckGo Instant Answer API.
"""
import logging
import requests
import json
import simplematrixbotlib as botlib
from urllib.parse import quote, urlencode
import html
DDG_API_URL = "https://api.duckduckgo.com/"
DDG_SEARCH_URL = "https://html.duckduckgo.com/html/"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle DuckDuckGo search commands.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("ddg"):
logging.info("Received !ddg command")
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
subcommand = args[0].lower()
if subcommand == "search":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg search <query>")
return
query = ' '.join(args[1:])
await ddg_search(room, bot, query)
elif subcommand == "instant":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg instant <query>")
return
query = ' '.join(args[1:])
await ddg_instant_answer(room, bot, query)
elif subcommand == "image":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg image <query>")
return
query = ' '.join(args[1:])
await ddg_image_search(room, bot, query)
elif subcommand == "news":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg news <query>")
return
query = ' '.join(args[1:])
await ddg_news_search(room, bot, query)
elif subcommand == "video":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg video <query>")
return
query = ' '.join(args[1:])
await ddg_video_search(room, bot, query)
elif subcommand == "bang":
if len(args) < 2:
await show_bang_help(room, bot)
return
bang_query = ' '.join(args[1:])
await ddg_bang_search(room, bot, bang_query)
elif subcommand == "define":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg define <word>")
return
word = ' '.join(args[1:])
await ddg_definition(room, bot, word)
elif subcommand == "calc":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !ddg calc <expression>")
return
expression = ' '.join(args[1:])
await ddg_calculator(room, bot, expression)
elif subcommand == "weather":
location = ' '.join(args[1:]) if len(args) > 1 else ""
await ddg_weather(room, bot, location)
elif subcommand == "help":
await show_usage(room, bot)
else:
# Default to instant answer search
query = ' '.join(args)
await ddg_instant_answer(room, bot, query)
async def show_usage(room, bot):
"""Display DuckDuckGo command usage."""
usage = """
<strong>🦆 DuckDuckGo Search Commands</strong>
<strong>!ddg &lt;query&gt;</strong> - Instant answer search (default)
<strong>!ddg search &lt;query&gt;</strong> - Web search with results
<strong>!ddg instant &lt;query&gt;</strong> - Instant answer with detailed info
<strong>!ddg image &lt;query&gt;</strong> - Image search
<strong>!ddg news &lt;query&gt;</strong> - News search
<strong>!ddg video &lt;query&gt;</strong> - Video search
<strong>!ddg bang &lt;!bang query&gt;</strong> - Use DuckDuckGo bangs
<strong>!ddg define &lt;word&gt;</strong> - Word definitions
<strong>!ddg calc &lt;expression&gt;</strong> - Calculator
<strong>!ddg weather [location]</strong> - Weather information
<strong>!ddg help</strong> - Show this help
<strong>Examples:</strong>
• <code>!ddg python programming</code>
• <code>!ddg search matrix protocol</code>
• <code>!ddg image cute cats</code>
• <code>!ddg bang !w matrix</code>
• <code>!ddg define serendipity</code>
• <code>!ddg calc 2+2*5</code>
• <code>!ddg weather London</code>
<strong>Popular Bangs:</strong>
• <code>!w</code> - Wikipedia
• <code>!g</code> - Google
• <code>!yt</code> - YouTube
• <code>!aw</code> - ArchWiki
• <code>!gh</code> - GitHub
• <code>!so</code> - Stack Overflow
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def show_bang_help(room, bot):
"""Display DuckDuckGo bang help."""
bang_help = """
<strong>🦆 DuckDuckGo Bangs</strong>
<strong>Usage:</strong> <code>!ddg bang &lt;!bang query&gt;</code>
<strong>Popular Bangs:</strong>
• <code>!ddg bang !w matrix</code> - Search Wikipedia
• <code>!ddg bang !g python</code> - Search Google
• <code>!ddg bang !yt music</code> - Search YouTube
• <code>!ddg bang !aw arch</code> - Search ArchWiki
• <code>!ddg bang !gh repository</code> - Search GitHub
• <code>!ddg bang !so error</code> - Search Stack Overflow
• <code>!ddg bang !amazon book</code> - Search Amazon
• <code>!ddg bang !imdb movie</code> - Search IMDb
• <code>!ddg bang !reddit topic</code> - Search Reddit
• <code>!ddg bang !tw tweet</code> - Search Twitter
<strong>More Bangs:</strong>
• <code>!ddg</code> - DuckDuckGo
• <code>!bing</code> - Bing
• <code>!ddgimages</code> - DuckDuckGo Images
• <code>!npm</code> - npm packages
• <code>!cpp</code> - C++ reference
• <code>!python</code> - Python docs
• <code>!rust</code> - Rust docs
• <code>!mdn</code> - MDN Web Docs
<em>Thousands of bangs available! See: https://duckduckgo.com/bangs</em>
"""
await bot.api.send_markdown_message(room.room_id, bang_help)
async def ddg_instant_answer(room, bot, query):
"""Get DuckDuckGo instant answer."""
try:
params = {
'q': query,
'format': 'json',
'no_html': '1',
'skip_disambig': '1',
'no_redirect': '1'
}
logging.info(f"Fetching DuckDuckGo instant answer for: {query}")
response = requests.get(DDG_API_URL, params=params, timeout=10)
if response.status_code != 200:
# If API fails, provide direct search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🦆 DuckDuckGo: {html.escape(query)}</strong><br><br>"
f"API temporarily unavailable. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
return
data = response.json()
output = f"<strong>🦆 DuckDuckGo: {html.escape(query)}</strong><br><br>"
# Handle different answer types
if data.get('AbstractText'):
# Wikipedia-style answer
output += f"<strong>📚 {data.get('Heading', 'Definition')}</strong><br>"
output += f"{html.escape(data['AbstractText'])}<br>"
if data.get('AbstractURL'):
output += f"<a href='{data['AbstractURL']}'>Read more on {data.get('AbstractSource', 'Wikipedia')}</a><br>"
elif data.get('Answer'):
# Direct answer
output += f"<strong>💡 Answer</strong><br>"
output += f"{html.escape(data['Answer'])}<br>"
elif data.get('Definition'):
# Definition
output += f"<strong>📖 Definition</strong><br>"
output += f"{html.escape(data['Definition'])}<br>"
if data.get('DefinitionSource'):
output += f"<em>Source: {data['DefinitionSource']}</em><br>"
elif data.get('Results'):
# List of results
output += f"<strong>🔍 Results</strong><br>"
for result in data['Results'][:3]:
output += f"• <a href='{result.get('FirstURL', '#')}'>{html.escape(result.get('Text', 'Result'))}</a><br>"
elif data.get('RelatedTopics'):
# Related topics
output += f"<strong>🔗 Related Topics</strong><br>"
for topic in data['RelatedTopics'][:3]:
if isinstance(topic, dict) and topic.get('FirstURL'):
output += f"• <a href='{topic['FirstURL']}'>{html.escape(topic.get('Text', 'Topic'))}</a><br>"
elif isinstance(topic, dict) and topic.get('Name'):
output += f"{html.escape(topic['Name'])}<br>"
else:
# No instant answer found, show search results
output += "<strong>🔍 No instant answer found.</strong><br>"
# Add search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
output += f"<br><a href='{search_url}'>View all results on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
# Fallback to direct search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🦆 DuckDuckGo: {html.escape(query)}</strong><br><br>"
f"Error accessing API. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
logging.error(f"Error in ddg_instant_answer: {e}")
async def ddg_search(room, bot, query):
"""Perform web search with multiple results."""
try:
await ddg_web_search(room, bot, query, limit=5)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing search: {str(e)}")
async def ddg_web_search(room, bot, query, limit=5):
"""Perform web search and return results."""
try:
params = {
'q': query,
'format': 'json'
}
response = requests.get(DDG_API_URL, params=params, timeout=10)
if response.status_code != 200:
# Fallback to direct search
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🔍 DuckDuckGo Search: {html.escape(query)}</strong><br><br>"
f"API temporarily unavailable. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
return
data = response.json()
output = f"<strong>🔍 DuckDuckGo Search: {html.escape(query)}</strong><br><br>"
results_shown = 0
# Show instant answer if available
if data.get('AbstractText') and results_shown < limit:
output += f"<strong>💡 {data.get('Heading', 'Instant Answer')}</strong><br>"
abstract = data['AbstractText'][:200] + "..." if len(data['AbstractText']) > 200 else data['AbstractText']
output += f"{html.escape(abstract)}<br>"
if data.get('AbstractURL'):
output += f"<a href='{data['AbstractURL']}'>Read more</a><br>"
output += "<br>"
results_shown += 1
# Show web results
if data.get('Results') and results_shown < limit:
output += "<strong>🌐 Web Results</strong><br>"
for result in data['Results'][:limit - results_shown]:
output += f"• <a href='{result.get('FirstURL', '#')}'>{html.escape(result.get('Text', 'Result'))}</a><br>"
results_shown += 1
# Show related topics
if data.get('RelatedTopics') and results_shown < limit:
output += "<strong>🔗 Related Topics</strong><br>"
for topic in data['RelatedTopics'][:limit - results_shown]:
if isinstance(topic, dict) and topic.get('FirstURL'):
output += f"• <a href='{topic['FirstURL']}'>{html.escape(topic.get('Text', 'Topic'))}</a><br>"
results_shown += 1
# Add search link
search_url = f"https://duckduckgo.com/?q={quote(query)}"
output += f"<br><a href='{search_url}'>View all results on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
# Fallback to direct search
search_url = f"https://duckduckgo.com/?q={quote(query)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🔍 DuckDuckGo Search: {html.escape(query)}</strong><br><br>"
f"Error accessing API. <a href='{search_url}'>Search on DuckDuckGo</a>"
)
logging.error(f"Error in ddg_web_search: {e}")
async def ddg_image_search(room, bot, query):
"""Perform image search."""
try:
params = {
'q': query,
'format': 'json',
'iax': 'images',
'ia': 'images'
}
response = requests.get(DDG_API_URL, params=params, timeout=10)
if response.status_code != 200:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🖼️ DuckDuckGo Images: {html.escape(query)}</strong><br><br>"
f"API temporarily unavailable. <a href='{search_url}'>Search images on DuckDuckGo</a>"
)
return
data = response.json()
output = f"<strong>🖼️ DuckDuckGo Images: {html.escape(query)}</strong><br><br>"
if data.get('Results'):
output += "<strong>📸 Image Results</strong><br>"
for image in data['Results'][:3]:
output += f"• <a href='{image.get('Image', '#')}'>{html.escape(image.get('Title', 'Image'))}</a><br>"
if image.get('Width') and image.get('Height'):
output += f" Size: {image['Width']}×{image['Height']}<br>"
else:
output += "No image results found.<br>"
# Add search link
search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images"
output += f"<br><a href='{search_url}'>View all images on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🖼️ DuckDuckGo Images: {html.escape(query)}</strong><br><br>"
f"Error accessing API. <a href='{search_url}'>Search images on DuckDuckGo</a>"
)
async def ddg_news_search(room, bot, query):
"""Perform news search."""
try:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iar=news"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>📰 DuckDuckGo News: {html.escape(query)}</strong><br><br>"
f"<a href='{search_url}'>View news on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing news search: {str(e)}")
async def ddg_video_search(room, bot, query):
"""Perform video search."""
try:
search_url = f"https://duckduckgo.com/?q={quote(query)}&iar=videos"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🎬 DuckDuckGo Videos: {html.escape(query)}</strong><br><br>"
f"<a href='{search_url}'>View videos on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing video search: {str(e)}")
async def ddg_bang_search(room, bot, bang_query):
"""Perform search using DuckDuckGo bangs."""
try:
# Create search URL directly - this is more reliable than API for bangs
search_url = f"https://duckduckgo.com/?q={quote(bang_query)}"
# Common bangs with descriptions
bang_descriptions = {
'!w': 'Wikipedia',
'!g': 'Google',
'!yt': 'YouTube',
'!aw': 'ArchWiki',
'!gh': 'GitHub',
'!so': 'Stack Overflow',
'!amazon': 'Amazon',
'!imdb': 'IMDb',
'!reddit': 'Reddit',
'!tw': 'Twitter'
}
# Extract bang for description
bang = bang_query.split(' ')[0] if ' ' in bang_query else bang_query
description = bang_descriptions.get(bang, 'Site-specific search')
output = f"<strong>🎯 DuckDuckGo Bang: {html.escape(bang)}</strong><br>"
output += f"<strong>Description:</strong> {description}<br>"
if ' ' in bang_query:
output += f"<strong>Query:</strong> {html.escape(bang_query.split(' ', 1)[1])}<br><br>"
output += f"<a href='{search_url}'>Search with {html.escape(bang)} on DuckDuckGo</a>"
await bot.api.send_markdown_message(room.room_id, output)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error with bang search: {str(e)}")
async def ddg_definition(room, bot, word):
"""Get word definition."""
try:
search_url = f"https://duckduckgo.com/?q=define+{quote(word)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>📖 Definition: {html.escape(word)}</strong><br><br>"
f"<a href='{search_url}'>Get definition on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error getting definition: {str(e)}")
async def ddg_calculator(room, bot, expression):
"""Use DuckDuckGo as a calculator."""
try:
search_url = f"https://duckduckgo.com/?q={quote(expression)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🧮 Calculator: {html.escape(expression)}</strong><br><br>"
f"<a href='{search_url}'>Calculate on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error with calculator: {str(e)}")
async def ddg_weather(room, bot, location):
"""Get weather information."""
try:
if not location:
location = "current location"
search_url = f"https://duckduckgo.com/?q=weather+{quote(location)}"
await bot.api.send_markdown_message(
room.room_id,
f"<strong>🌤️ Weather: {html.escape(location)}</strong><br><br>"
f"<a href='{search_url}'>Get weather on DuckDuckGo</a>"
)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error getting weather: {str(e)}")

209
plugins/dns.py Normal file
View File

@@ -0,0 +1,209 @@
"""
This plugin provides a command to perform DNS reconnaissance on a domain.
"""
import logging
import dns.resolver
import dns.reversename
import simplematrixbotlib as botlib
import re
# Common DNS record types to query
RECORD_TYPES = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA', 'PTR', 'SRV']
def is_valid_domain(domain):
"""
Validate if the provided string is a valid domain name.
Args:
domain (str): The domain to validate.
Returns:
bool: True if valid, False otherwise.
"""
# Basic domain validation regex
pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
return re.match(pattern, domain) is not None
def format_dns_record(record_type, records):
"""
Format DNS records for display.
Args:
record_type (str): The type of DNS record.
records (list): List of DNS record values.
Returns:
str: Formatted HTML string.
"""
if not records:
return ""
output = f"<strong>{record_type} Records:</strong><br>"
for record in records:
output += f"{record}<br>"
return output
async def query_dns_records(domain):
"""
Query all common DNS record types for a domain.
Args:
domain (str): The domain to query.
Returns:
dict: Dictionary with record types as keys and lists of records as values.
"""
results = {}
resolver = dns.resolver.Resolver()
resolver.timeout = 5
resolver.lifetime = 5
for record_type in RECORD_TYPES:
try:
logging.info(f"Querying {record_type} records for {domain}")
answers = resolver.resolve(domain, record_type)
records = []
for rdata in answers:
if record_type == 'MX':
# MX records have preference and exchange
records.append(f"{rdata.preference} {rdata.exchange}")
elif record_type == 'SOA':
# SOA records have multiple fields
records.append(f"{rdata.mname} {rdata.rname}")
elif record_type == 'SRV':
# SRV records have priority, weight, port, and target
records.append(f"{rdata.priority} {rdata.weight} {rdata.port} {rdata.target}")
elif record_type == 'TXT':
# TXT records can have multiple strings
txt_data = ' '.join([s.decode() if isinstance(s, bytes) else str(s) for s in rdata.strings])
records.append(txt_data)
else:
records.append(str(rdata))
if records:
results[record_type] = records
logging.info(f"Found {len(records)} {record_type} record(s)")
except dns.resolver.NoAnswer:
logging.debug(f"No {record_type} records found for {domain}")
continue
except dns.resolver.NXDOMAIN:
logging.warning(f"Domain {domain} does not exist")
return None
except dns.resolver.Timeout:
logging.warning(f"Timeout querying {record_type} for {domain}")
continue
except Exception as e:
logging.error(f"Error querying {record_type} for {domain}: {e}")
continue
return results
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !dns command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("dns"):
logging.info("Received !dns command")
args = match.args()
if len(args) != 1:
await bot.api.send_text_message(
room.room_id,
"Usage: !dns <domain>\nExample: !dns example.com"
)
logging.info("Sent usage message for !dns")
return
domain = args[0].lower().strip()
# Remove protocol if present
domain = domain.replace('http://', '').replace('https://', '')
# Remove trailing slash if present
domain = domain.rstrip('/')
# Remove www. prefix if present (optional - you can keep it if you want)
# domain = domain.replace('www.', '')
# Validate domain
if not is_valid_domain(domain):
await bot.api.send_text_message(
room.room_id,
f"Invalid domain name: {domain}"
)
logging.warning(f"Invalid domain provided: {domain}")
return
try:
logging.info(f"Starting DNS reconnaissance for {domain}")
# Send "working on it" message for longer queries
await bot.api.send_text_message(
room.room_id,
f"🔍 Performing DNS reconnaissance on {domain}..."
)
# Query DNS records
results = await query_dns_records(domain)
if results is None:
await bot.api.send_text_message(
room.room_id,
f"Domain {domain} does not exist (NXDOMAIN)"
)
return
if not results:
await bot.api.send_text_message(
room.room_id,
f"No DNS records found for {domain}"
)
return
# Format the output
output = f"<strong>🔍 DNS Records for {domain}</strong><br><br>"
# Order the records in a logical way
preferred_order = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SOA', 'SRV', 'PTR']
for record_type in preferred_order:
if record_type in results:
output += format_dns_record(record_type, results[record_type])
output += "<br>"
# Add any remaining record types not in preferred order
for record_type in results:
if record_type not in preferred_order:
output += format_dns_record(record_type, results[record_type])
output += "<br>"
# Wrap in collapsible details if output is large
if output.count('<br>') > 15:
output = f"<details><summary><strong>🔍 DNS Records for {domain}</strong></summary>{output}</details>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent DNS records for {domain}")
except Exception as e:
await bot.api.send_text_message(
room.room_id,
f"An error occurred while performing DNS lookup: {str(e)}"
)
logging.error(f"Error in DNS plugin for {domain}: {e}", exc_info=True)

270
plugins/dnsdumpster.py Normal file
View File

@@ -0,0 +1,270 @@
"""
This plugin provides DNSDumpster.com integration for domain reconnaissance and DNS mapping.
"""
import logging
import os
import requests
import simplematrixbotlib as botlib
from dotenv import load_dotenv
# Load environment variables from .env file
plugin_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(plugin_dir)
dotenv_path = os.path.join(parent_dir, '.env')
load_dotenv(dotenv_path)
DNSDUMPSTER_API_KEY = os.getenv("DNSDUMPSTER_KEY", "")
DNSDUMPSTER_API_BASE = "https://api.dnsdumpster.com"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle DNSDumpster commands.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("dnsdumpster"):
logging.info("Received !dnsdumpster command")
# Check if API key is configured
if not DNSDUMPSTER_API_KEY:
await bot.api.send_text_message(
room.room_id,
"DNSDumpster API key not configured. Please set DNSDUMPSTER_KEY environment variable."
)
logging.error("DNSDumpster API key not configured")
return
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
# Check if it's a test command or domain lookup
if args[0].lower() == "test":
await test_dnsdumpster_connection(room, bot)
else:
# Treat the first argument as the domain
domain = args[0].lower().strip()
await dnsdumpster_domain_lookup(room, bot, domain)
async def show_usage(room, bot):
"""Display DNSDumpster command usage."""
usage = """
<strong>🔍 DNSDumpster Commands:</strong>
<strong>!dnsdumpster &lt;domain_name&gt;</strong> - Get comprehensive DNS reconnaissance for a domain
<strong>!dnsdumpster test</strong> - Test API connection
<strong>Examples:</strong>
• <code>!dnsdumpster google.com</code>
• <code>!dnsdumpster github.com</code>
• <code>!dnsdumpster example.com</code>
<strong>Rate Limit:</strong> 1 request per 2 seconds
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def test_dnsdumpster_connection(room, bot):
"""Test DNSDumpster API connection."""
try:
test_domain = "google.com" # Changed from example.com to google.com
url = f"{DNSDUMPSTER_API_BASE}/domain/{test_domain}"
headers = {
"X-API-Key": DNSDUMPSTER_API_KEY
}
logging.info(f"Testing DNSDumpster API with domain: {test_domain}")
response = requests.get(url, headers=headers, timeout=15)
debug_info = f"<strong>🔧 DNSDumpster API Test</strong><br>"
debug_info += f"<strong>Status Code:</strong> {response.status_code}<br>"
debug_info += f"<strong>Test Domain:</strong> {test_domain}<br>"
debug_info += f"<strong>Headers Used:</strong> X-API-Key<br>"
if response.status_code == 200:
data = response.json()
debug_info += "<strong>✅ SUCCESS - API is working!</strong><br>"
debug_info += f"<strong>Response Keys:</strong> {list(data.keys())}<br>"
# Show some sample data
if data.get('a'):
debug_info += f"<strong>A Records Found:</strong> {len(data['a'])}<br>"
if data.get('ns'):
debug_info += f"<strong>NS Records Found:</strong> {len(data['ns'])}<br>"
if data.get('total_a_recs'):
debug_info += f"<strong>Total A Records:</strong> {data['total_a_recs']}<br>"
elif response.status_code == 400:
debug_info += "<strong>❌ Bad Request - Check domain format</strong><br>"
debug_info += f"<strong>Response:</strong> {response.text[:200]}<br>"
elif response.status_code == 401:
debug_info += "<strong>❌ Unauthorized - Invalid API key</strong><br>"
elif response.status_code == 429:
debug_info += "<strong>⚠️ Rate Limit Exceeded - Wait 2 seconds</strong><br>"
else:
debug_info += f"<strong>❌ Error:</strong> {response.status_code} - {response.text[:200]}<br>"
await bot.api.send_markdown_message(room.room_id, debug_info)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Test failed: {str(e)}")
async def dnsdumpster_domain_lookup(room, bot, domain):
"""Get comprehensive DNS reconnaissance for a domain."""
try:
url = f"{DNSDUMPSTER_API_BASE}/domain/{domain}"
headers = {
"X-API-Key": DNSDUMPSTER_API_KEY
}
logging.info(f"Fetching DNSDumpster data for domain: {domain}")
# Send initial processing message
await bot.api.send_text_message(room.room_id, f"🔍 Processing DNS reconnaissance for {domain}...")
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 400:
await bot.api.send_text_message(room.room_id, f"Bad request - check domain format: {domain}")
return
elif response.status_code == 401:
await bot.api.send_text_message(room.room_id, "Invalid DNSDumpster API key")
return
elif response.status_code == 403:
await bot.api.send_text_message(room.room_id, "Access denied - check API key permissions")
return
elif response.status_code == 429:
await bot.api.send_text_message(room.room_id, "Rate limit exceeded - wait 2 seconds between requests")
return
elif response.status_code != 200:
await bot.api.send_text_message(room.room_id, f"DNSDumpster API error: {response.status_code} - {response.text[:100]}")
return
data = response.json()
logging.info(f"DNSDumpster response keys: {list(data.keys())}")
# Format the comprehensive DNS report
output = await format_dnsdumpster_report(domain, data)
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent DNSDumpster data for {domain}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "DNSDumpster API request timed out")
logging.error("DNSDumpster API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error fetching DNSDumpster data: {str(e)}")
logging.error(f"Error in dnsdumpster_domain_lookup: {e}")
async def format_dnsdumpster_report(domain, data):
"""Format DNSDumpster JSON response into a readable report."""
output = f"<strong>🔍 DNSDumpster Report: {domain}</strong><br><br>"
# Summary statistics
if data.get('total_a_recs'):
output += f"<strong>📊 Summary</strong><br>"
output += f" • <strong>Total A Records:</strong> {data['total_a_recs']}<br>"
# A Records - Show ALL records
if data.get('a') and data['a']:
output += f"<br><strong>📍 A Records (IPv4) - {len(data['a'])} found</strong><br>"
for record in data['a']: # Show ALL A records
host = record.get('host', 'N/A')
ips = record.get('ips', [])
output += f" • <strong>{host}</strong><br>"
for ip_info in ips: # Show ALL IPs per host
ip = ip_info.get('ip', 'N/A')
country = ip_info.get('country', 'Unknown')
asn_name = ip_info.get('asn_name', 'Unknown')
output += f" └─ {ip} ({country})<br>"
output += f" └─ {asn_name}<br>"
# Show banner information if available
banners = ip_info.get('banners', {})
if banners.get('http') or banners.get('https'):
output += f" └─ <em>Web Services:</em> "
services = []
if banners.get('http'):
services.append("HTTP")
if banners.get('https'):
services.append("HTTPS")
output += f"{', '.join(services)}<br>"
# NS Records - Show ALL records
if data.get('ns') and data['ns']:
output += f"<br><strong>🔗 NS Records (Name Servers) - {len(data['ns'])} found</strong><br>"
for record in data['ns']: # Show ALL NS records
host = record.get('host', 'N/A')
ips = record.get('ips', [])
output += f" • <strong>{host}</strong><br>"
for ip_info in ips: # Show ALL IPs
ip = ip_info.get('ip', 'N/A')
country = ip_info.get('country', 'Unknown')
output += f" └─ {ip} ({country})<br>"
# MX Records - Show ALL records
if data.get('mx') and data['mx']:
output += f"<br><strong>📧 MX Records (Mail Servers) - {len(data['mx'])} found</strong><br>"
for record in data['mx']: # Show ALL MX records
host = record.get('host', 'N/A')
ips = record.get('ips', [])
output += f" • <strong>{host}</strong><br>"
for ip_info in ips: # Show ALL IPs
ip = ip_info.get('ip', 'N/A')
country = ip_info.get('country', 'Unknown')
output += f" └─ {ip} ({country})<br>"
# CNAME Records - Show ALL records
if data.get('cname') and data['cname']:
output += f"<br><strong>🔀 CNAME Records - {len(data['cname'])} found</strong><br>"
for record in data['cname']: # Show ALL CNAME records
host = record.get('host', 'N/A')
target = record.get('target', 'N/A')
output += f"{host}{target}<br>"
# TXT Records - Show ALL records
if data.get('txt') and data['txt']:
output += f"<br><strong>📄 TXT Records - {len(data['txt'])} found</strong><br>"
for txt in data['txt']: # Show ALL TXT records
# Truncate very long TXT records but show more content
if len(txt) > 200:
txt = txt[:200] + "..."
output += f"{txt}<br>"
# Additional record types that might be present - Show ALL records
other_records = ['aaaa', 'srv', 'soa', 'ptr']
for record_type in other_records:
if data.get(record_type) and data[record_type]:
output += f"<br><strong>🔧 {record_type.upper()} Records - {len(data[record_type])} found</strong><br>"
for record in data[record_type]: # Show ALL records
if isinstance(record, dict):
# Format dictionary records nicely
record_str = ", ".join([f"{k}: {v}" for k, v in record.items()])
if len(record_str) > 150:
record_str = record_str[:150] + "..."
output += f"{record_str}<br>"
else:
output += f"{record}<br>"
# Add rate limit reminder
output += "<br><em>💡 Rate Limit: 1 request per 2 seconds</em>"
# Always wrap in collapsible details since we're showing all results
output = f"<details><summary><strong>🔍 DNSDumpster Report: {domain} (Click to expand)</strong></summary>{output}</details>"
return output

238
plugins/exploitdb.py Normal file
View File

@@ -0,0 +1,238 @@
"""
This plugin provides a command to search Exploit-DB for security exploits and vulnerabilities.
Uses the searchsploit-style approach with the files.csv database.
"""
import logging
import requests
import csv
import io
import simplematrixbotlib as botlib
from datetime import datetime
# Exploit-DB CSV database URL
EXPLOITDB_CSV_URL = "https://gitlab.com/exploit-database/exploitdb/-/raw/main/files_exploits.csv"
def format_exploit(exploit, index, total):
"""
Format an exploit entry for display.
Args:
exploit (dict): The exploit data.
index (int): Current result index.
total (int): Total number of results.
Returns:
str: Formatted HTML string.
"""
edb_id = exploit.get('id', 'N/A')
title = exploit.get('description', 'No title')
date = exploit.get('date', 'Unknown')
author = exploit.get('author', 'Unknown')
exploit_type = exploit.get('type', 'Unknown')
platform = exploit.get('platform', 'Unknown')
# Build the URL
url = f"https://www.exploit-db.com/exploits/{edb_id}"
output = f"""<strong>💣 Exploit {index}/{total}</strong><br>
<strong>Title:</strong> {title}<br>
<strong>EDB-ID:</strong> {edb_id}<br>
<strong>Type:</strong> {exploit_type} | <strong>Platform:</strong> {platform}<br>
<strong>Author:</strong> {author} | <strong>Date:</strong> {date}<br>
<strong>URL:</strong> <a href="{url}">{url}</a>"""
return output
async def search_exploitdb_csv(query, max_results=5):
"""
Search Exploit-DB CSV database for exploits matching the query.
Args:
query (str): Search term.
max_results (int): Maximum number of results to return.
Returns:
list: List of exploit dictionaries, or None on error.
"""
try:
logging.info(f"Downloading Exploit-DB CSV database...")
headers = {
'User-Agent': 'FunguyBot/1.0',
}
# Download the CSV file
response = requests.get(EXPLOITDB_CSV_URL, headers=headers, timeout=30)
response.raise_for_status()
# Parse CSV
csv_data = response.text
csv_file = io.StringIO(csv_data)
reader = csv.DictReader(csv_file)
# Search through CSV
results = []
query_lower = query.lower()
logging.info(f"Searching CSV for: {query}")
for row in reader:
# Search in description (title) and other fields
description = row.get('description', '').lower()
file_path = row.get('file', '').lower()
if query_lower in description or query_lower in file_path:
exploit = {
'id': row.get('id', 'N/A'),
'description': row.get('description', 'No title'),
'date': row.get('date_published', row.get('date', 'Unknown')),
'author': row.get('author', 'Unknown'),
'type': row.get('type', 'Unknown'),
'platform': row.get('platform', 'Unknown')
}
results.append(exploit)
if len(results) >= max_results:
break
return results
except requests.exceptions.Timeout:
logging.error("Timeout downloading Exploit-DB database")
return None
except requests.exceptions.RequestException as e:
logging.error(f"Error downloading Exploit-DB database: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error searching Exploit-DB: {e}", exc_info=True)
return None
async def search_exploitdb_google(query, max_results=5):
"""
Alternative: Search Exploit-DB using site-specific search.
Returns formatted search URLs instead of parsing.
Args:
query (str): Search term.
max_results (int): Maximum number of results to return.
Returns:
str: Formatted search information.
"""
# Create search URLs
exploitdb_search_url = f"https://www.exploit-db.com/search?q={query}"
google_search_url = f"https://www.google.com/search?q=site:exploit-db.com+{query}"
output = f"""<strong>💣 Exploit-DB Search for: {query}</strong><br><br>
<strong>Direct Search:</strong><br>
<a href="{exploitdb_search_url}">{exploitdb_search_url}</a><br><br>
<strong>Google Site Search:</strong><br>
<a href="{google_search_url}">{google_search_url}</a><br><br>
<em>💡 Tip: You can also use <code>searchsploit</code> command-line tool for offline searches.</em><br>
<em>⚠️ Use responsibly and only on systems you have permission to test.</em>"""
return output
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !exploitdb command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("exploitdb"):
logging.info("Received !exploitdb command")
args = match.args()
if len(args) < 1:
await bot.api.send_text_message(
room.room_id,
"Usage: !exploitdb <search term> [max_results]\n"
"Examples:\n"
" !exploitdb wordpress\n"
" !exploitdb apache 3\n"
" !exploitdb windows privilege escalation\n"
"Searches Exploit-DB for security vulnerabilities and exploits."
)
logging.info("Sent usage message for !exploitdb")
return
# Check if last argument is a number (max results)
max_results = 5
search_terms = args
if args[-1].isdigit():
max_results = int(args[-1])
if max_results < 1:
max_results = 1
elif max_results > 10:
max_results = 10
search_terms = args[:-1]
query = ' '.join(search_terms)
try:
# Send "searching" message
await bot.api.send_text_message(
room.room_id,
f"🔍 Searching Exploit-DB for: {query}... (this may take a moment)"
)
# Try CSV search first
exploits = await search_exploitdb_csv(query, max_results)
if exploits is None:
# Fallback to providing search links
logging.warning("CSV search failed, providing search links instead")
output = await search_exploitdb_google(query, max_results)
await bot.api.send_markdown_message(room.room_id, output)
return
if not exploits:
# Also provide search links when no results
output = f"No exploits found in local search for: <strong>{query}</strong><br><br>"
output += await search_exploitdb_google(query, max_results)
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"No exploits found for: {query}")
return
total = len(exploits)
logging.info(f"Found {total} exploit(s) for: {query}")
# Format all results
output = f"<strong>💣 Exploit-DB Search Results for: {query}</strong><br><br>"
for idx, exploit in enumerate(exploits, 1):
output += format_exploit(exploit, idx, total)
output += "<br><br>"
output += f"<em>⚠️ Use responsibly and only on systems you have permission to test.</em>"
# Wrap in collapsible details if more than 2 results
if total > 2:
summary = f"<strong>💣 Exploit-DB: {query}</strong> ({total} results)"
output = f"<details><summary>{summary}</summary>{output}</details>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent {total} exploit(s) for: {query}")
except Exception as e:
await bot.api.send_text_message(
room.room_id,
f"An error occurred while searching Exploit-DB: {str(e)}"
)
logging.error(f"Error in exploitdb plugin: {e}", exc_info=True)

386
plugins/hashid.py Normal file
View File

@@ -0,0 +1,386 @@
"""
This plugin provides a command to identify hash types using comprehensive pattern matching.
"""
import logging
import re
import simplematrixbotlib as botlib
def identify_hash(hash_string):
"""
Identify the hash type based on comprehensive pattern matching.
Args:
hash_string (str): The hash string to identify
Returns:
list: List of tuples (hash_type, hashcat_mode, john_format, confidence)
"""
hash_string = hash_string.strip()
hash_lower = hash_string.lower()
length = len(hash_string)
possible_types = []
# Unix crypt and modular crypt formats (most specific first)
if hash_string.startswith('$'):
# yescrypt (modern Linux /etc/shadow)
if re.match(r'^\$y\$', hash_string):
possible_types.append(("yescrypt", None, "yescrypt", 95))
# scrypt
elif re.match(r'^\$7\$', hash_string):
possible_types.append(("scrypt", "8900", "scrypt", 95))
# Argon2
elif re.match(r'^\$argon2(id?|d)\$', hash_string):
if '$argon2i$' in hash_string:
possible_types.append(("Argon2i", "10900", "argon2", 95))
elif '$argon2d$' in hash_string:
possible_types.append(("Argon2d", None, "argon2", 95))
elif '$argon2id$' in hash_string:
possible_types.append(("Argon2id", "10900", "argon2", 95))
# bcrypt variants
elif re.match(r'^\$(2[abxy]?)\$', hash_string):
bcrypt_type = re.match(r'^\$(2[abxy]?)\$', hash_string).group(1)
possible_types.append((f"bcrypt ({bcrypt_type})", "3200", "bcrypt", 95))
# SHA-512 Crypt (common in Linux)
elif re.match(r'^\$6\$', hash_string):
possible_types.append(("SHA-512 Crypt (Unix)", "1800", "sha512crypt", 95))
# SHA-256 Crypt (Unix)
elif re.match(r'^\$5\$', hash_string):
possible_types.append(("SHA-256 Crypt (Unix)", "7400", "sha256crypt", 95))
# MD5 Crypt (Unix)
elif re.match(r'^\$1\$', hash_string):
possible_types.append(("MD5 Crypt (Unix)", "500", "md5crypt", 95))
# Apache MD5
elif re.match(r'^\$apr1\$', hash_string):
possible_types.append(("Apache MD5 (apr1)", "1600", "md5crypt", 95))
# AIX SMD5
elif re.match(r'^\{smd5\}', hash_string, re.IGNORECASE):
possible_types.append(("AIX {smd5}", "6300", None, 90))
# AIX SSHA256
elif re.match(r'^\{ssha256\}', hash_string, re.IGNORECASE):
possible_types.append(("AIX {ssha256}", "6700", None, 90))
# AIX SSHA512
elif re.match(r'^\{ssha512\}', hash_string, re.IGNORECASE):
possible_types.append(("AIX {ssha512}", "6800", None, 90))
# phpBB3
elif re.match(r'^\$H\$', hash_string):
possible_types.append(("phpBB3", "400", "phpass", 90))
# Wordpress
elif re.match(r'^\$P\$', hash_string):
possible_types.append(("Wordpress", "400", "phpass", 90))
# Drupal 7+
elif re.match(r'^\$S\$', hash_string):
possible_types.append(("Drupal 7+", "7900", "drupal7", 90))
# WBB3 (Woltlab Burning Board)
elif re.match(r'^\$wbb3\$', hash_string):
possible_types.append(("WBB3 (Woltlab)", None, None, 85))
# PBKDF2-HMAC-SHA256
elif re.match(r'^\$pbkdf2-sha256\$', hash_string):
possible_types.append(("PBKDF2-HMAC-SHA256", "10900", "pbkdf2-hmac-sha256", 90))
# PBKDF2-HMAC-SHA512
elif re.match(r'^\$pbkdf2-sha512\$', hash_string):
possible_types.append(("PBKDF2-HMAC-SHA512", None, "pbkdf2-hmac-sha512", 90))
# Django PBKDF2
elif re.match(r'^pbkdf2_sha256\$', hash_string):
possible_types.append(("Django PBKDF2-SHA256", "10000", "django", 90))
# Unknown modular crypt format
else:
possible_types.append(("Unknown Modular Crypt Format", None, None, 30))
return possible_types
# LDAP formats
if hash_string.startswith('{'):
if re.match(r'^\{SHA\}', hash_string, re.IGNORECASE):
possible_types.append(("LDAP SHA-1", "101", "nsldap", 90))
elif re.match(r'^\{SSHA\}', hash_string, re.IGNORECASE):
possible_types.append(("LDAP SSHA (Salted SHA-1)", "111", "nsldaps", 90))
elif re.match(r'^\{MD5\}', hash_string, re.IGNORECASE):
possible_types.append(("LDAP MD5", "3210", None, 90))
elif re.match(r'^\{SMD5\}', hash_string, re.IGNORECASE):
possible_types.append(("LDAP SMD5 (Salted MD5)", "3211", None, 90))
elif re.match(r'^\{CRYPT\}', hash_string, re.IGNORECASE):
possible_types.append(("LDAP CRYPT", None, None, 85))
return possible_types
# Check for colon-separated formats (LM:NTLM, username:hash, etc.)
if ':' in hash_string:
parts = hash_string.split(':')
# NetNTLMv1 / NetNTLMv2
if len(parts) >= 5:
possible_types.append(("NetNTLMv2", "5600", "netntlmv2", 85))
possible_types.append(("NetNTLMv1", "5500", "netntlm", 75))
# LM:NTLM format
elif len(parts) == 2 and len(parts[0]) == 32 and len(parts[1]) == 32:
possible_types.append(("LM:NTLM", "1000", "nt", 90))
# Username:Hash or similar
elif len(parts) == 2:
hash_part = parts[1]
if len(hash_part) == 32:
possible_types.append(("NTLM (with username)", "1000", "nt", 80))
elif len(hash_part) == 40:
possible_types.append(("SHA-1 (with salt/username)", "110", None, 70))
# PostgreSQL md5
if hash_string.startswith('md5') and len(hash_string) == 35:
possible_types.append(("PostgreSQL MD5", "3100", "postgres", 90))
return possible_types if possible_types else None
# MySQL formats
if hash_string.startswith('*') and length == 41 and re.match(r'^\*[A-F0-9]{40}$', hash_string.upper()):
possible_types.append(("MySQL 4.1/5.x", "300", "mysql-sha1", 95))
return possible_types
# Oracle formats
if re.match(r'^[A-F0-9]{16}:[A-F0-9]{16}$', hash_string.upper()):
possible_types.append(("Oracle 11g", "112", "oracle11", 90))
return possible_types
if re.match(r'^S:[A-F0-9]{60}$', hash_string.upper()):
possible_types.append(("Oracle 12c/18c", "12300", "oracle12c", 90))
return possible_types
# MSSQL formats
if re.match(r'^0x0100[A-F0-9]{8}[A-F0-9]{40}$', hash_string.upper()):
possible_types.append(("MSSQL 2000", "131", "mssql", 90))
return possible_types
if re.match(r'^0x0200[A-F0-9]{8}[A-F0-9]{128}$', hash_string.upper()):
possible_types.append(("MSSQL 2012/2014", "1731", "mssql12", 90))
return possible_types
# Base64 pattern check
is_base64 = re.match(r'^[A-Za-z0-9+/]+=*$', hash_string) and length % 4 == 0
# Raw hash identification by length
is_hex = re.match(r'^[a-f0-9]+$', hash_lower)
if is_hex:
if length == 16:
possible_types.append(("MySQL < 4.1", "200", "mysql", 85))
possible_types.append(("Half MD5", None, None, 60))
elif length == 32:
possible_types.append(("MD5", "0", "raw-md5", 80))
possible_types.append(("MD4", "900", "raw-md4", 70))
possible_types.append(("NTLM", "1000", "nt", 75))
possible_types.append(("LM", "3000", "lm", 60))
possible_types.append(("RAdmin v2.x", "9900", None, 50))
possible_types.append(("Snefru-128", None, None, 40))
possible_types.append(("HMAC-MD5 (key = $pass)", "50", None, 50))
elif length == 40:
possible_types.append(("SHA-1", "100", "raw-sha1", 85))
possible_types.append(("RIPEMD-160", "6000", "ripemd-160", 65))
possible_types.append(("Tiger-160", None, None, 50))
possible_types.append(("Haval-160", None, None, 45))
possible_types.append(("HMAC-SHA1 (key = $pass)", "150", None, 55))
elif length == 48:
possible_types.append(("Tiger-192", None, None, 70))
possible_types.append(("Haval-192", None, None, 65))
elif length == 56:
possible_types.append(("SHA-224", "1300", "raw-sha224", 85))
possible_types.append(("Haval-224", None, None, 60))
elif length == 64:
possible_types.append(("SHA-256", "1400", "raw-sha256", 85))
possible_types.append(("RIPEMD-256", None, None, 60))
possible_types.append(("SHA3-256", "17400", "raw-sha3", 70))
possible_types.append(("Keccak-256", "17800", "raw-keccak-256", 70))
possible_types.append(("Haval-256", None, None, 50))
possible_types.append(("GOST R 34.11-94", "6900", None, 55))
possible_types.append(("BLAKE2b-256", None, None, 60))
elif length == 80:
possible_types.append(("RIPEMD-320", None, None, 80))
elif length == 96:
possible_types.append(("SHA-384", "10800", "raw-sha384", 85))
possible_types.append(("SHA3-384", "17900", None, 70))
possible_types.append(("Keccak-384", None, None, 65))
elif length == 128:
possible_types.append(("SHA-512", "1700", "raw-sha512", 85))
possible_types.append(("Whirlpool", "6100", "whirlpool", 75))
possible_types.append(("SHA3-512", "17600", None, 70))
possible_types.append(("Keccak-512", None, None, 65))
possible_types.append(("BLAKE2b-512", None, None, 60))
# Base64 encoded hashes
elif is_base64:
if length == 24:
possible_types.append(("MD5 (Base64)", None, None, 75))
elif length == 28:
possible_types.append(("SHA-1 (Base64)", None, None, 75))
elif length == 32:
possible_types.append(("SHA-224 (Base64)", None, None, 75))
elif length == 44:
possible_types.append(("SHA-256 (Base64)", None, None, 75))
elif length == 64:
possible_types.append(("SHA-384 (Base64)", None, None, 75))
elif length == 88:
possible_types.append(("SHA-512 (Base64)", None, None, 75))
return possible_types if possible_types else [("Unknown", None, None, 0)]
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !hashid command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("hashid"):
logging.info("Received !hashid command")
args = match.args()
if len(args) < 1:
usage_msg = """<strong>🔐 Hash Identifier Usage</strong>
<strong>Usage:</strong> <code>!hashid &lt;hash&gt;</code>
<strong>Examples:</strong>
• <code>!hashid 5f4dcc3b5aa765d61d8327deb882cf99</code>
• <code>!hashid 5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8</code>
• <code>!hashid $6$rounds=5000$salt$hash...</code>
• <code>!hashid $y$j9T$...</code> (yescrypt from /etc/shadow)
<strong>Supported Hash Types:</strong>
• <strong>Modern:</strong> yescrypt, scrypt, Argon2, bcrypt
• <strong>Unix Crypt:</strong> SHA-512 Crypt, SHA-256 Crypt, MD5 Crypt
• <strong>Raw Hashes:</strong> MD5, SHA-1/224/256/384/512, SHA-3, NTLM, LM
• <strong>Database:</strong> MySQL, PostgreSQL, Oracle, MSSQL
• <strong>CMS:</strong> Wordpress, phpBB3, Drupal, Django
• <strong>LDAP:</strong> SSHA, SMD5, and various LDAP formats
• <strong>Network:</strong> NetNTLMv1/v2, Kerberos
• <strong>Exotic:</strong> Whirlpool, RIPEMD, BLAKE2, Keccak, GOST
"""
await bot.api.send_markdown_message(room.room_id, usage_msg)
return
hash_input = ' '.join(args)
try:
# Identify the hash
identified = identify_hash(hash_input)
if not identified:
await bot.api.send_text_message(
room.room_id,
"Could not identify hash type. Please verify the hash format."
)
return
# Sort by confidence (highest first)
identified = sorted(identified, key=lambda x: x[3], reverse=True)
# Format the response
hash_preview = hash_input[:60] + "..." if len(hash_input) > 60 else hash_input
# Determine confidence indicator
top_confidence = identified[0][3]
if top_confidence >= 90:
confidence_emoji = "🟢"
confidence_label = "Very High"
elif top_confidence >= 80:
confidence_emoji = "🟡"
confidence_label = "High"
elif top_confidence >= 60:
confidence_emoji = "🟠"
confidence_label = "Medium"
else:
confidence_emoji = "🔴"
confidence_label = "Low"
# Build response inside collapsible details
response = "<details><summary><strong>🔐 Hash Identification Results</strong></summary>\n"
response += "<br>\n"
response += f"<strong>Input:</strong> <code>{hash_preview}</code><br>\n"
response += f"<strong>Length:</strong> {len(hash_input)} characters<br>\n"
response += f"<strong>Overall Confidence:</strong> {confidence_emoji} {confidence_label} ({top_confidence}%)<br>\n"
response += "<br>\n"
response += f"<strong>Possible Hash Types ({len(identified)}):</strong><br>\n"
for idx, (hash_type, hashcat_mode, john_format, confidence) in enumerate(identified, 1):
# Confidence indicator per hash
if confidence >= 90:
conf_emoji = "🟢"
elif confidence >= 80:
conf_emoji = "🟡"
elif confidence >= 60:
conf_emoji = "🟠"
else:
conf_emoji = "🔴"
response += f" <strong>{idx}. {hash_type}</strong> {conf_emoji} {confidence}%<br>\n"
tools = []
if hashcat_mode:
tools.append(f"Hashcat: <code>-m {hashcat_mode}</code>")
if john_format:
tools.append(f"John: <code>--format={john_format}</code>")
if tools:
response += f" {' | '.join(tools)}<br>\n"
response += "<br>\n"
# Add useful tips
if len(identified) == 1 and identified[0][0] not in ["Unknown", "Unknown Modular Crypt Format"]:
response += "<br><strong>💡 Single match with high confidence</strong><br>\n"
elif len(identified) > 5:
response += "<br><em> Multiple possibilities - context may help narrow it down</em><br>\n"
# Add legend
response += "<br>\n"
response += "<strong>Confidence Legend:</strong><br>\n"
response += "🟢 Very High (90-100%) | 🟡 High (80-89%) | 🟠 Medium (60-79%) | 🔴 Low (0-59%)<br>\n"
response += "</details>"
await bot.api.send_markdown_message(room.room_id, response)
logging.info(f"Identified hash types: {', '.join([f'{h[0]} ({h[3]}%)' for h in identified])}")
except Exception as e:
await bot.api.send_text_message(
room.room_id,
f"Error identifying hash: {str(e)}"
)
logging.error(f"Error in hashid command: {e}", exc_info=True)

387
plugins/headers.py Normal file
View File

@@ -0,0 +1,387 @@
"""
This plugin provides comprehensive HTTP security header analysis.
"""
import logging
import requests
import simplematrixbotlib as botlib
from urllib.parse import urlparse
import ssl
import socket
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle !headers command for HTTP security header analysis.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("headers"):
logging.info("Received !headers command")
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
url = args[0].strip()
# Add protocol if missing
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
await analyze_headers(room, bot, url)
async def show_usage(room, bot):
"""Display headers command usage."""
usage = """
<strong>🔒 HTTP Security Headers Analysis</strong>
<strong>!headers &lt;url&gt;</strong> - Comprehensive HTTP security header analysis
<strong>Examples:</strong>
• <code>!headers example.com</code>
• <code>!headers https://github.com</code>
• <code>!headers http://localhost:8080</code>
<strong>Analyzes:</strong>
• Security headers presence and configuration
• SSL/TLS certificate information
• HTTP to HTTPS redirects
• Security scoring and recommendations
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def analyze_headers(room, bot, url):
"""Perform comprehensive HTTP security header analysis."""
try:
await bot.api.send_text_message(room.room_id, f"🔍 Analyzing security headers for: {url}")
results = {
'url': url,
'http_headers': {},
'https_headers': {},
'redirect_chain': [],
'ssl_info': {},
'security_score': 0,
'recommendations': []
}
# Test HTTP first (if HTTPS was provided, we'll still check redirects)
parsed = urlparse(url)
http_url = f"http://{parsed.netloc or parsed.path}"
https_url = f"https://{parsed.netloc or parsed.path}"
# Analyze HTTP response and redirects
await analyze_http_response(results, http_url if not url.startswith('https://') else https_url)
# Analyze HTTPS response
if url.startswith('https://') or results.get('redirects_to_https'):
await analyze_https_response(results, https_url)
# Analyze SSL certificate if HTTPS
if url.startswith('https://') or results.get('redirects_to_https'):
await analyze_ssl_certificate(results, parsed.netloc or parsed.path)
# Calculate security score
await calculate_security_score(results)
# Generate recommendations
await generate_recommendations(results)
# Format and send results
output = await format_header_analysis(results)
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Completed header analysis for {url}")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error analyzing headers: {str(e)}")
logging.error(f"Error in analyze_headers: {e}")
async def analyze_http_response(results, url):
"""Analyze HTTP response and redirect chain."""
try:
session = requests.Session()
session.max_redirects = 5
response = session.get(url, timeout=10, allow_redirects=True)
results['final_url'] = response.url
results['status_code'] = response.status_code
results['http_headers'] = dict(response.headers)
# Check if redirects to HTTPS
results['redirects_to_https'] = response.url.startswith('https://')
# Store redirect history
results['redirect_chain'] = [{
'url': resp.url,
'status_code': resp.status_code,
'headers': dict(resp.headers)
} for resp in response.history]
except requests.exceptions.SSLError:
results['ssl_error'] = True
except requests.exceptions.RequestException as e:
results['http_error'] = str(e)
async def analyze_https_response(results, url):
"""Analyze HTTPS response headers."""
try:
response = requests.get(url, timeout=10, allow_redirects=False)
results['https_headers'] = dict(response.headers)
results['https_status'] = response.status_code
except requests.exceptions.RequestException as e:
results['https_error'] = str(e)
async def analyze_ssl_certificate(results, domain):
"""Analyze SSL certificate information."""
try:
context = ssl.create_default_context()
with socket.create_connection((domain, 443), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
cert = ssock.getpeercert()
results['ssl_info'] = {
'subject': dict(x[0] for x in cert['subject']),
'issuer': dict(x[0] for x in cert['issuer']),
'not_before': cert['notBefore'],
'not_after': cert['notAfter'],
'san': cert.get('subjectAltName', []),
'version': cert.get('version'),
'serial_number': cert.get('serialNumber')
}
except Exception as e:
results['ssl_error'] = str(e)
async def calculate_security_score(results):
"""Calculate overall security score based on headers and configuration."""
score = 100
missing_headers = []
# Critical security headers
critical_headers = [
'Strict-Transport-Security',
'Content-Security-Policy',
'X-Content-Type-Options',
'X-Frame-Options',
'X-XSS-Protection'
]
headers = results.get('https_headers') or results.get('http_headers', {})
for header in critical_headers:
if header not in headers:
score -= 15
missing_headers.append(header)
# Check HSTS configuration
hsts = headers.get('Strict-Transport-Security', '')
if 'max-age=31536000' not in hsts:
score -= 10
if 'includeSubDomains' not in hsts:
score -= 5
if 'preload' not in hsts:
score -= 5
# Check CSP configuration
csp = headers.get('Content-Security-Policy', '')
if not csp:
score -= 10
elif "default-src 'none'" not in csp and "default-src 'self'" not in csp:
score -= 5
# Check for insecure headers
insecure_headers = ['Server', 'X-Powered-By', 'X-AspNet-Version']
for header in insecure_headers:
if header in headers:
score -= 5
# Bonus for good practices
if headers.get('Referrer-Policy'):
score += 5
if headers.get('Feature-Policy') or headers.get('Permissions-Policy'):
score += 5
if headers.get('X-Content-Type-Options') == 'nosniff':
score += 5
if headers.get('X-Frame-Options') in ['DENY', 'SAMEORIGIN']:
score += 5
# HTTPS enforcement bonus
if results.get('redirects_to_https'):
score += 10
results['security_score'] = max(0, score)
results['missing_headers'] = missing_headers
async def generate_recommendations(results):
"""Generate security recommendations based on analysis."""
recommendations = []
headers = results.get('https_headers') or results.get('http_headers', {})
# HSTS recommendations
if 'Strict-Transport-Security' not in headers:
recommendations.append("🔒 Implement HSTS header with max-age=31536000, includeSubDomains, and preload")
else:
hsts = headers['Strict-Transport-Security']
if 'max-age=31536000' not in hsts:
recommendations.append("🔒 Increase HSTS max-age to 31536000 (1 year)")
if 'includeSubDomains' not in hsts:
recommendations.append("🔒 Add includeSubDomains to HSTS header")
if 'preload' not in hsts:
recommendations.append("🔒 Consider adding preload directive to HSTS for browser preloading")
# CSP recommendations
if 'Content-Security-Policy' not in headers:
recommendations.append("🛡️ Implement Content Security Policy to prevent XSS attacks")
else:
csp = headers['Content-Security-Policy']
if "default-src 'self'" not in csp and "default-src 'none'" not in csp:
recommendations.append("🛡️ Restrict CSP default-src to 'self' or specific origins")
# Frame options
if 'X-Frame-Options' not in headers:
recommendations.append("🚫 Add X-Frame-Options header to prevent clickjacking (DENY or SAMEORIGIN)")
# Content type options
if 'X-Content-Type-Options' not in headers:
recommendations.append("📄 Add X-Content-Type-Options: nosniff to prevent MIME type sniffing")
# Referrer policy
if 'Referrer-Policy' not in headers:
recommendations.append("🔗 Implement Referrer-Policy to control referrer information leakage")
# Feature policy
if 'Feature-Policy' not in headers and 'Permissions-Policy' not in headers:
recommendations.append("⚙️ Implement Feature-Policy/Permissions-Policy to restrict browser features")
# Remove server information
if 'Server' in headers or 'X-Powered-By' in headers:
recommendations.append("🕵️ Remove Server and X-Powered-By headers to avoid information disclosure")
# HTTPS enforcement
if not results.get('redirects_to_https') and not results['url'].startswith('https://'):
recommendations.append("🔐 Implement HTTP to HTTPS redirects")
results['recommendations'] = recommendations
async def format_header_analysis(results):
"""Format the header analysis results for display."""
output = f"<strong>🔒 Security Headers Analysis: {results['url']}</strong><br><br>"
# Security Score
score = results['security_score']
score_emoji = "🟢" if score >= 80 else "🟡" if score >= 60 else "🔴"
output += f"<strong>{score_emoji} Security Score: {score}/100</strong><br><br>"
# Basic Information
output += "<strong>📊 Basic Information</strong><br>"
output += f" • <strong>Final URL:</strong> {results.get('final_url', 'N/A')}<br>"
output += f" • <strong>Status Code:</strong> {results.get('status_code', 'N/A')}<br>"
if results.get('redirects_to_https'):
output += f" • <strong>HTTPS Redirect:</strong> ✅ Enforced<br>"
else:
output += f" • <strong>HTTPS Redirect:</strong> ❌ Not enforced<br>"
output += f" • <strong>Redirect Chain:</strong> {len(results.get('redirect_chain', []))} hops<br>"
output += "<br>"
# Security Headers Analysis
headers = results.get('https_headers') or results.get('http_headers', {})
output += "<strong>🛡️ Security Headers Analysis</strong><br>"
security_headers = {
'Strict-Transport-Security': ('🔒', 'HSTS - HTTP Strict Transport Security'),
'Content-Security-Policy': ('🛡️', 'CSP - Content Security Policy'),
'X-Frame-Options': ('🚫', 'Clickjacking Protection'),
'X-Content-Type-Options': ('📄', 'MIME Type Sniffing Protection'),
'X-XSS-Protection': ('', 'XSS Protection (Deprecated)'),
'Referrer-Policy': ('🔗', 'Referrer Policy'),
'Feature-Policy': ('⚙️', 'Feature Policy'),
'Permissions-Policy': ('🔧', 'Permissions Policy'),
}
for header, (emoji, description) in security_headers.items():
if header in headers:
value = headers[header]
if len(value) > 100:
value = value[:100] + "..."
output += f"{emoji} <strong>{header}:</strong> ✅ {value}<br>"
else:
output += f"{emoji} <strong>{header}:</strong> ❌ Missing<br>"
output += "<br>"
# Other Headers (Information Disclosure)
output += "<strong>📋 Other Headers</strong><br>"
info_headers = ['Server', 'X-Powered-By', 'X-AspNet-Version']
for header in info_headers:
if header in headers:
output += f" • 🔍 <strong>{header}:</strong> {headers[header]}<br>"
output += "<br>"
# SSL Certificate Information (if available)
if results.get('ssl_info'):
output += "<strong>🔐 SSL Certificate</strong><br>"
ssl_info = results['ssl_info']
if ssl_info.get('subject'):
output += f" • <strong>Subject:</strong> {ssl_info['subject'].get('commonName', 'N/A')}<br>"
if ssl_info.get('issuer'):
output += f" • <strong>Issuer:</strong> {ssl_info['issuer'].get('organizationName', 'N/A')}<br>"
if ssl_info.get('not_after'):
output += f" • <strong>Expires:</strong> {ssl_info['not_after']}<br>"
if ssl_info.get('san'):
san_count = len([san for san in ssl_info['san'] if san[0] == 'DNS'])
output += f" • <strong>SAN Entries:</strong> {san_count}<br>"
output += "<br>"
# Recommendations
if results.get('recommendations'):
output += "<strong>💡 Security Recommendations</strong><br>"
for rec in results['recommendations'][:8]: # Show first 8 recommendations
output += f"{rec}<br>"
if len(results['recommendations']) > 8:
output += f" • ... and {len(results['recommendations']) - 8} more recommendations<br>"
output += "<br>"
# Missing Headers Summary
if results.get('missing_headers'):
output += "<strong>⚠️ Critical Headers Missing</strong><br>"
for header in results['missing_headers']:
output += f" • ❌ {header}<br>"
output += "<br>"
# Security Rating
score = results['security_score']
if score >= 80:
rating = "🟢 Excellent"
description = "Strong security headers configuration"
elif score >= 60:
rating = "🟡 Good"
description = "Moderate security, room for improvement"
elif score >= 40:
rating = "🟠 Fair"
description = "Basic security, significant improvements needed"
else:
rating = "🔴 Poor"
description = "Weak security headers configuration"
output += f"<strong>📈 Security Rating:</strong> {rating}<br>"
output += f"<strong>📝 Assessment:</strong> {description}<br>"
# Wrap in collapsible if content is large
if len(output) > 1000:
output = f"<details><summary><strong>🔒 Security Headers Analysis: {results['url']}</strong></summary>{output}</details>"
return output

View File

@@ -69,6 +69,222 @@ async def handle_command(room, message, bot, prefix, config):
<p>Fetches definitions from Urban Dictionary. Use without arguments for random definition, or specify term and optional index number. Shows definition, example, author, votes, and permalink.</p>
</details>
<details><summary>🔍 <strong>!dns [domain]</strong></summary>
<p>Performs comprehensive DNS reconnaissance on a domain. Queries multiple DNS record types including A, AAAA, MX, NS, TXT, CNAME, SOA, and SRV records. Validates domain format and provides formatted results.</p>
</details>
<details><summary>💰 <strong>!btc</strong></summary>
<p>Fetches the current Bitcoin price in USD from bitcointicker.co API. Shows real-time BTC/USD price with proper formatting. Includes error handling for API timeouts and data parsing issues.</p>
</details>
<details><summary>🔍 <strong>!shodan [command] [query]</strong></summary>
<p>Shodan.io integration for security reconnaissance and threat intelligence.</p>
<p><strong>Commands:</strong></p>
<ul>
<li><code>!shodan ip &lt;ip_address&gt;</code> - Detailed IP information (services, ports, banners)</li>
<li><code>!shodan search &lt;query&gt;</code> - Search Shodan database with filters</li>
<li><code>!shodan host &lt;domain&gt;</code> - Host information and subdomain enumeration</li>
<li><code>!shodan count &lt;query&gt;</code> - Count results with geographic/organization breakdown</li>
<li><code>!shodan test</code> - Test API connection and debug queries</li>
</ul>
<p><strong>Search Examples:</strong></p>
<ul>
<li><code>!shodan search apache</code></li>
<li><code>!shodan search "port:22 country:US"</code></li>
<li><code>!shodan search "product:nginx"</code></li>
<li><code>!shodan search "net:192.168.1.0/24"</code></li>
<li><code>!shodan search "http.title:'admin'"</code></li>
</ul>
<p><strong>Common Filters:</strong> country, city, port, product, os, org, net, has_ssl, http.title</p>
<p><em>Requires SHODAN_KEY environment variable</em></p>
</details>
<details><summary>🌐 <strong>!dnsdumpster [domain]</strong></summary>
<p>Comprehensive DNS reconnaissance and attack surface mapping using DNSDumpster.com API.</p>
<p><strong>Commands:</strong></p>
<ul>
<li><code>!dnsdumpster &lt;domain&gt;</code> - Complete DNS reconnaissance for any domain</li>
<li><code>!dnsdumpster test</code> - Test API connection and key validity</li>
</ul>
<p><strong>Features:</strong></p>
<ul>
<li>A Records - All IPv4 addresses with geographic and ASN information</li>
<li>NS Records - Complete name server information with IP locations</li>
<li>MX Records - All mail servers with geographic data</li>
<li>CNAME Records - Full alias chain mappings</li>
<li>TXT Records - All text records including SPF, DKIM, verification</li>
<li>Additional Records - AAAA, SRV, SOA, PTR records when available</li>
<li>Web Services - HTTP/HTTPS service detection with banner information</li>
</ul>
<p><strong>Examples:</strong></p>
<ul>
<li><code>!dnsdumpster google.com</code></li>
<li><code>!dnsdumpster github.com</code></li>
<li><code>!dnsdumpster example.com</code></li>
</ul>
<p><em>Requires DNSDUMPSTER_KEY environment variable</em><br>
<em>Rate Limit: 1 request per 2 seconds</em></p>
</details>
<details>
<summary><strong>💣 !exploitdb - Search Exploit Database</strong></summary>
<br>
<strong>Description:</strong><br>
Search Exploit-DB for security vulnerabilities and exploits. Returns detailed information about exploits including EDB-ID, type, platform, author, and direct links to exploit code.<br>
<br>
<strong>Usage:</strong><br>
<code>!exploitdb &lt;search_term&gt; [max_results]</code><br>
<br>
<strong>Parameters:</strong><br>
• <strong>search_term</strong> (required) - Software name, CVE number, or vulnerability type<br>
• <strong>max_results</strong> (optional) - Number of results to return (1-10, default: 5)<br>
<br>
<strong>Examples:</strong><br>
<code>!exploitdb wordpress</code> - Search for WordPress exploits<br>
<code>!exploitdb apache 3</code> - Get 3 Apache exploits<br>
<code>!exploitdb windows privilege escalation</code> - Search for Windows privesc<br>
<code>!exploitdb CVE-2021-44228</code> - Search by CVE number<br>
<code>!exploitdb linux kernel 10</code> - Get 10 Linux kernel exploits<br>
<code>!exploitdb sql injection</code> - Search for SQL injection exploits<br>
<br>
<strong>Output Includes:</strong><br>
• Exploit title and description<br>
• EDB-ID (Exploit Database ID)<br>
• Exploit type (webapps, local, remote, etc.)<br>
• Platform/OS (PHP, Linux, Windows, etc.)<br>
• Author name<br>
• Publication date<br>
• Direct link to full exploit code<br>
<br>
<strong>Notes:</strong><br>
• Searches the official Exploit-DB CSV database<br>
• May take a few seconds on first use (downloads database)<br>
• Falls back to search links if database unavailable<br>
<br>
<em>⚠️ Use responsibly and only on systems you have permission to test.</em>
</details>
<details><summary>🛡️ <strong>!headers &lt;url&gt;</strong></summary>
<p>Comprehensive HTTP security header analysis with security scoring and recommendations.</p>
<p><strong>Features:</strong></p>
<ul>
<li>Security scoring (0-100) with color-coded ratings</li>
<li>Critical security header validation and configuration checking</li>
<li>HTTP to HTTPS redirect chain analysis</li>
<li>SSL certificate information for HTTPS sites</li>
<li>Information disclosure header detection</li>
<li>Actionable security recommendations</li>
</ul>
<p><strong>Security Headers Analyzed:</strong></p>
<ul>
<li><code>Strict-Transport-Security</code> - HSTS enforcement</li>
<li><code>Content-Security-Policy</code> - XSS protection</li>
<li><code>X-Frame-Options</code> - Clickjacking protection</li>
<li><code>X-Content-Type-Options</code> - MIME sniffing prevention</li>
<li><code>Referrer-Policy</code> - Referrer control</li>
<li><code>Feature-Policy</code> - Browser feature restrictions</li>
<li>Server information headers</li>
</ul>
<p><strong>Security Ratings:</strong></p>
<ul>
<li>🟢 <strong>Excellent (80-100)</strong> - Strong configuration</li>
<li>🟡 <strong>Good (60-79)</strong> - Moderate, needs improvement</li>
<li>🟠 <strong>Fair (40-59)</strong> - Basic, significant improvements needed</li>
<li>🔴 <strong>Poor (0-39)</strong> - Weak configuration</li>
</ul>
<p><strong>Examples:</strong></p>
<ul>
<li><code>!headers example.com</code></li>
<li><code>!headers https://github.com</code></li>
<li><code>!headers localhost:3000</code></li>
<li><code>!headers subdomain.target.com</code></li>
</ul>
<p><em>Provides enterprise-grade security analysis for penetration testers and developers</em></p>
</details>
<details><summary>🔄 <strong>!hashid &lt;hash&gt;</strong></summary>
<p>Advanced hash type identification with confidence scoring and tool recommendations.</p>
<p><strong>Features:</strong></p>
<ul>
<li>100+ hash types including modern, legacy, and exotic algorithms</li>
<li>Color-coded confidence scoring (🟢 Very High to 🔴 Low)</li>
<li>Hashcat mode numbers and John the Ripper format names</li>
<li>Context-aware parsing for various hash formats</li>
</ul>
<p><strong>Supported Categories:</strong></p>
<ul>
<li><strong>Modern:</strong> yescrypt, scrypt, Argon2, bcrypt</li>
<li><strong>Unix/Linux:</strong> SHA-512/256 Crypt, MD5 Crypt, apr1</li>
<li><strong>Raw Hashes:</strong> MD5, SHA family, SHA-3, NTLM, LM</li>
<li><strong>Databases:</strong> MySQL, PostgreSQL, Oracle, MSSQL</li>
<li><strong>Web/CMS:</strong> WordPress, Drupal, phpBB3, Django</li>
<li><strong>LDAP:</strong> SSHA, SMD5, LDAP crypt formats</li>
<li><strong>Network:</strong> NetNTLMv1/v2, Kerberos</li>
<li><strong>Exotic:</strong> Whirlpool, RIPEMD, GOST, BLAKE2</li>
</ul>
<p><strong>Tool Integration:</strong></p>
<ul>
<li><strong>Hashcat:</strong> Mode numbers for <code>-m</code> parameter</li>
<li><strong>John:</strong> Format names for <code>--format=</code> parameter</li>
<li>Multi-tool compatibility</li>
</ul>
<p><strong>Examples:</strong></p>
<ul>
<li><code>!hashid 5d41402abc4b2a76b9719d911017c592</code> (MD5)</li>
<li><code>!hashid aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d</code> (SHA-1)</li>
<li><code>!hashid $6$rounds=5000$salt$hash...</code> (SHA-512 Crypt)</li>
<li><code>!hashid $y$j9T$...</code> (yescrypt - modern Linux)</li>
<li><code>!hashid 8846f7eaee8fb117ad06bdd830b7586c</code> (NTLM)</li>
<li><code>!hashid *2470C0C06DEE42FD1618BB99005ADCA2EC9D1E19</code> (MySQL)</li>
</ul>
<p><strong>Confidence Legend:</strong></p>
<ul>
<li>🟢 Very High (90-100%) - Single definitive match</li>
<li>🟡 High (80-89%) - Strong match with minor alternatives</li>
<li>🟠 Medium (60-79%) - Multiple plausible matches</li>
<li>🔴 Low (0-59%) - Uncertain, needs context</li>
</ul>
<p><em>Essential for penetration testers, forensic analysts, and password cracking</em></p>
</details>
<details><summary>🔐 <strong>!sslscan &lt;domain[:port]&gt;</strong></summary>
<p>Comprehensive SSL/TLS security scanning and analysis with vulnerability detection.</p>
<p><strong>Features:</strong></p>
<ul>
<li>TLS 1.0-1.3 protocol support testing with security scoring</li>
<li>Certificate chain validation, expiration, and signature analysis</li>
<li>25+ cipher suite testing with strength classification</li>
<li>Vulnerability detection (POODLE, weak ciphers, protocol issues)</li>
<li>0-100 security rating with color-coded assessment</li>
<li>PCI DSS and modern security standards compliance checking</li>
</ul>
<p><strong>Security Checks:</strong></p>
<ul>
<li><strong>Protocol Security:</strong> TLS 1.2/1.3 enforcement, insecure protocol detection</li>
<li><strong>Certificate Health:</strong> Expiration monitoring, signature validation</li>
<li><strong>Cipher Security:</strong> RC4, DES, 3DES, NULL cipher detection</li>
<li><strong>Modern Standards:</strong> Forward Secrecy, strong encryption</li>
</ul>
<p><strong>Security Ratings:</strong></p>
<ul>
<li>🟢 <strong>Excellent (90-100)</strong> - Modern TLS with strong security</li>
<li>🟡 <strong>Good (80-89)</strong> - Good security, minor improvements needed</li>
<li>🟠 <strong>Fair (60-79)</strong> - Moderate security, significant improvements</li>
<li>🔴 <strong>Poor (0-59)</strong> - Critical issues requiring immediate attention</li>
</ul>
<p><strong>Examples:</strong></p>
<ul>
<li><code>!sslscan example.com</code></li>
<li><code>!sslscan github.com:443</code></li>
<li><code>!sslscan localhost:8443</code></li>
<li><code>!sslscan 192.168.1.1:443</code></li>
</ul>
<p><em>Essential for security teams, system administrators, and developers ensuring TLS compliance</em><br>
<em>Note: SSLv2/SSLv3 testing limited by Python security features</em></p>
</details>
<details><summary>📸 <strong>!sd [prompt]</strong></summary>
<p>Generates images using self-hosted Stable Diffusion. Supports options: --steps, --cfg, --h, --w, --neg, --sampler. Uses queuing system to handle multiple requests. See available options using just '!sd'.</p>
</details>

View File

@@ -11,11 +11,6 @@ import sys # Import sys module for unloading plugins
# Dictionary to store loaded plugins
PLUGINS = {}
# Whitelist of allowed plugins to prevent arbitrary code execution
ALLOWED_PLUGINS = {'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma',
'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion',
'xkcd', 'youtube-preview', 'youtube-search'}
async def load_plugin(plugin_name):
"""
Asynchronously loads a plugin.
@@ -26,48 +21,9 @@ async def load_plugin(plugin_name):
Returns:
bool: True if the plugin is loaded successfully, False otherwise.
"""
# Validate plugin name against whitelist
if plugin_name not in ALLOWED_PLUGINS:
logging.error(f"Plugin '{plugin_name}' is not whitelisted")
return False
# Verify that the plugin file exists in the plugins directory
plugin_path = os.path.join("plugins", f"{plugin_name}.py")
if not os.path.isfile(plugin_path):
logging.error(f"Plugin file not found: {plugin_path}")
return False
try:
# Create a mapping of whitelisted plugins to their module paths
plugin_modules = {
'ai': 'plugins.ai',
'config': 'plugins.config',
'cron': 'plugins.cron',
'date': 'plugins.date',
'fortune': 'plugins.fortune',
'help': 'plugins.help',
'isup': 'plugins.isup',
'karma': 'plugins.karma',
'loadplugin': 'plugins.loadplugin',
'plugins': 'plugins.plugins',
'proxy': 'plugins.proxy',
'sd_text': 'plugins.sd_text',
'stable-diffusion': 'plugins.stable-diffusion',
'xkcd': 'plugins.xkcd',
'youtube-preview': 'plugins.youtube-preview',
'youtube-search': 'plugins.youtube-search',
'weather': 'plugins.weather',
'urbandictionary': 'plugins.urbandictionary'
}
# Get the module path from the mapping
module_path = plugin_modules.get(plugin_name)
if not module_path:
logging.error(f"Plugin '{plugin_name}' not found in plugin mapping")
return False
# Import the plugin module using the validated module path
module = importlib.import_module(module_path)
# Import the plugin module
module = importlib.import_module(f"plugins.{plugin_name}")
# Add the plugin module to the PLUGINS dictionary
PLUGINS[plugin_name] = module
logging.info(f"Loaded plugin: {plugin_name}")
@@ -156,3 +112,4 @@ async def handle_command(room, message, bot, prefix, config):
else:
# Send unauthorized message if the sender is not the admin
await bot.api.send_text_message(room.room_id, "You are not authorized to unload plugins.")

330
plugins/shodan.py Normal file
View File

@@ -0,0 +1,330 @@
"""
This plugin provides Shodan.io integration for security research and reconnaissance.
"""
import logging
import os
import requests
import simplematrixbotlib as botlib
from dotenv import load_dotenv
# Load environment variables from .env file
plugin_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(plugin_dir)
dotenv_path = os.path.join(parent_dir, '.env')
load_dotenv(dotenv_path)
SHODAN_API_KEY = os.getenv("SHODAN_KEY", "")
SHODAN_API_BASE = "https://api.shodan.io"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle Shodan commands.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("shodan"):
logging.info("Received !shodan command")
# Check if API key is configured
if not SHODAN_API_KEY:
await bot.api.send_text_message(
room.room_id,
"Shodan API key not configured. Please set SHODAN_KEY environment variable."
)
logging.error("Shodan API key not configured")
return
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
subcommand = args[0].lower()
if subcommand == "ip":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan ip <ip_address>")
return
ip = args[1]
await shodan_ip_lookup(room, bot, ip)
elif subcommand == "search":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan search <query>")
return
query = ' '.join(args[1:])
await shodan_search(room, bot, query)
elif subcommand == "host":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan host <domain/ip>")
return
host = args[1]
await shodan_host(room, bot, host)
elif subcommand == "count":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan count <query>")
return
query = ' '.join(args[1:])
await shodan_count(room, bot, query)
else:
await show_usage(room, bot)
async def show_usage(room, bot):
"""Display Shodan command usage."""
usage = """
<strong>🔍 Shodan Commands:</strong>
<strong>!shodan ip &lt;ip_address&gt;</strong> - Get detailed information about an IP
<strong>!shodan search &lt;query&gt;</strong> - Search Shodan database
<strong>!shodan host &lt;domain/ip&gt;</strong> - Get host information
<strong>!shodan count &lt;query&gt;</strong> - Count results for a search query
<strong>Search Examples:</strong>
• <code>!shodan search apache</code>
• <code>!shodan search "port:22"</code>
• <code>!shodan search "country:US product:nginx"</code>
• <code>!shodan search "net:192.168.1.0/24"</code>
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def shodan_ip_lookup(room, bot, ip):
"""Look up information about a specific IP address."""
try:
url = f"{SHODAN_API_BASE}/shodan/host/{ip}"
params = {"key": SHODAN_API_KEY}
logging.info(f"Fetching Shodan IP info for: {ip}")
response = requests.get(url, params=params, timeout=15)
if response.status_code == 404:
await bot.api.send_text_message(room.room_id, f"No information found for IP: {ip}")
return
elif response.status_code == 401:
await bot.api.send_text_message(room.room_id, "Invalid Shodan API key")
return
elif response.status_code != 200:
await bot.api.send_text_message(room.room_id, f"Shodan API error: {response.status_code}")
return
data = response.json()
# Format the response
output = f"<strong>🔍 Shodan IP Lookup: {ip}</strong><br><br>"
if data.get('country_name'):
output += f"<strong>📍 Location:</strong> {data.get('city', 'N/A')}, {data.get('country_name', 'N/A')}<br>"
if data.get('org'):
output += f"<strong>🏢 Organization:</strong> {data['org']}<br>"
if data.get('os'):
output += f"<strong>💻 Operating System:</strong> {data['os']}<br>"
if data.get('ports'):
output += f"<strong>🔌 Open Ports:</strong> {', '.join(map(str, data['ports']))}<br>"
output += f"<strong>🕒 Last Update:</strong> {data.get('last_update', 'N/A')}<br><br>"
# Show services
if data.get('data'):
output += "<strong>📡 Services:</strong><br>"
for service in data['data'][:5]: # Limit to first 5 services
port = service.get('port', 'N/A')
product = service.get('product', 'Unknown')
version = service.get('version', '')
banner = service.get('data', '')[:100] + "..." if len(service.get('data', '')) > 100 else service.get('data', '')
output += f" • <strong>Port {port}:</strong> {product} {version}<br>"
if banner:
output += f" <em>{banner}</em><br>"
if len(data['data']) > 5:
output += f" • ... and {len(data['data']) - 5} more services<br>"
# Wrap in collapsible if output is large
if len(output) > 500:
output = f"<details><summary><strong>🔍 Shodan IP Lookup: {ip}</strong></summary>{output}</details>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan IP info for {ip}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error fetching Shodan data: {str(e)}")
logging.error(f"Error in shodan_ip_lookup: {e}")
async def shodan_search(room, bot, query):
"""Search the Shodan database."""
try:
url = f"{SHODAN_API_BASE}/shodan/host/search"
params = {
"key": SHODAN_API_KEY,
"query": query,
"minify": True,
"limit": 5 # Limit results to avoid huge responses
}
logging.info(f"Searching Shodan for: {query}")
response = requests.get(url, params=params, timeout=15)
if response.status_code != 200:
await handle_shodan_error(room, bot, response.status_code)
return
data = response.json()
if not data.get('matches'):
await bot.api.send_text_message(room.room_id, f"No results found for: {query}")
return
output = f"<strong>🔍 Shodan Search: '{query}'</strong><br>"
output += f"<strong>Total Results:</strong> {data.get('total', 0):,}<br><br>"
for match in data['matches'][:5]: # Show first 5 results
ip = match.get('ip_str', 'N/A')
port = match.get('port', 'N/A')
org = match.get('org', 'Unknown')
product = match.get('product', 'Unknown')
output += f"<strong>🌐 {ip}:{port}</strong><br>"
output += f" • <strong>Organization:</strong> {org}<br>"
output += f" • <strong>Service:</strong> {product}<br>"
if match.get('location'):
loc = match['location']
if loc.get('city') and loc.get('country_name'):
output += f" • <strong>Location:</strong> {loc['city']}, {loc['country_name']}<br>"
output += "<br>"
if data.get('total', 0) > 5:
output += f"<em>Showing 5 of {data['total']:,} results. Refine your search for more specific results.</em>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan search results for: {query}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error searching Shodan: {str(e)}")
logging.error(f"Error in shodan_search: {e}")
async def shodan_host(room, bot, host):
"""Get host information (domain or IP)."""
try:
url = f"{SHODAN_API_BASE}/dns/domain/{host}"
params = {"key": SHODAN_API_KEY}
logging.info(f"Fetching Shodan host info for: {host}")
response = requests.get(url, params=params, timeout=15)
if response.status_code == 404:
# Try IP lookup instead
await shodan_ip_lookup(room, bot, host)
return
elif response.status_code != 200:
await handle_shodan_error(room, bot, response.status_code)
return
data = response.json()
output = f"<strong>🔍 Shodan Host: {host}</strong><br><br>"
if data.get('subdomains'):
output += f"<strong>🌐 Subdomains ({len(data['subdomains'])}):</strong><br>"
for subdomain in sorted(data['subdomains'])[:10]: # Show first 10
output += f"{subdomain}.{host}<br>"
if len(data['subdomains']) > 10:
output += f" • ... and {len(data['subdomains']) - 10} more<br>"
if data.get('tags'):
output += f"<br><strong>🏷️ Tags:</strong> {', '.join(data['tags'])}<br>"
if data.get('data'):
output += f"<br><strong>📊 Records Found:</strong> {len(data['data'])}<br>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan host info for: {host}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error fetching host info: {str(e)}")
logging.error(f"Error in shodan_host: {e}")
async def shodan_count(room, bot, query):
"""Count results for a search query."""
try:
url = f"{SHODAN_API_BASE}/shodan/host/count"
params = {
"key": SHODAN_API_KEY,
"query": query
}
logging.info(f"Counting Shodan results for: {query}")
response = requests.get(url, params=params, timeout=15)
if response.status_code != 200:
await handle_shodan_error(room, bot, response.status_code)
return
data = response.json()
output = f"<strong>🔍 Shodan Count: '{query}'</strong><br><br>"
output += f"<strong>Total Results:</strong> {data.get('total', 0):,}<br>"
# Show top countries if available
if data.get('facets') and 'country' in data['facets']:
output += "<br><strong>🌍 Top Countries:</strong><br>"
for country in data['facets']['country'][:5]:
output += f"{country['value']}: {country['count']:,}<br>"
# Show top organizations if available
if data.get('facets') and 'org' in data['facets']:
output += "<br><strong>🏢 Top Organizations:</strong><br>"
for org in data['facets']['org'][:5]:
output += f"{org['value']}: {org['count']:,}<br>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan count for: {query}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error counting Shodan results: {str(e)}")
logging.error(f"Error in shodan_count: {e}")
async def handle_shodan_error(room, bot, status_code):
"""Handle Shodan API errors."""
error_messages = {
401: "Invalid Shodan API key",
403: "Access denied - check API key permissions",
404: "No results found",
429: "Rate limit exceeded - try again later",
500: "Shodan API server error",
503: "Shodan API temporarily unavailable"
}
message = error_messages.get(status_code, f"Shodan API error: {status_code}")
await bot.api.send_text_message(room.room_id, message)
logging.error(f"Shodan API error: {status_code}")

594
plugins/sslscan.py Normal file
View File

@@ -0,0 +1,594 @@
"""
This plugin provides comprehensive SSL/TLS security scanning and analysis.
"""
import logging
import socket
import ssl
import OpenSSL
import datetime
import re
import simplematrixbotlib as botlib
from urllib.parse import urlparse
# SSL/TLS configuration - handle missing protocols in modern Python
TLS_VERSIONS = {
'TLSv1.2': ssl.PROTOCOL_TLSv1_2,
'TLSv1.3': ssl.PROTOCOL_TLS
}
# Try to add older protocols if available (they're removed in modern Python)
try:
TLS_VERSIONS['TLSv1.1'] = ssl.PROTOCOL_TLSv1_1
except AttributeError:
pass
try:
TLS_VERSIONS['TLSv1'] = ssl.PROTOCOL_TLSv1
except AttributeError:
pass
# Cipher suites by strength and category
CIPHER_CATEGORIES = {
'STRONG': [
'TLS_AES_256_GCM_SHA384',
'TLS_CHACHA20_POLY1305_SHA256',
'TLS_AES_128_GCM_SHA256',
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-CHACHA20-POLY1305',
'ECDHE-ECDSA-CHACHA20-POLY1305',
'DHE-RSA-AES256-GCM-SHA384'
],
'WEAK': [
'RC4',
'DES',
'3DES',
'MD5',
'EXPORT',
'NULL',
'ANON',
'ADH',
'CBC'
],
'OBSOLETE': [
'SSLv2',
'SSLv3'
]
}
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle !sslscan command for comprehensive SSL/TLS analysis.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("sslscan"):
logging.info("Received !sslscan command")
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
target = args[0].strip()
port = 443
# Parse port if provided
if ':' in target:
parts = target.split(':')
target = parts[0]
try:
port = int(parts[1])
except ValueError:
await bot.api.send_text_message(room.room_id, "Invalid port number")
return
await perform_ssl_scan(room, bot, target, port)
async def show_usage(room, bot):
"""Display sslscan command usage."""
usage = """
<strong>🔐 SSL/TLS Security Scanner</strong>
<strong>!sslscan &lt;domain[:port]&gt;</strong> - Comprehensive SSL/TLS security analysis
<strong>Examples:</strong>
• <code>!sslscan example.com</code>
• <code>!sslscan github.com:443</code>
• <code>!sslscan localhost:8443</code>
<strong>Tests Performed:</strong>
• SSL/TLS protocol support and versions
• Certificate chain validation and expiration
• Cipher suite strength and configuration
• Security headers and extensions
• Handshake simulation and vulnerabilities
• PCI DSS compliance checking
• SSL/TLS best practices assessment
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def perform_ssl_scan(room, bot, target, port):
"""Perform comprehensive SSL/TLS security scan."""
try:
await bot.api.send_text_message(room.room_id, f"🔍 Starting comprehensive SSL/TLS scan for {target}:{port}...")
scan_results = {
'target': target,
'port': port,
'certificate': {},
'protocols': {},
'ciphers': {},
'vulnerabilities': [],
'recommendations': [],
'security_score': 0
}
# Test basic connectivity
if not await test_connectivity(target, port):
await bot.api.send_text_message(room.room_id, f"❌ Cannot connect to {target}:{port}")
return
# Perform comprehensive tests
await get_certificate_info(scan_results, target, port)
await test_protocol_support(scan_results, target, port)
await test_cipher_suites(scan_results, target, port)
await check_vulnerabilities(scan_results)
await calculate_security_score(scan_results)
await generate_recommendations(scan_results)
# Format and send results
output = await format_ssl_scan_results(scan_results)
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Completed SSL scan for {target}:{port}")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing SSL scan: {str(e)}")
logging.error(f"Error in perform_ssl_scan: {e}")
async def test_connectivity(target, port):
"""Test basic connectivity to the target."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
result = sock.connect_ex((target, port))
sock.close()
return result == 0
except:
return False
async def get_certificate_info(scan_results, target, port):
"""Get comprehensive certificate information."""
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection((target, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
cert_bin = ssock.getpeercert(binary_form=True)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bin)
# Basic certificate info
subject = cert.get_subject()
issuer = cert.get_issuer()
scan_results['certificate'] = {
'subject': {
'common_name': subject.CN,
'organization': subject.O,
'organizational_unit': subject.OU,
'country': subject.C,
'state': subject.ST,
'locality': subject.L
},
'issuer': {
'common_name': issuer.CN,
'organization': issuer.O,
'organizational_unit': issuer.OU
},
'serial_number': cert.get_serial_number(),
'version': cert.get_version(),
'not_before': cert.get_notBefore().decode('utf-8'),
'not_after': cert.get_notAfter().decode('utf-8'),
'signature_algorithm': cert.get_signature_algorithm().decode('utf-8'),
'extensions': []
}
# Parse extensions
for i in range(cert.get_extension_count()):
ext = cert.get_extension(i)
scan_results['certificate']['extensions'].append({
'name': ext.get_short_name().decode('utf-8'),
'value': str(ext)
})
# Calculate days until expiration
not_after = datetime.datetime.strptime(scan_results['certificate']['not_after'], '%Y%m%d%H%M%SZ')
days_until_expiry = (not_after - datetime.datetime.utcnow()).days
scan_results['certificate']['days_until_expiry'] = days_until_expiry
except Exception as e:
scan_results['certificate_error'] = str(e)
async def test_protocol_support(scan_results, target, port):
"""Test support for various SSL/TLS protocols."""
protocols = {
'SSLv2': False,
'SSLv3': False,
'TLSv1': False,
'TLSv1.1': False,
'TLSv1.2': False,
'TLSv1.3': False
}
# Test available protocols
for protocol_name in protocols.keys():
try:
if protocol_name in TLS_VERSIONS:
context = ssl.SSLContext(TLS_VERSIONS[protocol_name])
else:
# For protocols not available in this Python version, assume False
protocols[protocol_name] = False
continue
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection((target, port), timeout=5) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
protocols[protocol_name] = True
# Get negotiated protocol
if hasattr(ssock, 'version'):
scan_results['negotiated_protocol'] = ssock.version()
except:
protocols[protocol_name] = False
scan_results['protocols'] = protocols
async def test_cipher_suites(scan_results, target, port):
"""Test supported cipher suites."""
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Get default cipher suites
context.set_ciphers('ALL:COMPLEMENTOFALL')
with socket.create_connection((target, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
cipher = ssock.cipher()
scan_results['ciphers'] = {
'negotiated_cipher': cipher[0] if cipher else 'Unknown',
'supported_ciphers': await get_supported_ciphers(target, port),
'weak_ciphers': [],
'strong_ciphers': []
}
except Exception as e:
scan_results['cipher_error'] = str(e)
async def get_supported_ciphers(target, port):
"""Get list of supported cipher suites."""
supported_ciphers = []
# Test common cipher suites
test_ciphers = [
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES256-SHA384',
'ECDHE-ECDSA-AES256-SHA384',
'ECDHE-RSA-AES256-SHA',
'ECDHE-ECDSA-AES256-SHA',
'AES256-GCM-SHA384',
'AES256-SHA256',
'AES256-SHA',
'CAMELLIA256-SHA',
'PSK-AES256-CBC-SHA',
'ECDHE-RSA-AES128-GCM-SHA256',
'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES128-SHA256',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-RSA-AES128-SHA',
'ECDHE-ECDSA-AES128-SHA',
'AES128-GCM-SHA256',
'AES128-SHA256',
'AES128-SHA',
'CAMELLIA128-SHA',
'PSK-AES128-CBC-SHA',
'DES-CBC3-SHA',
'RC4-SHA',
'RC4-MD5'
]
for cipher in test_ciphers:
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.set_ciphers(cipher)
with socket.create_connection((target, port), timeout=5) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
if ssock.cipher():
supported_ciphers.append(cipher)
except:
pass
return supported_ciphers
async def check_vulnerabilities(scan_results):
"""Check for common SSL/TLS vulnerabilities."""
vulnerabilities = []
# Check for weak protocols (these will be False in modern Python, which is good)
if scan_results['protocols'].get('SSLv2', False):
vulnerabilities.append({
'name': 'SSLv2 Support',
'severity': 'CRITICAL',
'description': 'SSLv2 is obsolete and contains critical vulnerabilities',
'cve': 'Multiple CVEs'
})
if scan_results['protocols'].get('SSLv3', False):
vulnerabilities.append({
'name': 'SSLv3 Support',
'severity': 'HIGH',
'description': 'SSLv3 is vulnerable to POODLE attack',
'cve': 'CVE-2014-3566'
})
# Check certificate expiration
cert = scan_results.get('certificate', {})
if cert.get('days_until_expiry', 0) < 30:
vulnerabilities.append({
'name': 'Certificate Expiring Soon',
'severity': 'MEDIUM',
'description': f"Certificate expires in {cert['days_until_expiry']} days",
'cve': 'N/A'
})
# Check for weak ciphers
supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
weak_ciphers_found = []
for cipher in supported_ciphers:
if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK']):
weak_ciphers_found.append(cipher)
if weak_ciphers_found:
vulnerabilities.append({
'name': 'Weak Cipher Suites',
'severity': 'HIGH',
'description': f'Weak ciphers supported: {", ".join(weak_ciphers_found[:3])}',
'cve': 'Multiple CVEs'
})
# Check for missing modern protocols
if not scan_results['protocols'].get('TLSv1.2', False):
vulnerabilities.append({
'name': 'TLS 1.2 Not Supported',
'severity': 'HIGH',
'description': 'TLS 1.2 is required for modern security',
'cve': 'N/A'
})
if not scan_results['protocols'].get('TLSv1.3', False):
vulnerabilities.append({
'name': 'TLS 1.3 Not Supported',
'severity': 'MEDIUM',
'description': 'TLS 1.3 provides improved security and performance',
'cve': 'N/A'
})
scan_results['vulnerabilities'] = vulnerabilities
async def calculate_security_score(scan_results):
"""Calculate overall security score."""
score = 100
# Protocol penalties (in modern Python, SSLv2/SSLv3 will be False, which is good)
if scan_results['protocols'].get('SSLv2', False):
score -= 30
if scan_results['protocols'].get('SSLv3', False):
score -= 20
if not scan_results['protocols'].get('TLSv1.2', False):
score -= 15
if not scan_results['protocols'].get('TLSv1.3', False):
score -= 10
# Certificate penalties
cert = scan_results.get('certificate', {})
if cert.get('days_until_expiry', 0) < 30:
score -= 10
if cert.get('days_until_expiry', 0) < 7:
score -= 20
# Cipher penalties
supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
weak_cipher_count = sum(1 for cipher in supported_ciphers
if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK']))
score -= min(weak_cipher_count * 5, 25)
# Vulnerability penalties
for vuln in scan_results.get('vulnerabilities', []):
if vuln['severity'] == 'CRITICAL':
score -= 20
elif vuln['severity'] == 'HIGH':
score -= 15
elif vuln['severity'] == 'MEDIUM':
score -= 10
elif vuln['severity'] == 'LOW':
score -= 5
scan_results['security_score'] = max(0, score)
async def generate_recommendations(scan_results):
"""Generate security recommendations."""
recommendations = []
# Protocol recommendations
if scan_results['protocols'].get('SSLv2', False):
recommendations.append("🔴 IMMEDIATELY disable SSLv2 - critically vulnerable")
if scan_results['protocols'].get('SSLv3', False):
recommendations.append("🔴 Disable SSLv3 - vulnerable to POODLE attack")
if not scan_results['protocols'].get('TLSv1.3', False):
recommendations.append("🟡 Enable TLSv1.3 for best security and performance")
# Certificate recommendations
cert = scan_results.get('certificate', {})
if cert.get('days_until_expiry', 0) < 30:
recommendations.append("🟡 Renew SSL certificate - expiring soon")
# Cipher recommendations
supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
weak_ciphers = [c for c in supported_ciphers
if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])]
if weak_ciphers:
recommendations.append("🔴 Remove weak cipher suites (RC4, DES, 3DES, NULL)")
# General recommendations
if scan_results['security_score'] < 80:
recommendations.append("🛡️ Implement modern TLS configuration following Mozilla guidelines")
if not any('ECDHE' in c for c in supported_ciphers):
recommendations.append("🟡 Enable Forward Secrecy with ECDHE cipher suites")
# Add note about Python version limitations
recommendations.append(" Note: SSLv2/SSLv3 testing limited by Python security features")
scan_results['recommendations'] = recommendations
async def format_ssl_scan_results(scan_results):
"""Format comprehensive SSL scan results."""
output = f"<strong>🔐 SSL/TLS Security Scan: {scan_results['target']}:{scan_results['port']}</strong><br><br>"
# Security Score
score = scan_results['security_score']
if score >= 90:
score_emoji, rating = "🟢", "Excellent"
elif score >= 80:
score_emoji, rating = "🟡", "Good"
elif score >= 60:
score_emoji, rating = "🟠", "Fair"
else:
score_emoji, rating = "🔴", "Poor"
output += f"<strong>{score_emoji} Security Score: {score}/100 ({rating})</strong><br><br>"
# Certificate Information
cert = scan_results.get('certificate', {})
if cert:
output += "<strong>📜 Certificate Information</strong><br>"
output += f" • <strong>Subject:</strong> {cert.get('subject', {}).get('common_name', 'N/A')}<br>"
output += f" • <strong>Issuer:</strong> {cert.get('issuer', {}).get('common_name', 'N/A')}<br>"
output += f" • <strong>Valid From:</strong> {format_cert_date(cert.get('not_before', ''))}<br>"
output += f" • <strong>Valid Until:</strong> {format_cert_date(cert.get('not_after', ''))}<br>"
output += f" • <strong>Expires In:</strong> {cert.get('days_until_expiry', 'N/A')} days<br>"
output += f" • <strong>Signature Algorithm:</strong> {cert.get('signature_algorithm', 'N/A')}<br>"
output += "<br>"
# Protocol Support
output += "<strong>🔌 Protocol Support</strong><br>"
protocols = scan_results.get('protocols', {})
for proto, supported in protocols.items():
# Handle protocols that can't be tested in this Python version
if proto in ['SSLv2', 'SSLv3'] and proto not in TLS_VERSIONS:
emoji = ""
status = "Cannot test (Python security)"
else:
emoji = "" if supported else ""
# Highlight insecure protocols
if proto in ['SSLv2', 'SSLv3'] and supported:
emoji = "🔴"
elif proto in ['TLSv1.3'] and supported:
emoji = ""
output += f"{emoji} <strong>{proto}:</strong> {status if 'status' in locals() else 'Supported' if supported else 'Not Supported'}<br>"
output += "<br>"
# Cipher Information
ciphers = scan_results.get('ciphers', {})
if ciphers.get('supported_ciphers'):
output += "<strong>🔐 Cipher Suites</strong><br>"
output += f" • <strong>Negotiated:</strong> {ciphers.get('negotiated_cipher', 'Unknown')}<br>"
output += f" • <strong>Total Supported:</strong> {len(ciphers['supported_ciphers'])}<br>"
# Show weak ciphers if any
weak_ciphers = [c for c in ciphers['supported_ciphers']
if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])]
if weak_ciphers:
output += f" • <strong>Weak Ciphers:</strong> {len(weak_ciphers)} found<br>"
for cipher in weak_ciphers[:3]:
output += f" └─ 🔴 {cipher}<br>"
# Show strong ciphers if any
strong_ciphers = [c for c in ciphers['supported_ciphers']
if any(strong in c.upper() for strong in CIPHER_CATEGORIES['STRONG'])]
if strong_ciphers:
output += f" • <strong>Strong Ciphers:</strong> {len(strong_ciphers)} found<br>"
output += "<br>"
# Vulnerabilities
vulnerabilities = scan_results.get('vulnerabilities', [])
if vulnerabilities:
output += "<strong>⚠️ Security Vulnerabilities</strong><br>"
for vuln in vulnerabilities[:5]: # Show top 5
severity_emoji = "🔴" if vuln['severity'] == 'CRITICAL' else "🟠" if vuln['severity'] == 'HIGH' else "🟡"
output += f"{severity_emoji} <strong>{vuln['name']}</strong> ({vuln['severity']})<br>"
output += f" └─ {vuln['description']}<br>"
output += "<br>"
# Recommendations
recommendations = scan_results.get('recommendations', [])
if recommendations:
output += "<strong>💡 Security Recommendations</strong><br>"
for rec in recommendations[:8]:
output += f"{rec}<br>"
output += "<br>"
# Quick Assessment
output += "<strong>📊 Quick Assessment</strong><br>"
if score >= 90:
output += " • ✅ Excellent TLS configuration<br>"
output += " • ✅ Modern protocols and ciphers<br>"
output += " • ✅ Good certificate management<br>"
elif score >= 70:
output += " • ⚠️ Good configuration with minor issues<br>"
output += " • 🔧 Some improvements recommended<br>"
else:
output += " • 🚨 Significant security issues found<br>"
output += " • 🔴 Immediate action required<br>"
# Add note about testing limitations
output += "<br><em> Note: Some protocol tests limited by Python security features</em>"
# Always wrap in collapsible due to comprehensive output
output = f"<details><summary><strong>🔐 SSL/TLS Scan: {scan_results['target']}:{scan_results['port']} (Score: {score}/100)</strong></summary>{output}</details>"
return output
def format_cert_date(date_str):
"""Format certificate date string for display."""
try:
if date_str:
dt = datetime.datetime.strptime(date_str, '%Y%m%d%H%M%SZ')
return dt.strftime('%Y-%m-%d %H:%M:%S UTC')
except:
pass
return date_str

442
plugins/sysinfo.py Normal file
View File

@@ -0,0 +1,442 @@
"""
This plugin provides comprehensive system information and resource monitoring.
"""
import logging
import platform
import os
import psutil
import socket
import datetime
import simplematrixbotlib as botlib
import subprocess
import sys
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle !sysinfo command for system information.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("sysinfo"):
logging.info("Received !sysinfo command")
args = match.args()
if len(args) > 0 and args[0].lower() == 'help':
await show_usage(room, bot)
return
await get_system_info(room, bot)
async def show_usage(room, bot):
"""Display sysinfo command usage."""
usage = """
<strong>💻 System Information Plugin</strong>
<strong>!sysinfo</strong> - Display comprehensive system information
<strong>!sysinfo help</strong> - Show this help message
<strong>Information Provided:</strong>
• System hardware (CPU, RAM, storage, GPU)
• Operating system and kernel details
• Network configuration and interfaces
• Running processes and resource usage
• Temperature and hardware sensors
• System load and performance metrics
• Docker container status (if available)
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def get_system_info(room, bot):
"""Collect and display comprehensive system information."""
try:
await bot.api.send_text_message(room.room_id, "🔍 Gathering system information...")
sysinfo = {
'system': await get_system_info_basic(),
'cpu': await get_cpu_info(),
'memory': await get_memory_info(),
'storage': await get_storage_info(),
'network': await get_network_info(),
'processes': await get_process_info(),
'docker': await get_docker_info(),
'sensors': await get_sensor_info(),
'gpu': await get_gpu_info()
}
output = await format_system_info(sysinfo)
await bot.api.send_markdown_message(room.room_id, output)
logging.info("Sent system information")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error gathering system info: {str(e)}")
logging.error(f"Error in get_system_info: {e}")
async def get_system_info_basic():
"""Get basic system information."""
try:
return {
'hostname': socket.gethostname(),
'os': platform.system(),
'os_release': platform.release(),
'os_version': platform.version(),
'architecture': platform.architecture()[0],
'machine': platform.machine(),
'processor': platform.processor(),
'boot_time': datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S"),
'uptime': str(datetime.timedelta(seconds=psutil.boot_time() - datetime.datetime.now().timestamp())).split('.')[0],
'users': len(psutil.users())
}
except Exception as e:
return {'error': str(e)}
async def get_cpu_info():
"""Get CPU information and usage."""
try:
cpu_times = psutil.cpu_times_percent(interval=1)
cpu_freq = psutil.cpu_freq()
return {
'physical_cores': psutil.cpu_count(logical=False),
'total_cores': psutil.cpu_count(logical=True),
'max_frequency': f"{cpu_freq.max:.1f} MHz" if cpu_freq else "N/A",
'current_frequency': f"{cpu_freq.current:.1f} MHz" if cpu_freq else "N/A",
'usage_percent': psutil.cpu_percent(interval=1),
'user_time': cpu_times.user,
'system_time': cpu_times.system,
'idle_time': cpu_times.idle,
'load_avg': os.getloadavg() if hasattr(os, 'getloadavg') else "N/A"
}
except Exception as e:
return {'error': str(e)}
async def get_memory_info():
"""Get memory and swap information."""
try:
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
return {
'total': f"{memory.total / (1024**3):.2f} GB",
'available': f"{memory.available / (1024**3):.2f} GB",
'used': f"{memory.used / (1024**3):.2f} GB",
'usage_percent': memory.percent,
'swap_total': f"{swap.total / (1024**3):.2f} GB",
'swap_used': f"{swap.used / (1024**3):.2f} GB",
'swap_free': f"{swap.free / (1024**3):.2f} GB",
'swap_percent': swap.percent
}
except Exception as e:
return {'error': str(e)}
async def get_storage_info():
"""Get storage device information."""
try:
partitions = psutil.disk_partitions()
storage_info = []
for partition in partitions:
try:
usage = psutil.disk_usage(partition.mountpoint)
storage_info.append({
'device': partition.device,
'mountpoint': partition.mountpoint,
'fstype': partition.fstype,
'total': f"{usage.total / (1024**3):.2f} GB",
'used': f"{usage.used / (1024**3):.2f} GB",
'free': f"{usage.free / (1024**3):.2f} GB",
'percent': usage.percent
})
except:
continue
# Get disk I/O statistics
disk_io = psutil.disk_io_counters()
io_info = {
'read_count': disk_io.read_count if disk_io else 0,
'write_count': disk_io.write_count if disk_io else 0,
'read_bytes': f"{disk_io.read_bytes / (1024**3):.2f} GB" if disk_io else "0 GB",
'write_bytes': f"{disk_io.write_bytes / (1024**3):.2f} GB" if disk_io else "0 GB"
}
return {
'partitions': storage_info,
'io_stats': io_info
}
except Exception as e:
return {'error': str(e)}
async def get_network_info():
"""Get network interface information."""
try:
interfaces = psutil.net_if_addrs()
io_counters = psutil.net_io_counters(pernic=True)
network_info = []
for interface, addrs in interfaces.items():
if interface not in ['lo']: # Skip loopback
interface_io = io_counters.get(interface, None)
network_info.append({
'interface': interface,
'ipv4': next((addr.address for addr in addrs if addr.family == socket.AF_INET), 'N/A'),
'ipv6': next((addr.address for addr in addrs if addr.family == socket.AF_INET6), 'N/A'),
'mac': next((addr.address for addr in addrs if addr.family == psutil.AF_LINK), 'N/A'),
'bytes_sent': f"{interface_io.bytes_sent / (1024**2):.2f} MB" if interface_io else "N/A",
'bytes_recv': f"{interface_io.bytes_recv / (1024**2):.2f} MB" if interface_io else "N/A"
})
return network_info
except Exception as e:
return {'error': str(e)}
async def get_process_info():
"""Get process and system load information."""
try:
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# Sort by CPU usage and get top 5
top_processes = sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:5]
return {
'total_processes': len(processes),
'top_cpu': top_processes
}
except Exception as e:
return {'error': str(e)}
async def get_docker_info():
"""Get Docker container information if available."""
try:
# Check if docker is available
result = subprocess.run(['docker', '--version'], capture_output=True, text=True)
if result.returncode != 0:
return {'available': False}
# Get running containers
result = subprocess.run(['docker', 'ps', '--format', '{{.Names}}|{{.Status}}|{{.Ports}}'],
capture_output=True, text=True)
containers = []
for line in result.stdout.strip().split('\n'):
if line:
parts = line.split('|')
if len(parts) >= 2:
containers.append({
'name': parts[0],
'status': parts[1],
'ports': parts[2] if len(parts) > 2 else 'N/A'
})
return {
'available': True,
'containers': containers,
'total_running': len(containers)
}
except Exception as e:
return {'available': False, 'error': str(e)}
async def get_sensor_info():
"""Get hardware sensor information."""
try:
temperatures = psutil.sensors_temperatures()
fans = psutil.sensors_fans()
battery = psutil.sensors_battery()
sensor_info = {
'temperatures': {},
'fans': {},
'battery': {}
}
# Temperature sensors
if temperatures:
for name, entries in temperatures.items():
sensor_info['temperatures'][name] = [
f"{entry.current}°C" for entry in entries[:2] # Show first 2 sensors per type
]
# Fan speeds
if fans:
for name, entries in fans.items():
sensor_info['fans'][name] = [
f"{entry.current} RPM" for entry in entries[:2]
]
# Battery information
if battery:
sensor_info['battery'] = {
'percent': battery.percent,
'power_plugged': battery.power_plugged,
'time_left': f"{battery.secsleft // 3600}h {(battery.secsleft % 3600) // 60}m" if battery.secsleft != psutil.POWER_TIME_UNLIMITED else "Unknown"
}
return sensor_info
except Exception as e:
return {'error': str(e)}
async def get_gpu_info():
"""Get GPU information using various methods."""
try:
gpu_info = {}
# Try nvidia-smi first
try:
result = subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total,memory.used,memory.free,temperature.gpu,utilization.gpu',
'--format=csv,noheader,nounits'], capture_output=True, text=True)
if result.returncode == 0:
nvidia_gpus = []
for line in result.stdout.strip().split('\n'):
if line:
parts = [part.strip() for part in line.split(',')]
if len(parts) >= 6:
nvidia_gpus.append({
'name': parts[0],
'memory_total': f"{parts[1]} MB",
'memory_used': f"{parts[2]} MB",
'memory_free': f"{parts[3]} MB",
'temperature': f"{parts[4]}°C",
'utilization': f"{parts[5]}%"
})
gpu_info['nvidia'] = nvidia_gpus
except:
pass
# Try lspci for generic GPU detection
try:
result = subprocess.run(['lspci'], capture_output=True, text=True)
if result.returncode == 0:
gpu_lines = [line for line in result.stdout.split('\n') if 'VGA' in line or '3D' in line]
gpu_info['detected'] = gpu_lines[:3] # Show first 3 GPUs
except:
pass
return gpu_info
except Exception as e:
return {'error': str(e)}
async def format_system_info(sysinfo):
"""Format system information for display."""
output = "<strong>💻 System Information</strong><br><br>"
# System Overview
system = sysinfo.get('system', {})
output += "<strong>🖥️ System Overview</strong><br>"
output += f" • <strong>Hostname:</strong> {system.get('hostname', 'N/A')}<br>"
output += f" • <strong>OS:</strong> {system.get('os', 'N/A')} {system.get('os_release', '')}<br>"
output += f" • <strong>Architecture:</strong> {system.get('architecture', 'N/A')}<br>"
output += f" • <strong>Uptime:</strong> {system.get('uptime', 'N/A')}<br>"
output += f" • <strong>Boot Time:</strong> {system.get('boot_time', 'N/A')}<br>"
output += f" • <strong>Users:</strong> {system.get('users', 'N/A')}<br>"
output += "<br>"
# CPU Information
cpu = sysinfo.get('cpu', {})
if 'error' not in cpu:
output += "<strong>⚡ CPU Information</strong><br>"
output += f" • <strong>Cores:</strong> {cpu.get('physical_cores', 'N/A')} physical, {cpu.get('total_cores', 'N/A')} logical<br>"
output += f" • <strong>Frequency:</strong> {cpu.get('current_frequency', 'N/A')} (max: {cpu.get('max_frequency', 'N/A')})<br>"
output += f" • <strong>Usage:</strong> {cpu.get('usage_percent', 'N/A')}%<br>"
if cpu.get('load_avg') != "N/A":
output += f" • <strong>Load Average:</strong> {', '.join([f'{load:.2f}' for load in cpu.get('load_avg', [0,0,0])])}<br>"
output += "<br>"
# Memory Information
memory = sysinfo.get('memory', {})
if 'error' not in memory:
output += "<strong>🧠 Memory Information</strong><br>"
output += f" • <strong>Total:</strong> {memory.get('total', 'N/A')}<br>"
output += f" • <strong>Used:</strong> {memory.get('used', 'N/A')} ({memory.get('usage_percent', 'N/A')}%)<br>"
output += f" • <strong>Available:</strong> {memory.get('available', 'N/A')}<br>"
output += f" • <strong>Swap:</strong> {memory.get('swap_used', 'N/A')} / {memory.get('swap_total', 'N/A')} ({memory.get('swap_percent', 'N/A')}%)<br>"
output += "<br>"
# Storage Information
storage = sysinfo.get('storage', {})
if 'error' not in storage:
output += "<strong>💾 Storage Information</strong><br>"
partitions = storage.get('partitions', [])
for partition in partitions[:3]: # Show first 3 partitions
output += f" • <strong>{partition.get('device', 'N/A')}:</strong> {partition.get('used', 'N/A')} / {partition.get('total', 'N/A')} ({partition.get('percent', 'N/A')}%)<br>"
output += "<br>"
# GPU Information
gpu = sysinfo.get('gpu', {})
if gpu.get('nvidia'):
output += "<strong>🎮 GPU Information (NVIDIA)</strong><br>"
for gpu_info in gpu['nvidia']:
output += f" • <strong>{gpu_info.get('name', 'N/A')}:</strong> {gpu_info.get('utilization', 'N/A')} usage, {gpu_info.get('temperature', 'N/A')}<br>"
output += "<br>"
elif gpu.get('detected'):
output += "<strong>🎮 GPU Information</strong><br>"
for gpu_line in gpu['detected'][:2]:
output += f"{gpu_line}<br>"
output += "<br>"
# Network Information
network = sysinfo.get('network', [])
if network and 'error' not in network:
output += "<strong>🌐 Network Information</strong><br>"
for interface in network[:2]: # Show first 2 interfaces
output += f" • <strong>{interface.get('interface', 'N/A')}:</strong> {interface.get('ipv4', 'N/A')}<br>"
output += "<br>"
# Process Information
processes = sysinfo.get('processes', {})
if 'error' not in processes:
output += "<strong>🔄 Top Processes (by CPU)</strong><br>"
for proc in processes.get('top_cpu', [])[:3]:
output += f" • <strong>{proc.get('name', 'N/A')}:</strong> {proc.get('cpu_percent', 0):.1f}% CPU, {proc.get('memory_percent', 0):.1f}% RAM<br>"
output += f" • <strong>Total Processes:</strong> {processes.get('total_processes', 'N/A')}<br>"
output += "<br>"
# Docker Information
docker = sysinfo.get('docker', {})
if docker.get('available'):
output += "<strong>🐳 Docker Containers</strong><br>"
for container in docker.get('containers', [])[:3]:
output += f" • <strong>{container.get('name', 'N/A')}:</strong> {container.get('status', 'N/A')}<br>"
output += f" • <strong>Total Running:</strong> {docker.get('total_running', 'N/A')}<br>"
output += "<br>"
# Sensor Information
sensors = sysinfo.get('sensors', {})
if 'error' not in sensors:
if sensors.get('temperatures'):
output += "<strong>🌡️ Temperature Sensors</strong><br>"
for sensor, temps in list(sensors['temperatures'].items())[:2]:
output += f" • <strong>{sensor}:</strong> {', '.join(temps[:2])}<br>"
output += "<br>"
if sensors.get('battery'):
battery = sensors['battery']
output += "<strong>🔋 Battery Information</strong><br>"
output += f" • <strong>Charge:</strong> {battery.get('percent', 'N/A')}%<br>"
output += f" • <strong>Plugged In:</strong> {'Yes' if battery.get('power_plugged') else 'No'}<br>"
if battery.get('time_left'):
output += f" • <strong>Time Left:</strong> {battery.get('time_left', 'N/A')}<br>"
output += "<br>"
# Add timestamp
output += f"<em>Last updated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</em>"
# Wrap in collapsible due to comprehensive output
output = f"<details><summary><strong>💻 System Information - {system.get('hostname', 'Unknown')}</strong></summary>{output}</details>"
return output

View File

@@ -11,3 +11,5 @@ dnspython
croniter
schedule
yt-dlp
pyopenssl
psutil