Compare commits

..

21 Commits

Author SHA1 Message Date
bb6f6c15f6 DNSDumpster plugin added 2025-10-16 14:30:30 -05:00
428d21d884 New plugins help updated 2025-10-16 12:06:33 -05:00
5ace1083f1 New plugins 2025-10-16 11:55:31 -05:00
8eb21d49da Help update 2025-10-16 01:14:49 -05:00
b63cccca7a Help update 2025-10-16 01:11:02 -05:00
501df0ad3d Help updated 2025-10-16 00:55:00 -05:00
4efb3e745e Help updated 2025-10-16 00:52:08 -05:00
25d84ed392 Help updated 2025-10-16 00:46:09 -05:00
89fc69557a Urban Dictionary plugin added 2025-10-16 00:38:47 -05:00
5a320014b9 README update 2025-10-15 23:31:03 -05:00
dbd93583c1 Youtube preview now use yt-dlp 2025-10-15 22:45:20 -05:00
2d4ff9c1e2 Display weather on single line 2025-10-15 22:05:13 -05:00
712715b174 Delete cron.db 2025-10-15 21:59:41 -05:00
6f923e79bd Remove cron.db 2025-10-15 21:58:40 -05:00
e575598772 Weather plugin added 2025-10-15 21:55:44 -05:00
2feeb339f2 Reduced some vuln after SAST scan 2025-10-14 17:56:06 -05:00
5d746027e2 unload plugin loadplugin.py. youtube-preview now gets lyrics. 2024-03-10 05:58:21 -06:00
d2acef611b Update plugins, refactor proxy plugin, yt preview now gets lyrics 2024-03-04 00:53:58 -07:00
e8186c9fec Plugin clean up 2024-03-03 16:52:04 -07:00
543e139ca0 removed standalone ai plugins 2024-03-03 16:35:07 -07:00
551c5ddc02 Updated plugins. AI plugin updated 2024-03-03 16:34:39 -07:00
26 changed files with 3783 additions and 367 deletions

3
.gitignore vendored
View File

@@ -9,4 +9,5 @@ simplematrixbotlib*/
chromedriver
store
funguybot.service
stats.db
stats.db
cron.db

232
README.md
View File

@@ -25,7 +25,7 @@ Run the installation script
`source venv/bin/activate`
2. Clone the repository:
`git clone https://git.stoned.io/hash/FunguyBot`
`git clone https://gitlab.com/Eggzy/funguybot.git`
3. Apply the patch
`cp api.py.patch simplematrixbotlib`
@@ -42,6 +42,7 @@ Create/Edit `.env` file in the root directory of the bot and add the following v
MATRIX_URL="https://matrix.org" (or another homeserver)
MATRIX_USER=""
MATRIX_PASS=""
OPENWEATHER_API_KEY="" # Optional: For weather plugin
```
4. Create systemd.service
@@ -88,50 +89,217 @@ To use the bot, invite it to a Matrix room and interact with it by sending comma
- `!funguy <prompt>` Talk to the Tech AI LLM
- `!music <prompt>` Talk to the music knowledge LLM
- `!yt <search terms>` Search Youtube
- `!weather New York` Get Weather information
- `!ud <term>` Get Urban Dictionary definition
- `!help` Get Help
For a complete list of available commands and their descriptions, use the `!commands` command.
# 🍄 Funguy Bot Commands 🍄
🃏 **!fortune**
Returns a random fortune message.
Executes the `/usr/games/fortune` utility and sends the output as a message to the chat room.
**!date**
Displays the current date and time.
Fetches the current date and time using Python's `datetime` module and sends it in a formatted message to the chat room.
## Plugin Documentation
💻 **!proxy**
Retrieves a tested/working random SOCKS5 proxy.
Fetches a list of SOCKS5 proxies, tests their availability, and sends the first working proxy to the chat room.
### Core Commands
📶 **!isup <domain/ip>**
Checks if the specified domain or IP address is reachable.
Checks if the specified domain or IP address is reachable by attempting to ping it. If DNS resolution is successful, it checks HTTP and HTTPS service availability by sending requests to the domain.
**🍄 !help**
Displays comprehensive help documentation for all available commands with usage examples.
**!karma <user>**
Retrieves the karma points for the specified user.
Retrieves the karma points for the specified user from a database and sends them as a message to the chat room.
**🔌 !plugins**
Lists all loaded plugins along with their descriptions.
**!karma user up**
Increases the karma points for the specified user by 1.
Increases the karma points for the specified user by 1 in the database and sends the updated points as a message to the chat room.
**⏰ !date**
Displays the current date and time with proper ordinal formatting.
**!karma user down**
Decreases the karma points for the specified user by 1.
Decreases the karma points for the specified user by 1 in the database and sends the updated points as a message to the chat room.
**🃏 !fortune**
Returns a random fortune message using the fortune command.
📄 **!funguy [prompt]**
An AI large language model designed to serve as a chatbot within a vibrant and diverse community chat room hosted on the Matrix platform.
(Currently loaded model: **TheBloke_Mistral-7B-Instruct-v0.2-GPTQ**
### Utility Commands
🎝 **!music [prompt]**
Your music expert! Try it out.
**💻 !proxy**
Retrieves and tests random SOCKS5 proxies from public sources, showing latency and caching working proxies.
# 🌟 Funguy Bot Credits 🌟
**📶 !isup [domain/ip]**
Checks if a website or server is reachable, including DNS resolution and HTTP/HTTPS service checks.
🧙‍♂️ Creator & Developer:
- Hash Borgir is the author of 🍄Funguy Bot🍄. (@hashborgir:mozilla.org)
**☯ !karma [user] [up/down]**
Manages karma points for users. View karma with `!karma user`, increase with `!karma user up`, decrease with `!karma user down`.
# Join our Matrix Room:
**🌧️ !weather [location]**
Fetches current weather information for any location using OpenWeatherMap API.
*Requires OPENWEATHER_API_KEY environment variable*
[Self-hosting | Security | Sysadmin | Homelab | Programming](https://chat.mozilla.org/#/room/#selfhosting:mozilla.org)
**📖 !ud [term] [index]**
Fetches definitions from Urban Dictionary. Use without arguments for random definition, or specify term and optional index.
**🔍 !dns [domain]**
Performs comprehensive DNS reconnaissance on a domain. Shows A, AAAA, MX, NS, TXT, CNAME, SOA, and other DNS records.
**💰 !btc**
Fetches the current Bitcoin price in USD from bitcointicker.co API.
### 🔍 Shodan Security Research
**📡 !shodan [command] [query]**
Shodan.io integration for security reconnaissance and threat intelligence.
**Commands:**
- `!shodan ip <ip_address>` - Detailed IP information (services, ports, banners)
- `!shodan search <query>` - Search Shodan database with filters
- `!shodan host <domain>` - Host information and subdomain enumeration
- `!shodan count <query>` - Count results with geographic/organization breakdown
- `!shodan test` - Test API connection and debug queries
**Search Examples:**
```bash
!shodan search apache
!shodan search "port:22 country:US"
!shodan search "product:nginx city:'New York'"
!shodan search "net:192.168.1.0/24"
!shodan search "vuln:cve-2021-44228"
!shodan search "http.title:'phpMyAdmin'"
!shodan search "ssl.cert.subject.cn:'example.com'"
Common Search Filters:
country:US - Filter by country
city:"New York" - Filter by city
port:80,443,8080 - Filter by ports
product:nginx - Filter by service/product
os:Windows - Filter by operating system
org:"Google" - Filter by organization
net:192.168.0.0/16 - Filter by network range
has_ssl:true - Has SSL certificate
http.title:"admin" - HTTP page title contains
```
### AI & Generation Commands
**🤖 AI Commands (!tech, !music, !eth, etc.)**
Multiple AI model commands that interface with local AI API. Each command uses specialized prompts for different domains:
- `!tech` - Technology assistance
- `!music` - Music knowledge and recommendations
- `!weather` - Weather information
- And 100+ other specialized AI commands
**📸 !sd [prompt] [options]**
Generates images using self-hosted Stable Diffusion with customizable parameters:
- `--steps` - Number of generation steps (default: 4)
- `--cfg` - CFG scale (default: 2)
- `--h` - Image height (default: 512)
- `--w` - Image width (default: 512)
- `--neg` - Negative prompt
- `--sampler` - Sampler name (default: DPM++ SDE)
**📄 !text [prompt] [options]**
Generates text using Ollama's Mistral 7B Instruct model:
- `--max_tokens` - Maximum tokens to generate (default: 512)
- `--temperature` - Sampling temperature (default: 0.7)
### Media & Search Commands
**🎬 YouTube Commands**
- Automatic preview when YouTube links are posted
- `!yt [search terms]` - Search for YouTube videos
- Shows video info, description, and attempts to fetch lyrics
**📰 !xkcd**
Fetches and displays a random XKCD comic.
### Administration Commands
*Admin only - requires admin_user configuration*
**🔧 !set [option] [value]**
Set configuration options (admin_user, prefix)
**🔍 !get [option]**
Get configuration values
**💾 !saveconf**
Save current configuration
**📥 !loadconf**
Load saved configuration
**👁️ !show**
Display current configuration
**🔄 !reset**
Reset configuration to defaults
**📤 !load [plugin]**
Load a plugin
**📥 !unload [plugin]**
Unload a plugin
**🔄 !reload**
Reload all plugins
**🚫 !disable [plugin] [room_id]**
Disable a plugin for specific room
**✅ !enable [plugin] [room_id]**
Enable a plugin for specific room
**⚙️ !rehash**
Reload configuration
### Cron System
**⏱️ !cron [add|remove] [room_id] [cron_entry] [command]**
Schedule automated commands using cron syntax:
- `add` - Add a new cron job
- `remove` - Remove an existing cron job
## Full AI Command List
The bot includes over 100 specialized AI commands covering various domains:
**Creative & Writing**: !write, !script, !author, !poem, !rap, !story, !comic, !motiv, !debate
**Technical**: !tech, !dev, !py, !php, !regex, !math, !web, !it, !security, !ai, !ml, !data, !game
**Professional**: !seo, !recruit, !coach, !devrel, !sales, !ceo, !mgmt, !startup, !invest, !fin
**Educational**: !tutor, !teach, !edu, !acad, !hist, !astro, !chem, !math, !psych
**Lifestyle**: !fit, !health, !diet, !cook, !travel, !art, !music, !film, !gaming
**Specialized**: !legal, !medical, !realest, !auto, !fashion, !design, !interior
And many more! Use `!help` in chat to see the complete list with descriptions.
## Configuration
The bot uses a TOML configuration file (`funguy.conf`) for settings:
- `admin_user` - Matrix user ID with admin privileges
- `prefix` - Command prefix (default: "!")
- Plugin-specific settings in `plugins/ai.json` for AI commands
## Dependencies
- Python 3.7+
- simplematrixbotlib
- Various AI/ML services (Stable Diffusion, Ollama, etc.)
- Database support (SQLite)
- External APIs (OpenWeatherMap, Urban Dictionary, YouTube)
## Troubleshooting
- Ensure all environment variables are set correctly
- Check that required services are running (Stable Diffusion API, Ollama, etc.)
- Verify plugin permissions and whitelist settings
- Check logs for detailed error information
## Support
Join our Matrix room for support and community:
[Self-hosting | Security | Sysadmin | Homelab | Programming](https://matrix.to/#/#selfhosting:mozilla.org)
## Credits
**🧙‍♂️ Creator & Developer**: HB (@hashborgir:mozilla.org)
**🍄 Funguy Bot** - Created during recovery from cervical spinal surgery
---
*Note: This bot was created rapidly and may contain bugs. Please report issues and contribute improvements!*

View File

@@ -12,5 +12,6 @@ prefix = "!"
config_file = "funguy.conf"
[plugins.disabled]
"!uFhErnfpYhhlauJsNK:matrix.org" = [ "youtube-preview", "ai-tech", "ai-music", "proxy",]
"!uFhErnfpYhhlauJsNK:matrix.org" = [ "youtube-preview", "ai", "proxy",]
"!vYcfWXpPvxeQvhlFdV:matrix.org" = []
"!NXdVjDXPxXowPkrJJY:matrix.org" = [ "karma",]

View File

@@ -17,6 +17,14 @@ import toml # Library for parsing TOML configuration files
# Importing FunguyConfig class from plugins.config module
from plugins.config import FunguyConfig
# Whitelist of allowed plugins to prevent arbitrary code execution
ALLOWED_PLUGINS = {
'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma',
'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion',
'xkcd', 'youtube-preview', 'youtube-search', 'weather', 'urbandictionary',
'bitcoin', 'dns', 'shodan', 'dnsdumpster'
}
class FunguyBot:
"""
A bot class for managing plugins and handling commands in a Matrix chat environment.
@@ -78,17 +86,22 @@ class FunguyBot:
"""
Method to load plugins from the specified directory.
"""
# Iterating through files in the plugins directory
for plugin_file in os.listdir(self.PLUGINS_DIR):
if plugin_file.endswith(".py"): # Checking if file is a Python file
plugin_name = os.path.splitext(plugin_file)[0] # Extracting plugin name
try:
# Importing plugin module dynamically
module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}")
self.PLUGINS[plugin_name] = module # Storing loaded plugin module
logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading
except Exception as e:
logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails
# Iterating through whitelisted plugins only
for plugin_name in ALLOWED_PLUGINS:
plugin_file = os.path.join(self.PLUGINS_DIR, f"{plugin_name}.py")
# Verify that the plugin file exists
if not os.path.isfile(plugin_file):
logging.warning(f"Plugin file not found: {plugin_file}, skipping")
continue
try:
# Importing plugin module dynamically with validated plugin name
module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}")
self.PLUGINS[plugin_name] = module # Storing loaded plugin module
logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading
except Exception as e:
logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails
def reload_plugins(self):
"""
@@ -233,3 +246,10 @@ class FunguyBot:
if __name__ == "__main__":
bot = FunguyBot() # Creating instance of FunguyBot
bot.run() # Running the bot
from plugins import cron # Import your cron plugin
# After bot starts running, periodically check for cron jobs
while True:
asyncio.sleep(60) # Check every minute (adjust as needed)
cron.run_cron_jobs(bot) # Check and execute cron jobs

View File

@@ -1,78 +0,0 @@
"""
This plugin provides a command to interact with the music knowledge A.I.
"""
# plugins/ai-music.py
import logging
import requests
import json
import simplematrixbotlib as botlib
import re
import markdown2
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !music command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
PREFIX (str): The command prefix.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("music"):
logging.info("Received !music command")
args = match.args()
if len(args) < 1:
await bot.api.send_text_message(room.room_id, "Usage: !music [prompt]")
logging.info("Sent usage message to the room")
return
prompt = ' '.join(args)
# Prepare data for the API request
url = "http://127.0.0.1:5000/v1/completions"
headers = {
"Content-Type": "application/json"
}
data = {
"prompt": "<s>[INST]You are FunguyFlows, an AI music expert bot with access to a vast repository of knowledge encompassing every facet of music, spanning genres, artists, bands, compositions, albums, lyrics, music theory, composition techniques, composers, music history, music appreciation, and more. Your role is to serve as an invaluable resource and guide within the realm of music, offering comprehensive insights, recommendations, and assistance to music enthusiasts, students, professionals, and curious minds alike.Drawing upon humanity's collective knowledge and expertise in music, your database contains a wealth of information sourced from authoritative texts, scholarly articles, historical archives, musical compositions, biographies, discographies, and cultural repositories. This rich repository enables you to provide accurate, detailed, and insightful responses to a wide range of inquiries, covering an extensive array of topics related to music theory, composition, performance, history, and appreciation.As an AI music expert, your knowledge extends across various genres, including classical, jazz, rock, pop, hip-hop, electronic, folk, world music, and beyond. You possess a deep understanding of musical concepts such as melody, harmony, rhythm, timbre, form, dynamics, and texture, allowing you to analyze and interpret musical compositions with precision and clarity.In addition to your expertise in music theory and composition, you are well-versed in the works of renowned composers throughout history, from the classical masters of Bach, Mozart, and Beethoven to contemporary innovators like John Williams, Philip Glass, and Hans Zimmer. You can provide detailed biographical information, analysis of their compositions, and insights into their lasting impact on the world of music.Your knowledge of music history is extensive, spanning centuries of cultural evolution and musical innovation. From the Gregorian chants of the medieval period to the avant-garde experiments of the 20th century, you can trace the development of musical styles, movements, and traditions across different regions and epochs, shedding light on the social, political, and artistic contexts that shaped musical expression throughout history.Furthermore, your expertise encompasses a diverse range of topics related to music appreciation, including techniques for active listening, critical analysis of musical performances, understanding musical genres and styles, exploring the cultural significance of music, and engaging with music as a form of creative expression, emotional communication, and cultural identity.Whether users seek recommendations for discovering new artists and albums, assistance with analyzing musical compositions, insights into music theory concepts, guidance on composing their own music, or historical context for understanding musical traditions, you are poised to provide informative, engaging, and enriching responses tailored to their interests and inquiries.As an AI music expert bot, your mission is to inspire curiosity, deepen understanding, and foster appreciation for the diverse and multifaceted world of music. By sharing your knowledge, passion, and enthusiasm for music, you aim to empower individuals to explore, create, and connect through the universal language of sound. Embrace your role as a trusted guide and mentor within the realm of music, and let your expertise illuminate the path for music lovers and learners alike, one harmonious interaction at a time. You will only answer questions about music, and nothing else. Now... tell me about: "+prompt+"[/INST]",
"max_tokens": 1024,
"temperature": 1.31,
"top_p": 0.14,
"top_k": 49,
"seed": -1,
"stream": False,
"repetition_penalty": 1.17
}
# Make HTTP request to the API endpoint
try:
response = requests.post(url, headers=headers, json=data, verify=False)
response.raise_for_status() # Raise HTTPError for bad responses
payload = response.json()
new_text = payload['choices'][0]['text']
new_text = markdown_to_html(new_text)
print(new_text)
if new_text.count('<p>') > 1 or new_text.count('<li>') > 1: # Check if new_text has more than one paragraph
#new_text = new_text.replace("\n", '<br>')
#new_text = re.sub(r"\*\*(.*?)\*\*", r"<strong>\1</strong>", new_text)
new_text = "<details><summary><strong>🎵Funguy Music GPT🎵<br>⤵Click Here To See Funguy's Response⤵</strong></summary>" + new_text + "</details>"
await bot.api.send_markdown_message(room.room_id, new_text)
else:
await bot.api.send_markdown_message(room.room_id, new_text)
logging.info("Sent generated text to the room")
except requests.exceptions.RequestException as e:
logging.error(f"HTTP request failed for '{prompt}': {e}")
await bot.api.send_text_message(room.room_id, f"Error generating text: {e}")
def markdown_to_html(markdown_text):
html_content = markdown2.markdown(markdown_text)
return html_content

View File

@@ -1,76 +0,0 @@
"""
This plugin provides a command to interact with the LLM for Tech/IT/Security/Selfhosting/Programming etc.
"""
# plugins/ai-tech.py
import logging
import requests
import json
import simplematrixbotlib as botlib
import re
import markdown2
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !funguy command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("funguy"):
logging.info("Received !funguy command")
args = match.args()
if len(args) < 1:
await bot.api.send_text_message(room.room_id, "Usage: !funguy [prompt]")
logging.info("Sent usage message to the room")
return
prompt = ' '.join(args)
# Prepare data for the API request
url = "http://127.0.0.1:5000/v1/completions"
headers = {
"Content-Type": "application/json"
}
data = {
"prompt": "<s>[INST]You are FunguyGPT, a language model deployed as a chatbot for a community chat room hosted on Matrix. The chat room focuses on discussions related to self-hosting, system administration, cybersecurity, homelab setups, programming, coding, and general IT/tech topics. Your role is to assist users within the community by providing helpful responses and guidance on various technical matters. It's essential to keep your replies concise and relevant, addressing users' queries effectively while maintaining a friendly and approachable demeanor. Remember to prioritize clarity and brevity in your interactions to ensure a positive user experience within the chat room environment. You are FunguyGPT, an AI language model designed to serve as a chatbot within a vibrant and diverse community chat room hosted on the Matrix platform. This chat room acts as a hub for enthusiasts and professionals alike, engaging in discussions spanning a wide array of technical topics, including self-hosting, system administration, cybersecurity, homelab setups, programming, coding, and general IT/tech inquiries. Your primary objective is to act as a reliable and knowledgeable assistant, offering assistance, guidance, and solutions to the community members as they navigate through their technical challenges and endeavors.Given the broad spectrum of topics discussed within the community, it's crucial for you to possess a comprehensive understanding of various domains within the realm of technology. As such, your knowledge should encompass not only the fundamentals of programming languages, software development methodologies, and system administration principles but also extend to cybersecurity best practices, networking protocols, cloud computing, database management, and beyond. Your role as a chatbot is multifaceted and dynamic. You'll be tasked with responding to a wide range of queries, ranging from beginner-level inquiries seeking clarification on basic concepts to advanced discussions requiring nuanced insights and problem-solving skills. Whether it's troubleshooting code errors, configuring network settings, securing server environments, optimizing database performance, or recommending suitable homelab hardware, your goal is to provide accurate, actionable, and helpful responses tailored to the needs of the community members. In addition to offering direct assistance, you should also strive to foster a collaborative and supportive atmosphere within the chat room. Encourage knowledge sharing, facilitate discussions, and celebrate the achievements of community members as they tackle technical challenges and embark on learning journeys. By promoting a culture of learning and collaboration, you'll contribute to the growth and cohesion of the community, empowering individuals to expand their skill sets and achieve their goals within the realm of technology. As you engage with users within the chat room, prioritize brevity and clarity in your responses. While it's essential to provide comprehensive and accurate information, it's equally important to convey it in a concise and easily understandable manner. Avoid overly technical jargon or convoluted explanations that may confuse or overwhelm community members, opting instead for straightforward explanations and practical solutions whenever possible. Remember, your ultimate objective is to be a trusted ally and resource for the members of the community as they navigate the ever-evolving landscape of technology. By leveraging your expertise, empathy, and problem-solving abilities, you'll play a vital role in facilitating knowledge exchange, fostering collaboration, and empowering individuals to succeed in their technical endeavors. As you embark on this journey as a chatbot within the Matrix community, embrace the opportunity to make a meaningful and positive impact, one helpful interaction at a time. You will format the reply using minimal html instead of markdown. Do not use markdown formatting. Here is the prompt: "+prompt+"[/INST]",
"max_tokens": 1024,
"temperature": 1.31,
"top_p": 0.14,
"top_k": 49,
"seed": -1,
"stream": False,
"repetition_penalty": 1.17
}
# Make HTTP request to the API endpoint
try:
response = requests.post(url, headers=headers, json=data, verify=False)
response.raise_for_status() # Raise HTTPError for bad responses
payload = response.json()
new_text = payload['choices'][0]['text']
new_text = markdown_to_html(new_text)
if new_text.count('<p>') > 2: # Check if new_text has more than one paragraph
#new_text = new_text.replace("\n", '<br>')
#new_text = re.sub(r"\*\*(.*?)\*\*", r"<strong>\1</strong>", new_text)
new_text = "<details><summary><strong>🍄Funguy Tech GPT🍄<br>⤵Click Here To See Funguy's Response⤵</strong></summary>" + new_text + "</details>"
await bot.api.send_markdown_message(room.room_id, new_text)
else:
await bot.api.send_markdown_message(room.room_id, new_text)
logging.info(f"Sent generated text to the room: {new_text}")
except requests.exceptions.RequestException as e:
logging.error(f"HTTP request failed for '{prompt}': {e}")
await bot.api.send_text_message(room.room_id, f"Error generating text: {e}")
def markdown_to_html(markdown_text):
html_content = markdown2.markdown(markdown_text)
return html_content

1194
plugins/ai.json Normal file

File diff suppressed because it is too large Load Diff

115
plugins/ai.py Normal file
View File

@@ -0,0 +1,115 @@
"""
This plugin provides commands to interact with different AI models.
"""
import logging
import requests
import json
import simplematrixbotlib as botlib
import re
import markdown2
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle AI commands.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix():
logging.info(f"Received command: {match.command()}")
command = match.command()
conf = load_config()
if command in conf:
await handle_ai_command(room, bot, command, match.args(), conf)
async def handle_ai_command(room, bot, command, args, config):
"""
Function to handle AI commands.
Args:
room (Room): The Matrix room where the command was invoked.
bot (Bot): The bot object.
command (str): The name of the AI model command.
args (list): List of arguments provided with the command.
config (dict): Configuration parameters.
Returns:
None
"""
if len(args) < 1:
await bot.api.send_text_message(room.room_id, f"Usage: !{command} [prompt]")
logging.info("Sent usage message to the room")
return
prompt = ' '.join(args)
# Prepare data for the API request
url = "http://127.0.0.1:5000/v1/completions"
headers = {
"Content-Type": "application/json"
}
data = {
"prompt": f"<s>[INST]{config[command]['prompt']}{prompt}[/INST]",
"max_tokens": 4096,
"temperature": config[command]["temperature"],
"top_p": config[command]["top_p"],
"top_k": config[command]["top_k"],
"repetition_penalty": config[command]["repetition_penalty"],
"seed": -1,
"stream": False
}
# Make HTTP request to the API endpoint
try:
response = requests.post(url, headers=headers, json=data, verify=False, timeout=300)
response.raise_for_status() # Raise HTTPError for bad responses
payload = response.json()
new_text = payload['choices'][0]['text']
new_text = markdown_to_html(new_text)
if new_text.count('<p>') > 1 or new_text.count('<li>') > 1: # Check if new_text has more than one paragraph
new_text = f"<details><summary><strong>{config[command]['summary']}</strong></summary>{new_text}</details>"
await bot.api.send_markdown_message(room.room_id, new_text)
else:
await bot.api.send_markdown_message(room.room_id, new_text)
logging.info("Sent generated text to the room")
except requests.exceptions.RequestException as e:
logging.error(f"HTTP request failed for '{prompt}': {e}")
await bot.api.send_text_message(room.room_id, f"Error generating text: {e}")
def markdown_to_html(markdown_text):
"""
Convert Markdown text to HTML.
Args:
markdown_text (str): Markdown formatted text.
Returns:
str: HTML formatted text.
"""
html_content = markdown2.markdown(markdown_text)
return html_content
def load_config():
"""
Load configuration from ai.json file.
Returns:
dict: Configuration parameters.
"""
with open("plugins/ai.json", "r") as f:
config = json.load(f)
return config
CONFIG = load_config()

109
plugins/bitcoin.py Normal file
View File

@@ -0,0 +1,109 @@
"""
This plugin provides a command to fetch the current Bitcoin price.
"""
import logging
import requests
import simplematrixbotlib as botlib
BITCOIN_API_URL = "https://api.bitcointicker.co/trades/bitstamp/btcusd/60/"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !btc command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("btc"):
logging.info("Received !btc command")
try:
# Fetch Bitcoin price data
headers = {
'Accept-Encoding': 'gzip, deflate',
'User-Agent': 'FunguyBot/1.0'
}
logging.info(f"Fetching Bitcoin price from {BITCOIN_API_URL}")
response = requests.get(BITCOIN_API_URL, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if not data or len(data) == 0:
await bot.api.send_text_message(
room.room_id,
"No Bitcoin price data available."
)
logging.warning("No Bitcoin price data returned from API")
return
# Get the most recent trade (last item in the array)
latest_trade = data[-1]
price = latest_trade.get('price')
if price is None:
await bot.api.send_text_message(
room.room_id,
"Could not extract Bitcoin price from API response."
)
logging.error("Price field not found in API response")
return
# Convert to float and format with commas
try:
price_float = float(price)
price_formatted = f"${price_float:,.2f}"
except (ValueError, TypeError):
price_formatted = f"${price}"
# Optional: Get additional info if available
timestamp = latest_trade.get('timestamp', '')
volume = latest_trade.get('volume', '')
# Build the message
message_text = f"<strong>₿ BTC/USD</strong>"
message_text += f"<strong> Current Price:</strong> {price_formatted}"
message_text += ", <em>bitcointicker.co</em>"
await bot.api.send_markdown_message(room.room_id, message_text)
logging.info(f"Sent Bitcoin price: {price_formatted}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(
room.room_id,
"Request timed out. Bitcoin price API may be slow or unavailable."
)
logging.error("Bitcoin API timeout")
except requests.exceptions.RequestException as e:
await bot.api.send_text_message(
room.room_id,
f"Error fetching Bitcoin price: {e}"
)
logging.error(f"Error fetching Bitcoin price: {e}")
except (KeyError, IndexError, ValueError) as e:
await bot.api.send_text_message(
room.room_id,
"Error parsing Bitcoin price data."
)
logging.error(f"Error parsing Bitcoin API response: {e}", exc_info=True)
except Exception as e:
await bot.api.send_text_message(
room.room_id,
"An unexpected error occurred while fetching Bitcoin price."
)
logging.error(f"Unexpected error in Bitcoin plugin: {e}", exc_info=True)

View File

@@ -98,13 +98,16 @@ async def handle_command(room, message, bot, prefix, config):
Returns:
None
"""
# Check if the message matches the command pattern and is not from this bot
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("set"):
# If the command is 'set', check if it has exactly two arguments
args = match.args()
if len(args) != 2:
await bot.api.send_text_message(room.room_id, "Usage: !set <config_option> <value>")
return
option, value = args
# Set the specified configuration option based on the provided value
if option == "admin_user":
config.admin_user = value
await bot.api.send_text_message(room.room_id, f"Admin user set to {value}")
@@ -114,6 +117,7 @@ async def handle_command(room, message, bot, prefix, config):
else:
await bot.api.send_text_message(room.room_id, "Invalid configuration option")
# If the command is 'get', retrieve the value of the specified configuration option
elif match.is_not_from_this_bot() and match.prefix() and match.command("get"):
args = match.args()
if len(args) != 1:
@@ -127,21 +131,24 @@ async def handle_command(room, message, bot, prefix, config):
else:
await bot.api.send_text_message(room.room_id, "Invalid configuration option")
# If the command is 'saveconf', save the current configuration
elif match.is_not_from_this_bot() and match.prefix() and match.command("saveconf"):
config.save_config(config.config_file)
await bot.api.send_text_message(room.room_id, "Configuration saved")
# If the command is 'loadconf', load the saved configuration
elif match.is_not_from_this_bot() and match.prefix() and match.command("loadconf"):
config.load_config(config.config_file)
await bot.api.send_text_message(room.room_id, "Configuration loaded")
# If the command is 'show', display the current configuration
elif match.is_not_from_this_bot() and match.prefix() and match.command("show"):
admin_user = config.admin_user
prefix = config.prefix
await bot.api.send_text_message(room.room_id, f"Admin user: {admin_user}, Prefix: {prefix}")
# If the command is 'reset', reset the configuration to default values
elif match.is_not_from_this_bot() and match.prefix() and match.command("reset"):
config.admin_user = ""
config.prefix = "!"
await bot.api.send_text_message(room.room_id, "Configuration reset")

64
plugins/cron.py Normal file
View File

@@ -0,0 +1,64 @@
# plugins/cron.py
import sqlite3
from crontab import CronTab
import simplematrixbotlib as botlib
# Database connection and cursor
conn = sqlite3.connect('cron.db')
cursor = conn.cursor()
# Create table if not exists
cursor.execute('''CREATE TABLE IF NOT EXISTS cron (
room_id TEXT,
cron_entry TEXT,
command TEXT
)''')
conn.commit()
async def handle_command(room, message, bot, prefix, config):
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("cron"):
args = match.args()
if len(args) >= 4:
action = args[0]
room_id = args[1]
cron_entry = ' '.join(args[2:-1])
command = args[-1]
if action == "add":
add_cron(room_id, cron_entry, command)
await bot.api.send_text_message(room.room_id, f"Cron added successfully")
elif action == "remove":
remove_cron(room_id, command)
await bot.api.send_text_message(room.room_id, f"Cron removed successfully")
else:
await bot.api.send_text_message(room.room_id, "Usage: !cron add|remove room_id cron_entry command")
def add_cron(room_id, cron_entry, command):
# Check if the cron entry already exists in the database for the given room_id and command
cursor.execute('SELECT * FROM cron WHERE room_id=? AND command=? AND cron_entry=?', (room_id, command, cron_entry))
existing_entry = cursor.fetchone()
if existing_entry:
return # Cron entry already exists, do not add duplicate
# Insert the cron entry into the database
cursor.execute('INSERT INTO cron (room_id, cron_entry, command) VALUES (?, ?, ?)', (room_id, cron_entry, command))
conn.commit()
def remove_cron(room_id, command):
cursor.execute('DELETE FROM cron WHERE room_id=? AND command=?', (room_id, command))
conn.commit()
async def run_cron_jobs(bot):
cron = CronTab()
for job in cron:
cron_entry = str(job)
for row in cursor.execute('SELECT * FROM cron WHERE cron_entry=?', (cron_entry,)):
room_id, _, command = row
room = await bot.api.get_room_by_id(room_id)
if room:
plugin_name = command.split()[0].replace(prefix, '') # Extract plugin name
plugin_module = bot.plugins.get(plugin_name)
if plugin_module:
await plugin_module.handle_command(room, None, bot, prefix, config)

209
plugins/dns.py Normal file
View File

@@ -0,0 +1,209 @@
"""
This plugin provides a command to perform DNS reconnaissance on a domain.
"""
import logging
import dns.resolver
import dns.reversename
import simplematrixbotlib as botlib
import re
# Common DNS record types to query
RECORD_TYPES = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA', 'PTR', 'SRV']
def is_valid_domain(domain):
"""
Validate if the provided string is a valid domain name.
Args:
domain (str): The domain to validate.
Returns:
bool: True if valid, False otherwise.
"""
# Basic domain validation regex
pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
return re.match(pattern, domain) is not None
def format_dns_record(record_type, records):
"""
Format DNS records for display.
Args:
record_type (str): The type of DNS record.
records (list): List of DNS record values.
Returns:
str: Formatted HTML string.
"""
if not records:
return ""
output = f"<strong>{record_type} Records:</strong><br>"
for record in records:
output += f"{record}<br>"
return output
async def query_dns_records(domain):
"""
Query all common DNS record types for a domain.
Args:
domain (str): The domain to query.
Returns:
dict: Dictionary with record types as keys and lists of records as values.
"""
results = {}
resolver = dns.resolver.Resolver()
resolver.timeout = 5
resolver.lifetime = 5
for record_type in RECORD_TYPES:
try:
logging.info(f"Querying {record_type} records for {domain}")
answers = resolver.resolve(domain, record_type)
records = []
for rdata in answers:
if record_type == 'MX':
# MX records have preference and exchange
records.append(f"{rdata.preference} {rdata.exchange}")
elif record_type == 'SOA':
# SOA records have multiple fields
records.append(f"{rdata.mname} {rdata.rname}")
elif record_type == 'SRV':
# SRV records have priority, weight, port, and target
records.append(f"{rdata.priority} {rdata.weight} {rdata.port} {rdata.target}")
elif record_type == 'TXT':
# TXT records can have multiple strings
txt_data = ' '.join([s.decode() if isinstance(s, bytes) else str(s) for s in rdata.strings])
records.append(txt_data)
else:
records.append(str(rdata))
if records:
results[record_type] = records
logging.info(f"Found {len(records)} {record_type} record(s)")
except dns.resolver.NoAnswer:
logging.debug(f"No {record_type} records found for {domain}")
continue
except dns.resolver.NXDOMAIN:
logging.warning(f"Domain {domain} does not exist")
return None
except dns.resolver.Timeout:
logging.warning(f"Timeout querying {record_type} for {domain}")
continue
except Exception as e:
logging.error(f"Error querying {record_type} for {domain}: {e}")
continue
return results
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !dns command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("dns"):
logging.info("Received !dns command")
args = match.args()
if len(args) != 1:
await bot.api.send_text_message(
room.room_id,
"Usage: !dns <domain>\nExample: !dns example.com"
)
logging.info("Sent usage message for !dns")
return
domain = args[0].lower().strip()
# Remove protocol if present
domain = domain.replace('http://', '').replace('https://', '')
# Remove trailing slash if present
domain = domain.rstrip('/')
# Remove www. prefix if present (optional - you can keep it if you want)
# domain = domain.replace('www.', '')
# Validate domain
if not is_valid_domain(domain):
await bot.api.send_text_message(
room.room_id,
f"Invalid domain name: {domain}"
)
logging.warning(f"Invalid domain provided: {domain}")
return
try:
logging.info(f"Starting DNS reconnaissance for {domain}")
# Send "working on it" message for longer queries
await bot.api.send_text_message(
room.room_id,
f"🔍 Performing DNS reconnaissance on {domain}..."
)
# Query DNS records
results = await query_dns_records(domain)
if results is None:
await bot.api.send_text_message(
room.room_id,
f"Domain {domain} does not exist (NXDOMAIN)"
)
return
if not results:
await bot.api.send_text_message(
room.room_id,
f"No DNS records found for {domain}"
)
return
# Format the output
output = f"<strong>🔍 DNS Records for {domain}</strong><br><br>"
# Order the records in a logical way
preferred_order = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SOA', 'SRV', 'PTR']
for record_type in preferred_order:
if record_type in results:
output += format_dns_record(record_type, results[record_type])
output += "<br>"
# Add any remaining record types not in preferred order
for record_type in results:
if record_type not in preferred_order:
output += format_dns_record(record_type, results[record_type])
output += "<br>"
# Wrap in collapsible details if output is large
if output.count('<br>') > 15:
output = f"<details><summary><strong>🔍 DNS Records for {domain}</strong></summary>{output}</details>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent DNS records for {domain}")
except Exception as e:
await bot.api.send_text_message(
room.room_id,
f"An error occurred while performing DNS lookup: {str(e)}"
)
logging.error(f"Error in DNS plugin for {domain}: {e}", exc_info=True)

270
plugins/dnsdumpster.py Normal file
View File

@@ -0,0 +1,270 @@
"""
This plugin provides DNSDumpster.com integration for domain reconnaissance and DNS mapping.
"""
import logging
import os
import requests
import simplematrixbotlib as botlib
from dotenv import load_dotenv
# Load environment variables from .env file
plugin_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(plugin_dir)
dotenv_path = os.path.join(parent_dir, '.env')
load_dotenv(dotenv_path)
DNSDUMPSTER_API_KEY = os.getenv("DNSDUMPSTER_KEY", "")
DNSDUMPSTER_API_BASE = "https://api.dnsdumpster.com"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle DNSDumpster commands.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("dnsdumpster"):
logging.info("Received !dnsdumpster command")
# Check if API key is configured
if not DNSDUMPSTER_API_KEY:
await bot.api.send_text_message(
room.room_id,
"DNSDumpster API key not configured. Please set DNSDUMPSTER_KEY environment variable."
)
logging.error("DNSDumpster API key not configured")
return
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
# Check if it's a test command or domain lookup
if args[0].lower() == "test":
await test_dnsdumpster_connection(room, bot)
else:
# Treat the first argument as the domain
domain = args[0].lower().strip()
await dnsdumpster_domain_lookup(room, bot, domain)
async def show_usage(room, bot):
"""Display DNSDumpster command usage."""
usage = """
<strong>🔍 DNSDumpster Commands:</strong>
<strong>!dnsdumpster &lt;domain_name&gt;</strong> - Get comprehensive DNS reconnaissance for a domain
<strong>!dnsdumpster test</strong> - Test API connection
<strong>Examples:</strong>
• <code>!dnsdumpster google.com</code>
• <code>!dnsdumpster github.com</code>
• <code>!dnsdumpster example.com</code>
<strong>Rate Limit:</strong> 1 request per 2 seconds
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def test_dnsdumpster_connection(room, bot):
"""Test DNSDumpster API connection."""
try:
test_domain = "google.com" # Changed from example.com to google.com
url = f"{DNSDUMPSTER_API_BASE}/domain/{test_domain}"
headers = {
"X-API-Key": DNSDUMPSTER_API_KEY
}
logging.info(f"Testing DNSDumpster API with domain: {test_domain}")
response = requests.get(url, headers=headers, timeout=15)
debug_info = f"<strong>🔧 DNSDumpster API Test</strong><br>"
debug_info += f"<strong>Status Code:</strong> {response.status_code}<br>"
debug_info += f"<strong>Test Domain:</strong> {test_domain}<br>"
debug_info += f"<strong>Headers Used:</strong> X-API-Key<br>"
if response.status_code == 200:
data = response.json()
debug_info += "<strong>✅ SUCCESS - API is working!</strong><br>"
debug_info += f"<strong>Response Keys:</strong> {list(data.keys())}<br>"
# Show some sample data
if data.get('a'):
debug_info += f"<strong>A Records Found:</strong> {len(data['a'])}<br>"
if data.get('ns'):
debug_info += f"<strong>NS Records Found:</strong> {len(data['ns'])}<br>"
if data.get('total_a_recs'):
debug_info += f"<strong>Total A Records:</strong> {data['total_a_recs']}<br>"
elif response.status_code == 400:
debug_info += "<strong>❌ Bad Request - Check domain format</strong><br>"
debug_info += f"<strong>Response:</strong> {response.text[:200]}<br>"
elif response.status_code == 401:
debug_info += "<strong>❌ Unauthorized - Invalid API key</strong><br>"
elif response.status_code == 429:
debug_info += "<strong>⚠️ Rate Limit Exceeded - Wait 2 seconds</strong><br>"
else:
debug_info += f"<strong>❌ Error:</strong> {response.status_code} - {response.text[:200]}<br>"
await bot.api.send_markdown_message(room.room_id, debug_info)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Test failed: {str(e)}")
async def dnsdumpster_domain_lookup(room, bot, domain):
"""Get comprehensive DNS reconnaissance for a domain."""
try:
url = f"{DNSDUMPSTER_API_BASE}/domain/{domain}"
headers = {
"X-API-Key": DNSDUMPSTER_API_KEY
}
logging.info(f"Fetching DNSDumpster data for domain: {domain}")
# Send initial processing message
await bot.api.send_text_message(room.room_id, f"🔍 Processing DNS reconnaissance for {domain}...")
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 400:
await bot.api.send_text_message(room.room_id, f"Bad request - check domain format: {domain}")
return
elif response.status_code == 401:
await bot.api.send_text_message(room.room_id, "Invalid DNSDumpster API key")
return
elif response.status_code == 403:
await bot.api.send_text_message(room.room_id, "Access denied - check API key permissions")
return
elif response.status_code == 429:
await bot.api.send_text_message(room.room_id, "Rate limit exceeded - wait 2 seconds between requests")
return
elif response.status_code != 200:
await bot.api.send_text_message(room.room_id, f"DNSDumpster API error: {response.status_code} - {response.text[:100]}")
return
data = response.json()
logging.info(f"DNSDumpster response keys: {list(data.keys())}")
# Format the comprehensive DNS report
output = await format_dnsdumpster_report(domain, data)
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent DNSDumpster data for {domain}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "DNSDumpster API request timed out")
logging.error("DNSDumpster API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error fetching DNSDumpster data: {str(e)}")
logging.error(f"Error in dnsdumpster_domain_lookup: {e}")
async def format_dnsdumpster_report(domain, data):
"""Format DNSDumpster JSON response into a readable report."""
output = f"<strong>🔍 DNSDumpster Report: {domain}</strong><br><br>"
# Summary statistics
if data.get('total_a_recs'):
output += f"<strong>📊 Summary</strong><br>"
output += f" • <strong>Total A Records:</strong> {data['total_a_recs']}<br>"
# A Records - Show ALL records
if data.get('a') and data['a']:
output += f"<br><strong>📍 A Records (IPv4) - {len(data['a'])} found</strong><br>"
for record in data['a']: # Show ALL A records
host = record.get('host', 'N/A')
ips = record.get('ips', [])
output += f" • <strong>{host}</strong><br>"
for ip_info in ips: # Show ALL IPs per host
ip = ip_info.get('ip', 'N/A')
country = ip_info.get('country', 'Unknown')
asn_name = ip_info.get('asn_name', 'Unknown')
output += f" └─ {ip} ({country})<br>"
output += f" └─ {asn_name}<br>"
# Show banner information if available
banners = ip_info.get('banners', {})
if banners.get('http') or banners.get('https'):
output += f" └─ <em>Web Services:</em> "
services = []
if banners.get('http'):
services.append("HTTP")
if banners.get('https'):
services.append("HTTPS")
output += f"{', '.join(services)}<br>"
# NS Records - Show ALL records
if data.get('ns') and data['ns']:
output += f"<br><strong>🔗 NS Records (Name Servers) - {len(data['ns'])} found</strong><br>"
for record in data['ns']: # Show ALL NS records
host = record.get('host', 'N/A')
ips = record.get('ips', [])
output += f" • <strong>{host}</strong><br>"
for ip_info in ips: # Show ALL IPs
ip = ip_info.get('ip', 'N/A')
country = ip_info.get('country', 'Unknown')
output += f" └─ {ip} ({country})<br>"
# MX Records - Show ALL records
if data.get('mx') and data['mx']:
output += f"<br><strong>📧 MX Records (Mail Servers) - {len(data['mx'])} found</strong><br>"
for record in data['mx']: # Show ALL MX records
host = record.get('host', 'N/A')
ips = record.get('ips', [])
output += f" • <strong>{host}</strong><br>"
for ip_info in ips: # Show ALL IPs
ip = ip_info.get('ip', 'N/A')
country = ip_info.get('country', 'Unknown')
output += f" └─ {ip} ({country})<br>"
# CNAME Records - Show ALL records
if data.get('cname') and data['cname']:
output += f"<br><strong>🔀 CNAME Records - {len(data['cname'])} found</strong><br>"
for record in data['cname']: # Show ALL CNAME records
host = record.get('host', 'N/A')
target = record.get('target', 'N/A')
output += f"{host}{target}<br>"
# TXT Records - Show ALL records
if data.get('txt') and data['txt']:
output += f"<br><strong>📄 TXT Records - {len(data['txt'])} found</strong><br>"
for txt in data['txt']: # Show ALL TXT records
# Truncate very long TXT records but show more content
if len(txt) > 200:
txt = txt[:200] + "..."
output += f"{txt}<br>"
# Additional record types that might be present - Show ALL records
other_records = ['aaaa', 'srv', 'soa', 'ptr']
for record_type in other_records:
if data.get(record_type) and data[record_type]:
output += f"<br><strong>🔧 {record_type.upper()} Records - {len(data[record_type])} found</strong><br>"
for record in data[record_type]: # Show ALL records
if isinstance(record, dict):
# Format dictionary records nicely
record_str = ", ".join([f"{k}: {v}" for k, v in record.items()])
if len(record_str) > 150:
record_str = record_str[:150] + "..."
output += f"{record_str}<br>"
else:
output += f"{record}<br>"
# Add rate limit reminder
output += "<br><em>💡 Rate Limit: 1 request per 2 seconds</em>"
# Always wrap in collapsible details since we're showing all results
output = f"<details><summary><strong>🔍 DNSDumpster Report: {domain} (Click to expand)</strong></summary>{output}</details>"
return output

View File

@@ -1,72 +1,197 @@
"""
This plugin provides a command to display the list of available commands and their descriptions.
Plugin for providing a command to display the list of available commands and their descriptions.
"""
# plugins/help.py
import logging
import simplematrixbotlib as botlib
async def handle_command(room, message, bot, prefix, config):
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !help command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (MatrixBot): The Matrix bot instance.
prefix (str): The command prefix.
config (dict): The bot's configuration.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("help"):
logging.info("Fetching command help documentation")
commands_message = """
<details><summary><strong>🍄Funguy Bot Commands🍄<br>⤵Click Here To See Help Text⤵</strong></summary>
<details><summary><strong>🍄 Funguy Bot Commands 🍄</strong></summary>
<p>
<details><summary>📖 <strong>!help</strong></summary>
<p>Displays comprehensive help documentation for all available commands with usage examples.</p>
</details>
<details><summary>🔌 <strong>!plugins</strong></summary>
<p>Lists all loaded plugins along with their descriptions in alphabetical order.</p>
</details>
<details><summary>🃏 <strong>!fortune</strong></summary>
<p>Returns a random fortune message. Executes the `/usr/games/fortune` utility and sends the output as a message to the chat room.</p>
</details>
<details><summary>⏰ <strong>!date</strong></summary>
<p>Displays the current date and time. Fetches the current date and time using Python's `datetime` module and sends it in a formatted message to the chat room.</p>
<p>Displays the current date and time. Fetches the current date and time using Python's `datetime` module and sends it in a formatted message with proper ordinal suffixes to the chat room.</p>
</details>
<details><summary>💻 <strong>!proxy</strong></summary>
<p>Retrieves a tested/working random SOCKS5 proxy. Fetches a list of SOCKS5 proxies, tests their availability, and sends the first working proxy to the chat room.</p>
<p>Retrieves a tested/working random SOCKS5 proxy. Fetches a list of SOCKS5 proxies from public sources, tests their availability, and sends the first working proxy with latency information to the chat room. Caches working proxies for faster access.</p>
</details>
<details><summary>📶 <strong>!isup [domain/ip]</strong></summary>
<p>Checks if the specified domain or IP address is reachable. Checks if the specified domain or IP address is reachable by attempting to ping it. If DNS resolution is successful, it checks HTTP and HTTPS service availability by sending requests to the domain.</p>
<p>Checks if the specified domain or IP address is reachable. Performs DNS resolution and checks HTTP/HTTPS service availability. Reports successful DNS resolution and service status.</p>
</details>
<details><summary>☯ <strong>!karma [user]</strong></summary>
<p>Retrieves the karma points for the specified user. Retrieves the karma points for the specified user from a database and sends them as a message to the chat room.</p>
<p>Retrieves the karma points for the specified user. Retrieves the karma points for the specified user from a SQLite database and sends them as a message to the chat room.</p>
</details>
<details><summary>⇧ <strong>!karma [user] up</strong></summary>
<p>Increases the karma points for the specified user by 1. Increases the karma points for the specified user by 1 in the database and sends the updated points as a message to the chat room.</p>
<p>Increases the karma points for the specified user by 1. Increases the karma points for the specified user by 1 in the database and sends the updated points as a message to the chat room. Users cannot modify their own karma.</p>
</details>
<details><summary>⇩ <strong>!karma [user] down</strong></summary>
<p>Decreases the karma points for the specified user by 1. Decreases the karma points for the specified user by 1 in the database and sends the updated points as a message to the chat room.</p>
<p>Decreases the karma points for the specified user by 1. Decreases the karma points for the specified user by 1 in the database and sends the updated points as a message to the chat room. Users cannot modify their own karma.</p>
</details>
<details><summary>📄 <strong>!funguy [prompt]</strong></summary>
<p>An AI large language model designed to serve as a chatbot within a vibrant and diverse community chat room hosted on the Matrix platform. (Currently loaded model: <strong>TheBloke_Mistral-7B-Instruct-v0.2-GPTQ</strong>)</p>
<details><summary>🌧️ <strong>!weather [location]</strong></summary>
<p>Fetches current weather information for any location using OpenWeatherMap API. Shows temperature (Celsius/Fahrenheit), conditions, humidity, wind speed, and weather emojis. Requires OPENWEATHER_API_KEY environment variable.</p>
</details>
<details><summary>🎝 <strong>!music [prompt]</strong></summary>
<p>Your music expert! Try it out.</p>
<details><summary>📖 <strong>!ud [term] [index]</strong></summary>
<p>Fetches definitions from Urban Dictionary. Use without arguments for random definition, or specify term and optional index number. Shows definition, example, author, votes, and permalink.</p>
</details>
<details><summary>🌟 <strong>Funguy Bot Credits</strong> 🌟</summary>
<p>🧙‍♂️ Creator & Developer: Hash Borgir is the author of 🍄Funguy Bot🍄. (@hashborgir:mozilla.org)</p>
<p>Join our Matrix Room: [Self-hosting | Security | Sysadmin | Homelab | Programming](https://matrix.to/#/#selfhosting:mozilla.org)</p>
<details><summary>🔍 <strong>!dns [domain]</strong></summary>
<p>Performs comprehensive DNS reconnaissance on a domain. Queries multiple DNS record types including A, AAAA, MX, NS, TXT, CNAME, SOA, and SRV records. Validates domain format and provides formatted results.</p>
</details>
<details><summary>💰 <strong>!btc</strong></summary>
<p>Fetches the current Bitcoin price in USD from bitcointicker.co API. Shows real-time BTC/USD price with proper formatting. Includes error handling for API timeouts and data parsing issues.</p>
</details>
<details><summary>🔍 <strong>!shodan [command] [query]</strong></summary>
<p>Shodan.io integration for security reconnaissance and threat intelligence.</p>
<p><strong>Commands:</strong></p>
<ul>
<li><code>!shodan ip &lt;ip_address&gt;</code> - Detailed IP information (services, ports, banners)</li>
<li><code>!shodan search &lt;query&gt;</code> - Search Shodan database with filters</li>
<li><code>!shodan host &lt;domain&gt;</code> - Host information and subdomain enumeration</li>
<li><code>!shodan count &lt;query&gt;</code> - Count results with geographic/organization breakdown</li>
<li><code>!shodan test</code> - Test API connection and debug queries</li>
</ul>
<p><strong>Search Examples:</strong></p>
<ul>
<li><code>!shodan search apache</code></li>
<li><code>!shodan search "port:22 country:US"</code></li>
<li><code>!shodan search "product:nginx"</code></li>
<li><code>!shodan search "net:192.168.1.0/24"</code></li>
<li><code>!shodan search "http.title:'admin'"</code></li>
</ul>
<p><strong>Common Filters:</strong> country, city, port, product, os, org, net, has_ssl, http.title</p>
<p><em>Requires SHODAN_KEY environment variable</em></p>
</details>
<details><summary>🌐 <strong>!dnsdumpster [domain]</strong></summary>
<p>Comprehensive DNS reconnaissance and attack surface mapping using DNSDumpster.com API.</p>
<p><strong>Commands:</strong></p>
<ul>
<li><code>!dnsdumpster &lt;domain&gt;</code> - Complete DNS reconnaissance for any domain</li>
<li><code>!dnsdumpster test</code> - Test API connection and key validity</li>
</ul>
<p><strong>Features:</strong></p>
<ul>
<li>A Records - All IPv4 addresses with geographic and ASN information</li>
<li>NS Records - Complete name server information with IP locations</li>
<li>MX Records - All mail servers with geographic data</li>
<li>CNAME Records - Full alias chain mappings</li>
<li>TXT Records - All text records including SPF, DKIM, verification</li>
<li>Additional Records - AAAA, SRV, SOA, PTR records when available</li>
<li>Web Services - HTTP/HTTPS service detection with banner information</li>
</ul>
<p><strong>Examples:</strong></p>
<ul>
<li><code>!dnsdumpster google.com</code></li>
<li><code>!dnsdumpster github.com</code></li>
<li><code>!dnsdumpster example.com</code></li>
</ul>
<p><em>Requires DNSDUMPSTER_KEY environment variable</em><br>
<em>Rate Limit: 1 request per 2 seconds</em></p>
</details>
<details><summary>📸 <strong>!sd [prompt]</strong></summary>
<p>Generates images using self-hosted Stable Diffusion. Supports options: --steps, --cfg, --h, --w, --neg, --sampler. Uses queuing system to handle multiple requests. See available options using just '!sd'.</p>
</details>
<details><summary>📄 <strong>!text [prompt]</strong></summary>
<p>Generates text using Ollama's Mistral 7B Instruct model. Options: --max_tokens, --temperature. Uses queuing system for sequential processing.</p>
</details>
<details><summary>📰 <strong>!xkcd</strong></summary>
<p>Fetches and displays a random XKCD comic. Downloads comic image and sends it directly to the chat room.</p>
</details>
<details><summary>🎬 <strong>YouTube Features</strong></summary>
<p>Automatic preview when YouTube links are posted. Shows video info, description, and attempts to fetch lyrics. Also supports !yt [search terms] for direct YouTube searching.</p>
</details>
<details><summary>⏱️ <strong>!cron [add|remove] [room_id] [cron_entry] [command]</strong></summary>
<p>Schedule automated commands using cron syntax. Add or remove cron jobs for specific rooms and commands.</p>
</details>
<details><summary>🔧 <strong>Admin Commands</strong></summary>
<p>
<strong>!set [option] [value]</strong> - Set configuration options (admin_user, prefix)<br>
<strong>!get [option]</strong> - Get configuration values<br>
<strong>!saveconf</strong> - Save current configuration<br>
<strong>!loadconf</strong> - Load saved configuration<br>
<strong>!show</strong> - Display current configuration<br>
<strong>!reset</strong> - Reset configuration to defaults<br>
<strong>!load [plugin]</strong> - Load a plugin<br>
<strong>!unload [plugin]</strong> - Unload a plugin<br>
<strong>!reload</strong> - Reload all plugins<br>
<strong>!disable [plugin] [room_id]</strong> - Disable a plugin for specific room<br>
<strong>!enable [plugin] [room_id]</strong> - Enable a plugin for specific room<br>
<strong>!rehash</strong> - Reload configuration<br>
<em>Note: Admin commands require admin_user privileges</em>
</p>
</details>
</p>
</details>
<details><summary><strong>🤖 Funguy Bot AI Commands</strong></summary>
<p>
<strong>Creative & Writing</strong>: !write, !script, !author, !poem, !rap, !story, !comic, !motiv, !debate, !crit, !litcrit<br>
<strong>Technical</strong>: !tech, !dev, !py, !php, !regex, !math, !web, !it, !security, !ai, !ml, !data, !game, !gaming<br>
<strong>Professional</strong>: !seo, !recruit, !coach, !devrel, !sales, !ceo, !mgmt, !startup, !invest, !fin, !acad<br>
<strong>Educational</strong>: !tutor, !teach, !edu, !hist, !astro, !chem, !psych, !meditate, !socrat, !philos<br>
<strong>Lifestyle</strong>: !fit, !health, !diet, !cook, !travel, !art, !music, !film, !selfhelp<br>
<strong>Specialized</strong>: !legal, !medical, !realest, !auto, !fashion, !design, !interior, !florist<br>
<strong>Communication</strong>: !pron, !spk, !speak, !eloc, !comm, !msg, !langdet<br>
<strong>Business</strong>: !eth, !browse, !search, !create, !review, !curation, !domain<br>
<strong>Entertainment</strong>: !char, !adv, !advgame, !esc, !title, !stats, !prompt<br>
<strong>Technical Specialties</strong>: !intv, !plag, !trv, !foot, !rel, !etymo, !magic, !counsel, !behavior, !mh, !log, !dental, !acc, !chef, !tea, !telemed, !law, !trans, !chess, !time, !dream, !r, !emergency, !worksheet, !test, !create, !guide, !diag, !therapy, !gen, !drunk, !rec, !techtrans, !proof, !spirit, !friend, !chat, !wiki, !kanji, !note, !enhance, !nav, !hypno, !critic, !comp, !journo, !pscoach, !makeup, !childcare, !writing, !syn, !shop, !dining<br>
<em>Each AI command uses specialized prompts optimized for different domains and interfaces with local AI models. Consult ai.json</em>
</p>
</details>
<details><summary>🌟 <strong>Funguy Bot Credits</strong></summary>
<p>
<strong>🧙‍♂️ Creator & Developer</strong>: HB is the author of 🍄Funguy Bot🍄. (@hashborgir:mozilla.org)<br>
<strong>🚀 Development Context</strong>: Created during recovery from two-level cervical spinal surgery (CDA Cervical Discectomy and Disc Arthroplasty)<br>
<br>
<strong>Join our Matrix Room</strong>: <a href="https://matrix.to/#/#selfhosting:mozilla.org">Self-hosting | Security | Sysadmin | Homelab | Programming</a>
</p>
</details>
"""
await bot.api.send_markdown_message(room.room_id, commands_message)
await bot.api.send_markdown_message(room.room_id, commands_message)
logging.info("Sent help documentation to the room")

View File

@@ -50,38 +50,53 @@ async def handle_command(room, message, bot, prefix, config):
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot instance.
prefix (str): The bot command prefix.
config (FunguyConfig): The bot configuration instance.
Returns:
None
"""
# Check if the message matches the command pattern and is not from this bot
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("isup"):
# Log that the !isup command has been received
logging.info("Received !isup command")
args = match.args()
# Check if the command has exactly one argument
if len(args) != 1:
# If the command does not have exactly one argument, send usage message
await bot.api.send_markdown_message(room.room_id, "Usage: !isup <ipv4/ipv6/domain>")
logging.info("Sent usage message to the room")
return
target = args[0]
# DNS resolution
# Perform DNS resolution
try:
ip_address = socket.gethostbyname(target)
# Log successful DNS resolution
logging.info(f"DNS resolution successful for {target}: {ip_address}")
# Send DNS resolution success message
await bot.api.send_markdown_message(room.room_id, f"✅ DNS resolution successful for **{target}**: **{ip_address}** (A record)")
except socket.gaierror:
# Log DNS resolution failure
logging.info(f"DNS resolution failed for {target}")
# Send DNS resolution failure message
await bot.api.send_markdown_message(room.room_id, f"❌ DNS resolution failed for **{target}**")
return
# Check HTTP/HTTPS services
if await check_http(target):
# If HTTP service is up, send HTTP service up message
await bot.api.send_markdown_message(room.room_id, f"🖧 **{target}** HTTP service is up")
logging.info(f"{target} HTTP service is up")
elif await check_https(target):
# If HTTPS service is up, send HTTPS service up message
await bot.api.send_markdown_message(room.room_id, f"🖧 **{target}** HTTPS service is up")
logging.info(f"{target} HTTPS service is up")
else:
# If both HTTP and HTTPS services are down, send service down message
await bot.api.send_markdown_message(room.room_id, f"😕 **{target}** HTTP/HTTPS services are down")
logging.info(f"{target} HTTP/HTTPS services are down")

View File

@@ -1,41 +1,162 @@
"""
This plugin provides a command for the admin to load a plugin
Plugin for providing a command for the admin to load a plugin.
"""
# plugins/load_plugin.py
import os
import logging
import importlib
import simplematrixbotlib as botlib
import sys # Import sys module for unloading plugins
# Dictionary to store loaded plugins
PLUGINS = {}
# Whitelist of allowed plugins to prevent arbitrary code execution
ALLOWED_PLUGINS = {'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma',
'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion',
'xkcd', 'youtube-preview', 'youtube-search'}
async def load_plugin(plugin_name):
"""
Asynchronously loads a plugin.
Args:
plugin_name (str): The name of the plugin to load.
Returns:
bool: True if the plugin is loaded successfully, False otherwise.
"""
# Validate plugin name against whitelist
if plugin_name not in ALLOWED_PLUGINS:
logging.error(f"Plugin '{plugin_name}' is not whitelisted")
return False
# Verify that the plugin file exists in the plugins directory
plugin_path = os.path.join("plugins", f"{plugin_name}.py")
if not os.path.isfile(plugin_path):
logging.error(f"Plugin file not found: {plugin_path}")
return False
try:
module = importlib.import_module(f"plugins.{plugin_name}")
# Create a mapping of whitelisted plugins to their module paths
plugin_modules = {
'ai': 'plugins.ai',
'config': 'plugins.config',
'cron': 'plugins.cron',
'date': 'plugins.date',
'fortune': 'plugins.fortune',
'help': 'plugins.help',
'isup': 'plugins.isup',
'karma': 'plugins.karma',
'loadplugin': 'plugins.loadplugin',
'plugins': 'plugins.plugins',
'proxy': 'plugins.proxy',
'sd_text': 'plugins.sd_text',
'stable-diffusion': 'plugins.stable-diffusion',
'xkcd': 'plugins.xkcd',
'youtube-preview': 'plugins.youtube-preview',
'youtube-search': 'plugins.youtube-search',
'weather': 'plugins.weather',
'urbandictionary': 'plugins.urbandictionary',
'bitcoin':'plugins.bitcoin',
'dns':'plugins.dns',
'shodan':'plugins.shodan',
'dnsdumpster': 'plugins.dnsdumpster'
}
# Get the module path from the mapping
module_path = plugin_modules.get(plugin_name)
if not module_path:
logging.error(f"Plugin '{plugin_name}' not found in plugin mapping")
return False
# Import the plugin module using the validated module path
module = importlib.import_module(module_path)
# Add the plugin module to the PLUGINS dictionary
PLUGINS[plugin_name] = module
logging.info(f"Loaded plugin: {plugin_name}")
return True
except Exception as e:
# Log an error if the plugin fails to load
logging.error(f"Error loading plugin {plugin_name}: {e}")
return False
async def handle_command(room, message, bot, prefix, config):
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("load"):
if str(message.sender) == config.admin_user:
args = match.args()
if len(args) != 1:
await bot.api.send_text_message(room.room_id, "Usage: !load <plugin>")
else:
plugin_name = args[0]
if plugin_name not in PLUGINS:
success = await load_plugin(plugin_name)
if success:
await bot.api.send_text_message(room.room_id, f"Plugin '{plugin_name}' loaded successfully")
else:
await bot.api.send_text_message(room.room_id, f"Error loading plugin '{plugin_name}'")
else:
await bot.api.send_text_message(room.room_id, f"Plugin '{plugin_name}' is already loaded")
async def unload_plugin(plugin_name):
"""
Asynchronously unloads a plugin.
Args:
plugin_name (str): The name of the plugin to unload.
Returns:
bool: True if the plugin is unloaded successfully, False otherwise.
"""
try:
if plugin_name in PLUGINS:
del PLUGINS[plugin_name] # Remove the plugin from the PLUGINS dictionary
del sys.modules[f"plugins.{plugin_name}"] # Unload the plugin module from sys.modules
logging.info(f"Unloaded plugin: {plugin_name}")
return True
else:
await bot.api.send_text_message(room.room_id, "You are not authorized to load plugins.")
logging.warning(f"Plugin '{plugin_name}' is not loaded")
return False
except Exception as e:
# Log an error if the plugin fails to unload
logging.error(f"Error unloading plugin {plugin_name}: {e}")
return False
async def handle_command(room, message, bot, prefix, config):
"""
Asynchronously handles the command to load or unload a plugin.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (MatrixBot): The Matrix bot instance.
prefix (str): The command prefix.
config (dict): The bot's configuration.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix():
command = match.command()
if command == "load":
if str(message.sender) == config.admin_user:
args = match.args()
if len(args) != 1:
# Send usage message if the command format is incorrect
await bot.api.send_text_message(room.room_id, "Usage: !load <plugin>")
else:
plugin_name = args[0]
# Check if the plugin is not already loaded
if plugin_name not in PLUGINS:
# Load the plugin
success = await load_plugin(plugin_name)
if success:
await bot.api.send_text_message(room.room_id, f"Plugin '{plugin_name}' loaded successfully")
else:
await bot.api.send_text_message(room.room_id, f"Error loading plugin '{plugin_name}'")
else:
await bot.api.send_text_message(room.room_id, f"Plugin '{plugin_name}' is already loaded")
else:
# Send unauthorized message if the sender is not the admin
await bot.api.send_text_message(room.room_id, "You are not authorized to load plugins.")
elif command == "unload":
if str(message.sender) == config.admin_user:
args = match.args()
if len(args) != 1:
# Send usage message if the command format is incorrect
await bot.api.send_text_message(room.room_id, "Usage: !unload <plugin>")
else:
plugin_name = args[0]
# Unload the plugin
success = await unload_plugin(plugin_name)
if success:
await bot.api.send_text_message(room.room_id, f"Plugin '{plugin_name}' unloaded successfully")
else:
await bot.api.send_text_message(room.room_id, f"Error unloading plugin '{plugin_name}'")
else:
# Send unauthorized message if the sender is not the admin
await bot.api.send_text_message(room.room_id, "You are not authorized to unload plugins.")

View File

@@ -9,44 +9,50 @@ import logging
import random
import requests
import socket
import ssl
import time
from datetime import datetime, timedelta
import concurrent.futures
import simplematrixbotlib as botlib
import sqlite3
# Constants
SOCKS5_LIST_URL = 'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt'
MAX_TRIES = 5
# SOCKS5_LIST_URL = 'https://raw.githubusercontent.com/proxifly/free-proxy-list/main/proxies/protocols/socks5/data.txt'
MAX_TRIES = 64
PROXY_LIST_FILENAME = 'socks5.txt'
PROXY_LIST_EXPIRATION = timedelta(hours=8)
MAX_THREADS = 128
PROXIES_DB_FILE = 'proxies.db'
MAX_PROXIES_IN_DB = 10
async def test_socks5_proxy(proxy):
# Setup verbose logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def test_proxy(proxy):
"""
Test a SOCKS5 proxy by attempting a connection and sending a request through it.
Test a SOCKS5 proxy and return the outcome.
Args:
proxy (str): The SOCKS5 proxy address in the format 'ip:port'.
Returns:
bool: True if the proxy is working, False otherwise.
float: The latency in milliseconds if the proxy is working, None otherwise.
tuple: (bool: success, str: proxy, int: latency)
"""
try:
ip, port = proxy.split(':')
logging.info(f"Testing SOCKS5 proxy: {ip}:{port}")
start_time = time.time()
with socket.create_connection((ip, int(port)), timeout=5) as client:
with socket.create_connection((ip, int(port)), timeout=12) as client:
client.sendall(b'\x05\x01\x00')
response = client.recv(2)
if response == b'\x05\x00':
latency = int(round((time.time() - start_time) * 1000, 0))
logging.info(f"Successful connection to SOCKS5 proxy {proxy}. Latency: {latency} ms")
return True, latency
return True, proxy, latency
else:
logging.info(f"Failed to connect to SOCKS5 proxy {proxy}: Connection refused")
return False, None
return False, proxy, None
except Exception as e:
logging.error(f"Error testing SOCKS5 proxy {proxy}: {e}")
return False, None
return False, proxy, None
async def download_proxy_list():
"""
@@ -72,6 +78,69 @@ async def download_proxy_list():
return False
def check_db_for_proxy():
"""
Check the proxies database for a working proxy.
If found, test the proxy and remove it from the database if it doesn't work.
Returns:
str or None: The working proxy if found, None otherwise.
"""
try:
with sqlite3.connect(PROXIES_DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS proxies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
proxy TEXT,
latency INTEGER,
status TEXT
)
""")
cursor.execute("SELECT proxy, latency FROM proxies WHERE status = 'working' AND latency < 3000 ORDER BY RANDOM() LIMIT 1")
result = cursor.fetchone()
if result:
proxy, latency = result
success, _, _ = test_proxy(proxy)
if success:
return proxy, latency
else:
cursor.execute("DELETE FROM proxies WHERE proxy = ?", (proxy,))
conn.commit()
logging.info(f"Removed non-working proxy from the database: {proxy}")
return None, None
else:
return None, None
except Exception as e:
logging.error(f"Error checking proxies database: {e}")
return None, None
def save_proxy_to_db(proxy, latency):
"""
Save a working proxy to the proxies database.
Args:
proxy (str): The working proxy to be saved.
latency (int): Latency of the proxy.
"""
try:
with sqlite3.connect(PROXIES_DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS proxies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
proxy TEXT,
latency INTEGER,
status TEXT
)
""")
cursor.execute("INSERT INTO proxies (proxy, latency, status) VALUES (?, ?, 'working')", (proxy, latency))
conn.commit()
except Exception as e:
logging.error(f"Error saving proxy to database: {e}")
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !proxy command.
@@ -87,34 +156,58 @@ async def handle_command(room, message, bot, prefix, config):
if match.is_not_from_this_bot() and match.prefix() and match.command("proxy"):
logging.info("Received !proxy command")
# Download proxy list if needed
if not await download_proxy_list():
await bot.api.send_markdown_message(room.room_id, "Error downloading proxy list")
logging.error("Error downloading proxy list")
# Check database for a working proxy
working_proxy, latency = check_db_for_proxy()
if working_proxy:
await bot.api.send_markdown_message(room.room_id,
f"✅ Using cached working SOCKS5 Proxy: **{working_proxy}** - Latency: **{latency} ms**")
logging.info(f"Using cached working SOCKS5 proxy {working_proxy}")
return
try:
# Read proxies from file
with open(PROXY_LIST_FILENAME, 'r') as f:
socks5_proxies = f.read().splitlines()
random.shuffle(socks5_proxies)
# Download proxy list if needed
else:
if not await download_proxy_list():
await bot.api.send_markdown_message(room.room_id, "Error downloading proxy list")
logging.error("Error downloading proxy list")
return
# Test proxies
socks5_proxy = None
for proxy in socks5_proxies[:MAX_TRIES]:
success, latency = await test_socks5_proxy(proxy)
if success:
socks5_proxy = proxy
break
try:
# Read proxies from file
with open(PROXY_LIST_FILENAME, 'r') as f:
socks5_proxies = [line.replace("socks5://", "") for line in f.read().splitlines()]
random.shuffle(socks5_proxies)
# Test proxies concurrently
tested_proxies = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
futures = []
for proxy in socks5_proxies[:MAX_TRIES]:
futures.append(executor.submit(test_proxy, proxy))
for future in concurrent.futures.as_completed(futures):
success, proxy, latency = future.result()
if success:
await bot.api.send_markdown_message(room.room_id,
f"✅ Anonymous SOCKS5 Proxy: **{proxy}** - Latency: **{latency} ms**")
logging.info(f"Sent SOCKS5 proxy {proxy} to the room")
save_proxy_to_db(proxy, latency) # Save working proxy to the database
tested_proxies += 1
if tested_proxies >= MAX_PROXIES_IN_DB:
break # Stop testing proxies once MAX_PROXIES_IN_DB are saved to the database
# Check database for a working proxy after testing
working_proxy, latency = check_db_for_proxy()
if working_proxy:
await bot.api.send_markdown_message(room.room_id,
f"✅ Using cached working SOCKS5 Proxy: **{working_proxy}** - Latency: **{latency} ms**")
logging.info(f"Using cached working SOCKS5 proxy {working_proxy}")
else:
# If no working proxy found after testing
await bot.api.send_markdown_message(room.room_id, "❌ No working anonymous SOCKS5 proxy found")
logging.info("No working anonymous SOCKS5 proxy found")
except Exception as e:
logging.error(f"Error handling !proxy command: {e}")
await bot.api.send_markdown_message(room.room_id, "❌ Error handling !proxy command")
# Send the first working anonymous proxy of each type to the Matrix room
if socks5_proxy:
await bot.api.send_markdown_message(room.room_id, f"✅ Anonymous SOCKS5 Proxy: **{socks5_proxy}** - Latency: **{latency} ms**")
logging.info(f"Sent SOCKS5 proxy {socks5_proxy} to the room")
else:
await bot.api.send_markdown_message(room.room_id, "❌ No working anonymous SOCKS5 proxy found")
logging.info("No working anonymous SOCKS5 proxy found")
except Exception as e:
logging.error(f"Error handling !proxy command: {e}")
await bot.api.send_markdown_message(room.room_id, "❌ Error handling !proxy command")

95
plugins/sd_text.py Normal file
View File

@@ -0,0 +1,95 @@
"""
Plugin for generating text using Ollama's Mistral 7B Instruct model and sending it to a Matrix chat room.
"""
import requests
from asyncio import Queue
import simplematrixbotlib as botlib
import argparse
# Queue to store pending commands
command_queue = Queue()
API_URL = "http://localhost:11434/api/generate"
MODEL_NAME = "mistral:7b-instruct"
async def process_command(room, message, bot, prefix, config):
"""
Queue and process !text commands sequentially.
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.prefix() and match.command("text"):
if command_queue.empty():
await handle_command(room, message, bot, prefix, config)
else:
await command_queue.put((room, message, bot, prefix, config))
async def handle_command(room, message, bot, prefix, config):
"""
Send the prompt to Ollama API and return the generated text.
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if not (match.prefix() and match.command("text")):
return
# Parse optional arguments
parser = argparse.ArgumentParser(description='Generate text using Ollama API')
parser.add_argument('--max_tokens', type=int, default=512, help='Maximum tokens to generate')
parser.add_argument('--temperature', type=float, default=0.7, help='Temperature for generation')
parser.add_argument('prompt', nargs='+', help='Prompt for the model')
try:
args = parser.parse_args(message.body.split()[1:]) # Skip command itself
prompt = ' '.join(args.prompt).strip()
if not prompt:
await bot.api.send_text_message(room.room_id, "Usage: !text <your prompt here>")
return
payload = {
"model": MODEL_NAME,
"prompt": prompt,
"max_tokens": args.max_tokens,
"temperature": args.temperature,
"stream": False
}
response = requests.post(API_URL, json=payload, timeout=60)
response.raise_for_status()
r = response.json()
generated_text = r.get("response", "").strip()
if not generated_text:
generated_text = "(No response from model)"
await bot.api.send_text_message(room.room_id, generated_text)
except argparse.ArgumentError as e:
await bot.api.send_text_message(room.room_id, f"Argument error: {e}")
except requests.exceptions.RequestException as e:
await bot.api.send_text_message(room.room_id, f"Error connecting to Ollama API: {e}")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Unexpected error: {e}")
finally:
# Process next command from the queue, if any
if not command_queue.empty():
next_command = await command_queue.get()
await handle_command(*next_command)
def print_help():
"""
Generates help text for the !text command.
"""
return """
<p>Generate text using Ollama's Mistral 7B Instruct model</p>
<p>Usage:</p>
<ul>
<li>!text <prompt> - Basic prompt for the model</li>
<li>Optional arguments:</li>
<ul>
<li>--max_tokens MAX_TOKENS - Maximum tokens to generate (default 512)</li>
<li>--temperature TEMPERATURE - Sampling temperature (default 0.7)</li>
</ul>
</ul>
"""

330
plugins/shodan.py Normal file
View File

@@ -0,0 +1,330 @@
"""
This plugin provides Shodan.io integration for security research and reconnaissance.
"""
import logging
import os
import requests
import simplematrixbotlib as botlib
from dotenv import load_dotenv
# Load environment variables from .env file
plugin_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(plugin_dir)
dotenv_path = os.path.join(parent_dir, '.env')
load_dotenv(dotenv_path)
SHODAN_API_KEY = os.getenv("SHODAN_KEY", "")
SHODAN_API_BASE = "https://api.shodan.io"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle Shodan commands.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("shodan"):
logging.info("Received !shodan command")
# Check if API key is configured
if not SHODAN_API_KEY:
await bot.api.send_text_message(
room.room_id,
"Shodan API key not configured. Please set SHODAN_KEY environment variable."
)
logging.error("Shodan API key not configured")
return
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
subcommand = args[0].lower()
if subcommand == "ip":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan ip <ip_address>")
return
ip = args[1]
await shodan_ip_lookup(room, bot, ip)
elif subcommand == "search":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan search <query>")
return
query = ' '.join(args[1:])
await shodan_search(room, bot, query)
elif subcommand == "host":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan host <domain/ip>")
return
host = args[1]
await shodan_host(room, bot, host)
elif subcommand == "count":
if len(args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !shodan count <query>")
return
query = ' '.join(args[1:])
await shodan_count(room, bot, query)
else:
await show_usage(room, bot)
async def show_usage(room, bot):
"""Display Shodan command usage."""
usage = """
<strong>🔍 Shodan Commands:</strong>
<strong>!shodan ip &lt;ip_address&gt;</strong> - Get detailed information about an IP
<strong>!shodan search &lt;query&gt;</strong> - Search Shodan database
<strong>!shodan host &lt;domain/ip&gt;</strong> - Get host information
<strong>!shodan count &lt;query&gt;</strong> - Count results for a search query
<strong>Search Examples:</strong>
• <code>!shodan search apache</code>
• <code>!shodan search "port:22"</code>
• <code>!shodan search "country:US product:nginx"</code>
• <code>!shodan search "net:192.168.1.0/24"</code>
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def shodan_ip_lookup(room, bot, ip):
"""Look up information about a specific IP address."""
try:
url = f"{SHODAN_API_BASE}/shodan/host/{ip}"
params = {"key": SHODAN_API_KEY}
logging.info(f"Fetching Shodan IP info for: {ip}")
response = requests.get(url, params=params, timeout=15)
if response.status_code == 404:
await bot.api.send_text_message(room.room_id, f"No information found for IP: {ip}")
return
elif response.status_code == 401:
await bot.api.send_text_message(room.room_id, "Invalid Shodan API key")
return
elif response.status_code != 200:
await bot.api.send_text_message(room.room_id, f"Shodan API error: {response.status_code}")
return
data = response.json()
# Format the response
output = f"<strong>🔍 Shodan IP Lookup: {ip}</strong><br><br>"
if data.get('country_name'):
output += f"<strong>📍 Location:</strong> {data.get('city', 'N/A')}, {data.get('country_name', 'N/A')}<br>"
if data.get('org'):
output += f"<strong>🏢 Organization:</strong> {data['org']}<br>"
if data.get('os'):
output += f"<strong>💻 Operating System:</strong> {data['os']}<br>"
if data.get('ports'):
output += f"<strong>🔌 Open Ports:</strong> {', '.join(map(str, data['ports']))}<br>"
output += f"<strong>🕒 Last Update:</strong> {data.get('last_update', 'N/A')}<br><br>"
# Show services
if data.get('data'):
output += "<strong>📡 Services:</strong><br>"
for service in data['data'][:5]: # Limit to first 5 services
port = service.get('port', 'N/A')
product = service.get('product', 'Unknown')
version = service.get('version', '')
banner = service.get('data', '')[:100] + "..." if len(service.get('data', '')) > 100 else service.get('data', '')
output += f" • <strong>Port {port}:</strong> {product} {version}<br>"
if banner:
output += f" <em>{banner}</em><br>"
if len(data['data']) > 5:
output += f" • ... and {len(data['data']) - 5} more services<br>"
# Wrap in collapsible if output is large
if len(output) > 500:
output = f"<details><summary><strong>🔍 Shodan IP Lookup: {ip}</strong></summary>{output}</details>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan IP info for {ip}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error fetching Shodan data: {str(e)}")
logging.error(f"Error in shodan_ip_lookup: {e}")
async def shodan_search(room, bot, query):
"""Search the Shodan database."""
try:
url = f"{SHODAN_API_BASE}/shodan/host/search"
params = {
"key": SHODAN_API_KEY,
"query": query,
"minify": True,
"limit": 5 # Limit results to avoid huge responses
}
logging.info(f"Searching Shodan for: {query}")
response = requests.get(url, params=params, timeout=15)
if response.status_code != 200:
await handle_shodan_error(room, bot, response.status_code)
return
data = response.json()
if not data.get('matches'):
await bot.api.send_text_message(room.room_id, f"No results found for: {query}")
return
output = f"<strong>🔍 Shodan Search: '{query}'</strong><br>"
output += f"<strong>Total Results:</strong> {data.get('total', 0):,}<br><br>"
for match in data['matches'][:5]: # Show first 5 results
ip = match.get('ip_str', 'N/A')
port = match.get('port', 'N/A')
org = match.get('org', 'Unknown')
product = match.get('product', 'Unknown')
output += f"<strong>🌐 {ip}:{port}</strong><br>"
output += f" • <strong>Organization:</strong> {org}<br>"
output += f" • <strong>Service:</strong> {product}<br>"
if match.get('location'):
loc = match['location']
if loc.get('city') and loc.get('country_name'):
output += f" • <strong>Location:</strong> {loc['city']}, {loc['country_name']}<br>"
output += "<br>"
if data.get('total', 0) > 5:
output += f"<em>Showing 5 of {data['total']:,} results. Refine your search for more specific results.</em>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan search results for: {query}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error searching Shodan: {str(e)}")
logging.error(f"Error in shodan_search: {e}")
async def shodan_host(room, bot, host):
"""Get host information (domain or IP)."""
try:
url = f"{SHODAN_API_BASE}/dns/domain/{host}"
params = {"key": SHODAN_API_KEY}
logging.info(f"Fetching Shodan host info for: {host}")
response = requests.get(url, params=params, timeout=15)
if response.status_code == 404:
# Try IP lookup instead
await shodan_ip_lookup(room, bot, host)
return
elif response.status_code != 200:
await handle_shodan_error(room, bot, response.status_code)
return
data = response.json()
output = f"<strong>🔍 Shodan Host: {host}</strong><br><br>"
if data.get('subdomains'):
output += f"<strong>🌐 Subdomains ({len(data['subdomains'])}):</strong><br>"
for subdomain in sorted(data['subdomains'])[:10]: # Show first 10
output += f"{subdomain}.{host}<br>"
if len(data['subdomains']) > 10:
output += f" • ... and {len(data['subdomains']) - 10} more<br>"
if data.get('tags'):
output += f"<br><strong>🏷️ Tags:</strong> {', '.join(data['tags'])}<br>"
if data.get('data'):
output += f"<br><strong>📊 Records Found:</strong> {len(data['data'])}<br>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan host info for: {host}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error fetching host info: {str(e)}")
logging.error(f"Error in shodan_host: {e}")
async def shodan_count(room, bot, query):
"""Count results for a search query."""
try:
url = f"{SHODAN_API_BASE}/shodan/host/count"
params = {
"key": SHODAN_API_KEY,
"query": query
}
logging.info(f"Counting Shodan results for: {query}")
response = requests.get(url, params=params, timeout=15)
if response.status_code != 200:
await handle_shodan_error(room, bot, response.status_code)
return
data = response.json()
output = f"<strong>🔍 Shodan Count: '{query}'</strong><br><br>"
output += f"<strong>Total Results:</strong> {data.get('total', 0):,}<br>"
# Show top countries if available
if data.get('facets') and 'country' in data['facets']:
output += "<br><strong>🌍 Top Countries:</strong><br>"
for country in data['facets']['country'][:5]:
output += f"{country['value']}: {country['count']:,}<br>"
# Show top organizations if available
if data.get('facets') and 'org' in data['facets']:
output += "<br><strong>🏢 Top Organizations:</strong><br>"
for org in data['facets']['org'][:5]:
output += f"{org['value']}: {org['count']:,}<br>"
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Sent Shodan count for: {query}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(room.room_id, "Shodan API request timed out")
logging.error("Shodan API timeout")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error counting Shodan results: {str(e)}")
logging.error(f"Error in shodan_count: {e}")
async def handle_shodan_error(room, bot, status_code):
"""Handle Shodan API errors."""
error_messages = {
401: "Invalid Shodan API key",
403: "Access denied - check API key permissions",
404: "No results found",
429: "Rate limit exceeded - try again later",
500: "Shodan API server error",
503: "Shodan API temporarily unavailable"
}
message = error_messages.get(status_code, f"Shodan API error: {status_code}")
await bot.api.send_text_message(room.room_id, message)
logging.error(f"Shodan API error: {status_code}")

View File

@@ -1,92 +1,156 @@
"""
This plugin provides a command to generate images using self hosted Stable Diffusion and send to the room
Plugin for generating images using self-hosted Stable Diffusion and sending them to a Matrix chat room.
"""
# plugins/stable-diffusion.py
import requests
import base64
import tempfile
import os
from asyncio import Queue
import argparse
import simplematrixbotlib as botlib
import markdown2
from slugify import slugify
def slugify_prompt(prompt):
# Generate a slug from the prompt
return slugify(prompt)
# Queue to store pending commands
command_queue = Queue()
def markdown_to_html(markdown_text):
html_content = markdown2.markdown(markdown_text)
return html_content
def slugify_prompt(prompt: str) -> str:
"""
Generates a URL-friendly slug from the given prompt.
Args:
prompt (str): The prompt to slugify.
Returns:
str: A URL-friendly slug version of the prompt.
"""
return slugify(prompt)
def markdown_to_html(markdown_text: str) -> str:
"""
Converts Markdown text to HTML.
Args:
markdown_text (str): The Markdown text to convert.
Returns:
str: The HTML version of the input Markdown text.
"""
return markdown2.markdown(markdown_text)
async def process_command(room, message, bot, prefix, config):
"""
Processes !sd commands and queues them if already running.
Args:
room: Matrix room object
message: Matrix message object
bot: Bot instance
prefix: Command prefix
config: Bot config object
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.prefix() and match.command("sd"):
if command_queue.empty():
await handle_command(room, message, bot, prefix, config)
else:
await command_queue.put((room, message, bot, prefix, config))
await bot.api.send_text_message(room.room_id, "Command queued. Please wait for the current image to finish.")
async def handle_command(room, message, bot, prefix, config):
"""
Handles !sd command: generates image using Stable Diffusion API.
Args:
room: Matrix room object
message: Matrix message object
bot: Bot instance
prefix: Command prefix
config: Bot config object
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.prefix() and match.command("sd"):
try:
parser = argparse.ArgumentParser(description='Generate images using self hosted Stable Diffusion')
parser.add_argument('--steps', type=int, default=4, help='Number of steps, default=16')
parser.add_argument('--cfg', type=int, default=1.25, help='CFG scale, default=7')
parser.add_argument('--h', type=int, default=512, help='Height of the image, default=512')
parser.add_argument('--w', type=int, default=512, help='Width of the image, default=512')
parser.add_argument('--neg', type=str, default='((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))', nargs='+', help='Negative prompt, default=none')
parser.add_argument('--sampler', type=str, nargs='*', default=['DPM++', 'SDE', 'Karras'], help='Sampler name, default=Euler a')
parser.add_argument('prompt', type=str, nargs='*', help='Prompt for the image')
if not (match.prefix() and match.command("sd")):
return
args = parser.parse_args(message.body.split()[1:]) # Skip the command itself
# Check if API is available
try:
health_check = requests.get("http://127.0.0.1:7860/docs", timeout=3)
if health_check.status_code != 200:
await bot.api.send_text_message(room.room_id, "Stable Diffusion API is not running!")
return
except Exception:
await bot.api.send_text_message(room.room_id, "Could not reach Stable Diffusion API!")
return
if not args.prompt:
raise argparse.ArgumentError(None, "Prompt is required.")
try:
# Parse command-line arguments
parser = argparse.ArgumentParser(description='Generate images using self-hosted Stable Diffusion')
parser.add_argument('--steps', type=int, default=4, help='Number of steps, default=4')
parser.add_argument('--cfg', type=int, default=2, help='CFG scale, default=2')
parser.add_argument('--h', type=int, default=512, help='Height of the image, default=512')
parser.add_argument('--w', type=int, default=512, help='Width of the image, default=512')
parser.add_argument('--neg', type=str, nargs='+', default=['((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))'], help='Negative prompt')
parser.add_argument('--sampler', type=str, nargs='*', default=['DPM++', 'SDE'], help='Sampler name, default=DPM++ SDE')
parser.add_argument('prompt', type=str, nargs='*', help='Prompt for the image')
prompt = ' '.join(args.prompt)
sampler_name = ' '.join(args.sampler)
neg = ' '.join(args.neg)
payload = {
"prompt": prompt,
"steps": args.steps,
"negative_prompt": neg,
"sampler_name": sampler_name,
"cfg_scale": args.cfg,
"width": args.w,
"height": args.h,
}
url = "http://127.0.0.1:7860/sdapi/v1/txt2img"
args = parser.parse_args(message.body.split()[1:]) # skip command prefix
response = requests.post(url=url, json=payload)
r = response.json()
if not args.prompt:
raise argparse.ArgumentError(None, "Prompt is required.")
# Slugify the prompt
prompt_slug = prompt[:120]
prompt = ' '.join(args.prompt)
sampler_name = ' '.join(args.sampler)
neg_prompt = ' '.join(args.neg)
# Construct filename
filename = f"{prompt_slug}_{args.steps}_{args.h}x{args.w}_{sampler_name}_{args.cfg}.jpg"
with open(f"/tmp/{filename}", 'wb') as f:
f.write(base64.b64decode(r['images'][0]))
await bot.api.send_image_message(room_id=room.room_id, image_filepath=f"/tmp/{filename}") # Corrected argument name
neg = neg.replace(" ", "")
info_msg = f"""<details><summary>🍄⤵Image Info:⤵︎</summary><strong>Prompt:</strong> {prompt_slug}<br><strong>Steps:</strong> {args.steps}<br><strong>Dimensions:</strong> {args.h}x{args.w}<br><strong>Sampler:</strong> {sampler_name}<br><strong>CFG Scale:</strong> {args.cfg}<br><strong>Negative Prompt: {neg}</strong></details>"""
await bot.api.send_markdown_message(room.room_id, info_msg)
except argparse.ArgumentError as e:
await bot.api.send_text_message(room.room_id, f"Error: {e}")
await bot.api.send_markdown_message(room.room_id, "<details><summary>Stable Diffusion Help</summary>" + print_help() + "</details>")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error processing the command: {str(e)}")
finally:
if not command_queue.empty():
next_command = await command_queue.get()
await handle_command(*next_command)
payload = {
"prompt": prompt,
"steps": args.steps,
"negative_prompt": neg_prompt,
"sampler_name": sampler_name,
"cfg_scale": args.cfg,
"width": args.w,
"height": args.h,
}
url = "http://127.0.0.1:7860/sdapi/v1/txt2img"
response = requests.post(url=url, json=payload, timeout=600)
r = response.json()
# Use secure temporary file
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file:
filename = temp_file.name
temp_file.write(base64.b64decode(r['images'][0]))
# Send image to Matrix room
await bot.api.send_image_message(room_id=room.room_id, image_filepath=filename)
# Optional: send info about generated image
neg_prompt_clean = neg_prompt.replace(" ", "")
info_msg = f"""<details><summary>🔍 Image Info</summary><strong>Prompt:</strong> {prompt[:100]}<br><strong>Steps:</strong> {args.steps}<br><strong>Dimensions:</strong> {args.h}x{args.w}<br><strong>Sampler:</strong> {sampler_name}<br><strong>CFG Scale:</strong> {args.cfg}<br><strong>Negative Prompt:</strong> {neg_prompt_clean}</details>"""
# await bot.api.send_markdown_message(room.room_id, info_msg)
# Clean up temp file
os.remove(filename)
except argparse.ArgumentError as e:
await bot.api.send_text_message(room.room_id, f"Argument Error: {e}")
await bot.api.send_markdown_message(room.room_id, "<details><summary>Stable Diffusion Help</summary>" + print_help() + "</details>")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error processing the command: {str(e)}")
finally:
# Process next queued command
if not command_queue.empty():
next_command = await command_queue.get()
await handle_command(*next_command)
def print_help():
"""
Generates help text for the 'sd' command.
Returns:
str: Help text for the 'sd' command.
"""
return """
<p>Generate images using self-hosted Stable Diffusion</p>
@@ -95,6 +159,8 @@ def print_help():
<li>prompt - Prompt for the image</li>
</ul>
<p>Default Negative Prompts: ((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))</p>
<p>Optional arguments:</p>
<ul>
<li>--steps STEPS - Number of steps, default=16</li>

193
plugins/urbandictionary.py Normal file
View File

@@ -0,0 +1,193 @@
"""
This plugin provides a command to fetch definitions from Urban Dictionary.
"""
import logging
import requests
import simplematrixbotlib as botlib
import html
URBAN_API_URL = "https://api.urbandictionary.com/v0/define"
RANDOM_API_URL = "https://api.urbandictionary.com/v0/random"
def format_definition(term, definition, example, author, thumbs_up, thumbs_down, permalink, index=None, total=None):
"""
Format an Urban Dictionary definition for display.
Args:
term (str): The term being defined.
definition (str): The definition text.
example (str): Example usage.
author (str): Author of the definition.
thumbs_up (int): Number of upvotes.
thumbs_down (int): Number of downvotes.
permalink (str): URL to the definition.
index (int, optional): Current definition index.
total (int, optional): Total number of definitions.
Returns:
str: Formatted HTML message.
"""
# Clean up the text - Urban Dictionary uses [brackets] for links
definition = definition.replace('[', '<strong>').replace(']', '</strong>')
example = example.replace('[', '<em>').replace(']', '</em>')
# Escape any HTML that might be in the original text
term = html.escape(term)
author = html.escape(author)
# Build the message
header = f"<strong>📖 Urban Dictionary: {term}</strong>"
if index is not None and total is not None:
header += f" (Definition {index}/{total})"
message = f"""{header}
<strong>Definition:</strong>
{definition}
"""
if example and example.strip():
message += f"""
<strong>Example:</strong>
<em>{example}</em>
"""
message += f"""
<strong>Author:</strong> {author} | 👍 {thumbs_up} 👎 {thumbs_down}
<a href="{permalink}">View on Urban Dictionary</a>
"""
return message
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !ud (Urban Dictionary) command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("ud"):
logging.info("Received !ud command")
args = match.args()
try:
# Case 1: No arguments - get random definition
if len(args) == 0:
logging.info("Fetching random Urban Dictionary definition")
response = requests.get(RANDOM_API_URL, timeout=10)
response.raise_for_status()
data = response.json()
if not data.get('list'):
await bot.api.send_text_message(room.room_id, "No random definition found.")
return
# Get first random entry
entry = data['list'][0]
formatted = format_definition(
term=entry['word'],
definition=entry['definition'],
example=entry.get('example', ''),
author=entry['author'],
thumbs_up=entry['thumbs_up'],
thumbs_down=entry['thumbs_down'],
permalink=entry['permalink']
)
await bot.api.send_markdown_message(room.room_id, formatted)
logging.info(f"Sent random definition: {entry['word']}")
return
# Case 2: One or more arguments - search for term
# Check if last argument is a number (definition index)
index = None
search_term = ' '.join(args)
if args[-1].isdigit():
index = int(args[-1])
search_term = ' '.join(args[:-1])
if not search_term:
await bot.api.send_text_message(
room.room_id,
"Usage: !ud [term] [index]\nExamples:\n !ud - random definition\n !ud yeet - first definition of 'yeet'\n !ud yeet 2 - second definition of 'yeet'"
)
return
logging.info(f"Searching Urban Dictionary for: {search_term}")
params = {'term': search_term}
response = requests.get(URBAN_API_URL, params=params, timeout=10)
response.raise_for_status()
data = response.json()
definitions = data.get('list', [])
if not definitions:
await bot.api.send_text_message(
room.room_id,
f"No definition found for '{search_term}'"
)
logging.info(f"No definition found for: {search_term}")
return
total = len(definitions)
# If no index specified, use first definition
if index is None:
index = 1
# Validate index
if index < 1 or index > total:
await bot.api.send_text_message(
room.room_id,
f"Invalid index. '{search_term}' has {total} definition(s). Use !ud {search_term} [1-{total}]"
)
return
# Get the requested definition (convert to 0-based index)
entry = definitions[index - 1]
formatted = format_definition(
term=entry['word'],
definition=entry['definition'],
example=entry.get('example', ''),
author=entry['author'],
thumbs_up=entry['thumbs_up'],
thumbs_down=entry['thumbs_down'],
permalink=entry['permalink'],
index=index,
total=total
)
await bot.api.send_markdown_message(room.room_id, formatted)
logging.info(f"Sent definition {index}/{total} for: {search_term}")
except requests.exceptions.Timeout:
await bot.api.send_text_message(
room.room_id,
"Request timed out. Urban Dictionary may be slow or unavailable."
)
logging.error("Urban Dictionary API timeout")
except requests.exceptions.RequestException as e:
await bot.api.send_text_message(
room.room_id,
f"Error fetching from Urban Dictionary: {e}"
)
logging.error(f"Error fetching from Urban Dictionary: {e}")
except Exception as e:
await bot.api.send_text_message(
room.room_id,
"An error occurred while processing the Urban Dictionary request."
)
logging.error(f"Unexpected error in Urban Dictionary plugin: {e}", exc_info=True)

162
plugins/weather.py Normal file
View File

@@ -0,0 +1,162 @@
"""
This plugin provides a command to get weather information for a location.
"""
import logging
import requests
import os
import simplematrixbotlib as botlib
from dotenv import load_dotenv
# Load environment variables from .env file in the parent directory
# Get the directory where this plugin file is located
plugin_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory (main bot directory)
parent_dir = os.path.dirname(plugin_dir)
# Load .env from parent directory
dotenv_path = os.path.join(parent_dir, '.env')
load_dotenv(dotenv_path)
# OpenWeatherMap API configuration
# Get your free API key from: https://openweathermap.org/api
WEATHER_API_KEY = os.getenv("OPENWEATHER_API_KEY", "")
WEATHER_API_URL = "https://api.openweathermap.org/data/2.5/weather"
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle the !weather command.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("weather"):
logging.info("Received !weather command")
# Check if API key is configured
if not WEATHER_API_KEY:
await bot.api.send_text_message(
room.room_id,
"Weather API key not configured. Please set OPENWEATHER_API_KEY environment variable."
)
return
args = match.args()
# Check if location was provided
if len(args) < 1:
await bot.api.send_text_message(
room.room_id,
"Usage: !weather <location>\nExample: !weather London or !weather New York,US"
)
logging.info("Sent usage message for !weather")
return
location = ' '.join(args)
try:
# Make API request to OpenWeatherMap
params = {
'q': location,
'appid': WEATHER_API_KEY,
'units': 'metric' # Use metric units (Celsius)
}
response = requests.get(WEATHER_API_URL, params=params, timeout=10)
response.raise_for_status()
weather_data = response.json()
# Extract relevant weather information
city_name = weather_data['name']
country = weather_data['sys']['country']
temp = weather_data['main']['temp']
feels_like = weather_data['main']['feels_like']
humidity = weather_data['main']['humidity']
description = weather_data['weather'][0]['description'].capitalize()
wind_speed = weather_data['wind'].get('speed', 0)
# Convert temperature to Fahrenheit for display
temp_f = (temp * 9/5) + 32
feels_like_f = (feels_like * 9/5) + 32
# Get weather emoji based on condition
weather_emoji = get_weather_emoji(weather_data['weather'][0]['main'])
# Format the weather message
weather_message = f"""
<strong>[{weather_emoji} Weather for {city_name}, {country}]</strong>: <strong>Condition:</strong> {description} | <strong>Temperature:</strong> {temp:.1f}°C ({temp_f:.1f}°F) | <strong>Feels like:</strong> {feels_like:.1f}°C ({feels_like_f:.1f}°F) | <strong>Humidity:</strong> {humidity}% | <strong>Wind Speed:</strong> {wind_speed} m/s
""".strip()
await bot.api.send_markdown_message(room.room_id, weather_message)
logging.info(f"Sent weather information for {city_name}")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
await bot.api.send_text_message(
room.room_id,
f"Location '{location}' not found. Please check the spelling and try again."
)
elif e.response.status_code == 401:
await bot.api.send_text_message(
room.room_id,
"Weather API authentication failed. Please check the API key configuration."
)
else:
await bot.api.send_text_message(
room.room_id,
f"Error fetching weather data: HTTP {e.response.status_code}"
)
logging.error(f"HTTP error fetching weather for '{location}': {e}")
except requests.exceptions.RequestException as e:
await bot.api.send_text_message(
room.room_id,
f"Error connecting to weather service: {e}"
)
logging.error(f"Request error fetching weather for '{location}': {e}")
except (KeyError, ValueError) as e:
await bot.api.send_text_message(
room.room_id,
"Error parsing weather data. Please try again later."
)
logging.error(f"Error parsing weather data for '{location}': {e}")
def get_weather_emoji(condition):
"""
Get an emoji based on weather condition.
Args:
condition (str): Weather condition from API.
Returns:
str: Weather emoji.
"""
weather_emojis = {
'Clear': '☀️',
'Clouds': '☁️',
'Rain': '🌧️',
'Drizzle': '🌦️',
'Thunderstorm': '⛈️',
'Snow': '❄️',
'Mist': '🌫️',
'Fog': '🌫️',
'Haze': '🌫️',
'Smoke': '🌫️',
'Dust': '🌫️',
'Sand': '🌫️',
'Ash': '🌫️',
'Squall': '💨',
'Tornado': '🌪️'
}
return weather_emojis.get(condition, '🌡️')

45
plugins/xkcd.py Normal file
View File

@@ -0,0 +1,45 @@
"""
Provides a command to fetch random xkcd comic
"""
import requests
import tempfile
import random
import simplematrixbotlib as botlib
# Define the XKCD API URL
XKCD_API_URL = "https://xkcd.com/info.0.json"
async def handle_command(room, message, bot, prefix, config):
match = botlib.MessageMatch(room, message, bot, prefix)
if match.prefix() and match.command("xkcd"):
# Fetch the latest comic number from XKCD API
try:
response = requests.get(XKCD_API_URL, timeout=10)
response.raise_for_status() # Raise an exception for non-200 status codes
latest_comic_num = response.json()["num"]
# Choose a random comic number
random_comic_num = random.randint(1, latest_comic_num)
# Fetch the random comic data
random_comic_url = f"https://xkcd.com/{random_comic_num}/info.0.json"
comic_response = requests.get(random_comic_url, timeout=10)
comic_response.raise_for_status()
comic_data = comic_response.json()
image_url = comic_data["img"]
# Download the image
image_response = requests.get(image_url, timeout=10)
image_response.raise_for_status()
# Use secure temporary file
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
image_path = temp_file.name
temp_file.write(image_response.content)
# Send the image to the room
await bot.api.send_image_message(room_id=room.room_id, image_filepath=image_path)
# Clean up temp file
import os
os.remove(image_path)
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error fetching XKCD comic: {str(e)}")

View File

@@ -1,45 +1,174 @@
"""
This plugin provides a command to fetch YouTube video information from links.
Plugin for providing a command to fetch YouTube video information from links.
"""
# plugins/youtube.py
# Importing necessary libraries
import re
import logging
from pytubefix import YouTube
import simplematrixbotlib as botlib
import asyncio
import aiohttp
import yt_dlp
import simplematrixbotlib as botlib
from youtube_title_parse import get_artist_title
LYRICIST_API_URL = "https://lyrist.vercel.app/api/{}/{}"
def seconds_to_minutes_seconds(seconds):
"""
Converts seconds to a string representation of minutes and seconds.
Args:
seconds (int): The number of seconds.
Returns:
str: A string representation of minutes and seconds in the format MM:SS.
"""
minutes = seconds // 60
seconds %= 60
return f"{minutes:02d}:{seconds:02d}"
async def fetch_youtube_info(youtube_url):
async def fetch_lyrics(song, artist):
"""
Asynchronously fetches lyrics for a song from the Lyricist API.
Args:
song (str): The name of the song.
artist (str): The name of the artist.
Returns:
str: Lyrics of the song.
None if an error occurs during fetching.
"""
try:
video = YouTube(youtube_url)
title = video.title
description = video.description
length = seconds_to_minutes_seconds(video.length)
views = video.views
author = video.author
description_with_breaks = description.replace('\n', '<br>')
info_message = f"""<strong>🎬🎝 Title:</strong> {title} | <strong>Length</strong>: {length} minutes | <strong>Views</strong>: {views}\n<details><summary><strong>⤵Click Here For Description⤵</strong></summary>{description_with_breaks}</details>"""
return info_message
async with aiohttp.ClientSession() as session:
url = LYRICIST_API_URL.format(artist, song)
logging.info(f"Fetching lyrics from: {url}")
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return data.get("lyrics")
else:
logging.warning(f"Lyrics API returned status {response.status}")
return None
except asyncio.TimeoutError:
logging.error("Timeout fetching lyrics")
return None
except Exception as e:
logging.error(f"Error fetching YouTube video information: {str(e)}")
logging.error(f"Error fetching lyrics: {str(e)}")
return None
async def fetch_youtube_info(youtube_url):
"""
Asynchronously fetches information about a YouTube video using yt-dlp.
Args:
youtube_url (str): The URL of the YouTube video.
Returns:
str: A message containing information about the YouTube video.
None if an error occurs during fetching.
"""
try:
logging.info(f"Fetching YouTube info for: {youtube_url}")
# Configure yt-dlp options
ydl_opts = {
'quiet': True,
'no_warnings': True,
'extract_flat': False,
'skip_download': True,
}
# Run yt-dlp in thread pool to avoid blocking
loop = asyncio.get_event_loop()
def extract_info():
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
return ydl.extract_info(youtube_url, download=False)
info = await loop.run_in_executor(None, extract_info)
if not info:
logging.error("No info returned from yt-dlp")
return None
# Extract video information
title = info.get('title', 'Unknown Title')
description = info.get('description', 'No description available')
duration = info.get('duration', 0)
view_count = info.get('view_count', 0)
uploader = info.get('uploader', 'Unknown')
logging.info(f"Video title: {title}")
length = seconds_to_minutes_seconds(duration)
# Parse artist and song from title
artist, song = get_artist_title(title)
logging.info(f"Parsed artist: {artist}, song: {song}")
# Limit description length to avoid huge messages
if len(description) > 500:
description = description[:500] + "..."
description_with_breaks = description.replace('\n', '<br>')
# Build basic info message
info_message = f"""<strong>🎬🎝 Title:</strong> {title}<br><strong>Length:</strong> {length} | <strong>Views:</strong> {view_count:,} | <strong>Uploader:</strong> {uploader}<br><details><summary><strong>⤵Description⤵</strong></summary>{description_with_breaks}</details>"""
# Try to fetch lyrics if artist and song were parsed
if artist and song:
logging.info("Attempting to fetch lyrics...")
lyrics = await fetch_lyrics(song, artist)
if lyrics:
lyrics = lyrics.replace('\n', "<br>")
# Limit lyrics length
if len(lyrics) > 3000:
lyrics = lyrics[:3000] + "<br>...(truncated)"
info_message += f"<br><details><summary><strong>🎵 Lyrics:</strong></summary><br>{lyrics}</details>"
else:
logging.info("No lyrics found")
else:
logging.info("Could not parse artist/song from title, skipping lyrics")
return info_message
except Exception as e:
logging.error(f"Error fetching YouTube video information: {str(e)}", exc_info=True)
return None
async def handle_command(room, message, bot, prefix, config):
"""
Asynchronously handles the command to fetch YouTube video information.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (MatrixBot): The Matrix bot instance.
prefix (str): The command prefix.
config (dict): The bot's configuration.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and re.search(r'youtube\.com/watch\?v=', message.body): # and room.room_id != '!uFhErnfpYhhlauJsNK:matrix.org':
logging.info("YouTube link detected")
video_id_match = re.search(r'youtube\.com/watch\?v=([^\s]+)', message.body)
# Check if message contains a YouTube link
if match.is_not_from_this_bot() and re.search(r'(youtube\.com/watch\?v=|youtu\.be/)', message.body):
logging.info(f"YouTube link detected in message: {message.body}")
# Match both youtube.com and youtu.be formats
video_id_match = re.search(r'(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]{11})', message.body)
if video_id_match:
video_id = video_id_match.group(1)
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
logging.info(f"Fetching information for YouTube video: {youtube_url}")
retry_count = 3
logging.info(f"Fetching information for YouTube video ID: {video_id}")
retry_count = 2 # Reduced retries since yt-dlp is more reliable
while retry_count > 0:
info_message = await fetch_youtube_info(youtube_url)
if info_message:
@@ -47,10 +176,12 @@ async def handle_command(room, message, bot, prefix, config):
logging.info("Sent YouTube video information to the room")
break
else:
logging.info("Retrying...")
logging.warning(f"Failed to fetch info, retrying... ({retry_count-1} attempts left)")
retry_count -= 1
await asyncio.sleep(1) # wait for 1 second before retrying
if retry_count > 0:
await asyncio.sleep(2) # wait for 2 seconds before retrying
else:
logging.error("Failed to fetch YouTube video information after retries")
logging.error("Failed to fetch YouTube video information after all retries")
await bot.api.send_text_message(room.room_id, "Failed to fetch YouTube video information. The video may be unavailable or age-restricted.")
else:
logging.warning("Could not extract video ID from YouTube URL")

View File

@@ -1,13 +1,25 @@
"""
This plugin provides a command to search for YouTube videos in the room
Plugin for providing a command to search for YouTube videos in the room.
"""
# plugins/youtube_search.py
import logging
import simplematrixbotlib as botlib
from youtube_search import YoutubeSearch
async def handle_command(room, message, bot, PREFIX, config):
"""
Asynchronously handles the command to search for YouTube videos in the room.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (MatrixBot): The Matrix bot instance.
PREFIX (str): The command prefix.
config (dict): The bot's configuration.
Returns:
None
"""
match = botlib.MessageMatch(room, message, bot, PREFIX)
if match.is_not_from_this_bot() and match.prefix() and match.command("yt"):
args = match.args()
@@ -16,7 +28,7 @@ async def handle_command(room, message, bot, PREFIX, config):
else:
search_terms = " ".join(args)
logging.info(f"Performing YouTube search for: {search_terms}")
results = YoutubeSearch(search_terms, max_results=3).to_dict()
results = YoutubeSearch(search_terms, max_results=1).to_dict()
if results:
output = generate_output(results)
await send_collapsible_message(room, bot, output)
@@ -24,6 +36,15 @@ async def handle_command(room, message, bot, PREFIX, config):
await bot.api.send_text_message(room.room_id, "No results found.")
def generate_output(results):
"""
Generates HTML output for displaying YouTube search results.
Args:
results (list): A list of dictionaries containing information about YouTube videos.
Returns:
str: HTML formatted output containing YouTube search results.
"""
output = ""
for video in results:
output += f'<a href="https://www.youtube.com/watch?v={video["id"]}">'
@@ -37,5 +58,16 @@ def generate_output(results):
async def send_collapsible_message(room, bot, content):
"""
Sends a collapsible message containing YouTube search results to the room.
Args:
room (Room): The Matrix room where the message will be sent.
bot (MatrixBot): The Matrix bot instance.
content (str): HTML content to be included in the collapsible message.
Returns:
None
"""
message = f'<details><summary><strong>🍄Funguy ▶YouTube Search🍄<br>⤵Click Here To See Results⤵</strong></summary>{content}</details>'
await bot.api.send_markdown_message(room.room_id, message)

View File

@@ -1,9 +1,13 @@
python-dotenv
requests
pytubefix
duckduckgo_search
nio
markdown2
watchdog
emoji
python-slugify
youtube_title_parse
dnspython
croniter
schedule
yt-dlp