Reduced some vuln after SAST scan
This commit is contained in:
@@ -14,3 +14,4 @@ config_file = "funguy.conf"
|
||||
[plugins.disabled]
|
||||
"!uFhErnfpYhhlauJsNK:matrix.org" = [ "youtube-preview", "ai", "proxy",]
|
||||
"!vYcfWXpPvxeQvhlFdV:matrix.org" = []
|
||||
"!NXdVjDXPxXowPkrJJY:matrix.org" = [ "karma",]
|
||||
|
39
funguy.py
39
funguy.py
@@ -17,6 +17,11 @@ import toml # Library for parsing TOML configuration files
|
||||
# Importing FunguyConfig class from plugins.config module
|
||||
from plugins.config import FunguyConfig
|
||||
|
||||
# Whitelist of allowed plugins to prevent arbitrary code execution
|
||||
ALLOWED_PLUGINS = {'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma',
|
||||
'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion',
|
||||
'xkcd', 'youtube-preview', 'youtube-search'}
|
||||
|
||||
class FunguyBot:
|
||||
"""
|
||||
A bot class for managing plugins and handling commands in a Matrix chat environment.
|
||||
@@ -78,17 +83,22 @@ class FunguyBot:
|
||||
"""
|
||||
Method to load plugins from the specified directory.
|
||||
"""
|
||||
# Iterating through files in the plugins directory
|
||||
for plugin_file in os.listdir(self.PLUGINS_DIR):
|
||||
if plugin_file.endswith(".py"): # Checking if file is a Python file
|
||||
plugin_name = os.path.splitext(plugin_file)[0] # Extracting plugin name
|
||||
try:
|
||||
# Importing plugin module dynamically
|
||||
module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}")
|
||||
self.PLUGINS[plugin_name] = module # Storing loaded plugin module
|
||||
logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading
|
||||
except Exception as e:
|
||||
logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails
|
||||
# Iterating through whitelisted plugins only
|
||||
for plugin_name in ALLOWED_PLUGINS:
|
||||
plugin_file = os.path.join(self.PLUGINS_DIR, f"{plugin_name}.py")
|
||||
|
||||
# Verify that the plugin file exists
|
||||
if not os.path.isfile(plugin_file):
|
||||
logging.warning(f"Plugin file not found: {plugin_file}, skipping")
|
||||
continue
|
||||
|
||||
try:
|
||||
# Importing plugin module dynamically with validated plugin name
|
||||
module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}")
|
||||
self.PLUGINS[plugin_name] = module # Storing loaded plugin module
|
||||
logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading
|
||||
except Exception as e:
|
||||
logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails
|
||||
|
||||
def reload_plugins(self):
|
||||
"""
|
||||
@@ -233,3 +243,10 @@ class FunguyBot:
|
||||
if __name__ == "__main__":
|
||||
bot = FunguyBot() # Creating instance of FunguyBot
|
||||
bot.run() # Running the bot
|
||||
|
||||
from plugins import cron # Import your cron plugin
|
||||
|
||||
# After bot starts running, periodically check for cron jobs
|
||||
while True:
|
||||
asyncio.sleep(60) # Check every minute (adjust as needed)
|
||||
cron.run_cron_jobs(bot) # Check and execute cron jobs
|
||||
|
@@ -61,7 +61,7 @@ async def handle_ai_command(room, bot, command, args, config):
|
||||
|
||||
data = {
|
||||
"prompt": f"<s>[INST]{config[command]['prompt']}{prompt}[/INST]",
|
||||
"max_tokens": 1024,
|
||||
"max_tokens": 4096,
|
||||
"temperature": config[command]["temperature"],
|
||||
"top_p": config[command]["top_p"],
|
||||
"top_k": config[command]["top_k"],
|
||||
@@ -72,7 +72,7 @@ async def handle_ai_command(room, bot, command, args, config):
|
||||
|
||||
# Make HTTP request to the API endpoint
|
||||
try:
|
||||
response = requests.post(url, headers=headers, json=data, verify=False)
|
||||
response = requests.post(url, headers=headers, json=data, verify=False, timeout=300)
|
||||
response.raise_for_status() # Raise HTTPError for bad responses
|
||||
payload = response.json()
|
||||
new_text = payload['choices'][0]['text']
|
||||
|
64
plugins/cron.py
Normal file
64
plugins/cron.py
Normal file
@@ -0,0 +1,64 @@
|
||||
# plugins/cron.py
|
||||
|
||||
import sqlite3
|
||||
from crontab import CronTab
|
||||
import simplematrixbotlib as botlib
|
||||
|
||||
# Database connection and cursor
|
||||
conn = sqlite3.connect('cron.db')
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create table if not exists
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS cron (
|
||||
room_id TEXT,
|
||||
cron_entry TEXT,
|
||||
command TEXT
|
||||
)''')
|
||||
conn.commit()
|
||||
|
||||
async def handle_command(room, message, bot, prefix, config):
|
||||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||||
if match.is_not_from_this_bot() and match.prefix() and match.command("cron"):
|
||||
args = match.args()
|
||||
if len(args) >= 4:
|
||||
action = args[0]
|
||||
room_id = args[1]
|
||||
cron_entry = ' '.join(args[2:-1])
|
||||
command = args[-1]
|
||||
if action == "add":
|
||||
add_cron(room_id, cron_entry, command)
|
||||
await bot.api.send_text_message(room.room_id, f"Cron added successfully")
|
||||
elif action == "remove":
|
||||
remove_cron(room_id, command)
|
||||
await bot.api.send_text_message(room.room_id, f"Cron removed successfully")
|
||||
else:
|
||||
await bot.api.send_text_message(room.room_id, "Usage: !cron add|remove room_id cron_entry command")
|
||||
|
||||
def add_cron(room_id, cron_entry, command):
|
||||
# Check if the cron entry already exists in the database for the given room_id and command
|
||||
cursor.execute('SELECT * FROM cron WHERE room_id=? AND command=? AND cron_entry=?', (room_id, command, cron_entry))
|
||||
existing_entry = cursor.fetchone()
|
||||
if existing_entry:
|
||||
return # Cron entry already exists, do not add duplicate
|
||||
|
||||
# Insert the cron entry into the database
|
||||
cursor.execute('INSERT INTO cron (room_id, cron_entry, command) VALUES (?, ?, ?)', (room_id, cron_entry, command))
|
||||
conn.commit()
|
||||
|
||||
def remove_cron(room_id, command):
|
||||
cursor.execute('DELETE FROM cron WHERE room_id=? AND command=?', (room_id, command))
|
||||
conn.commit()
|
||||
|
||||
async def run_cron_jobs(bot):
|
||||
cron = CronTab()
|
||||
for job in cron:
|
||||
cron_entry = str(job)
|
||||
for row in cursor.execute('SELECT * FROM cron WHERE cron_entry=?', (cron_entry,)):
|
||||
room_id, _, command = row
|
||||
room = await bot.api.get_room_by_id(room_id)
|
||||
if room:
|
||||
plugin_name = command.split()[0].replace(prefix, '') # Extract plugin name
|
||||
plugin_module = bot.plugins.get(plugin_name)
|
||||
if plugin_module:
|
||||
await plugin_module.handle_command(room, None, bot, prefix, config)
|
||||
|
@@ -65,6 +65,10 @@ async def handle_command(room, message, bot, prefix, config):
|
||||
<p>Disables a command. Use '!disable plugin room' to disable a specific command.</p>
|
||||
</details>
|
||||
|
||||
<details><summary>📸 <strong>!xkcd</strong></summary>
|
||||
<p>Get random xkcd image.</p>
|
||||
</details>
|
||||
|
||||
<details><summary>🌟 <strong>Funguy Bot Credits</strong> 🌟</summary>
|
||||
<p>🧙♂️ Creator & Developer: Hash Borgir is the author of 🍄Funguy Bot🍄. (@hashborgir:mozilla.org)</p>
|
||||
<p>Join our Matrix Room: [Self-hosting | Security | Sysadmin | Homelab | Programming](https://matrix.to/#/#selfhosting:mozilla.org)</p>
|
||||
|
@@ -11,6 +11,11 @@ import sys # Import sys module for unloading plugins
|
||||
# Dictionary to store loaded plugins
|
||||
PLUGINS = {}
|
||||
|
||||
# Whitelist of allowed plugins to prevent arbitrary code execution
|
||||
ALLOWED_PLUGINS = {'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma',
|
||||
'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion',
|
||||
'xkcd', 'youtube-preview', 'youtube-search'}
|
||||
|
||||
async def load_plugin(plugin_name):
|
||||
"""
|
||||
Asynchronously loads a plugin.
|
||||
@@ -21,9 +26,46 @@ async def load_plugin(plugin_name):
|
||||
Returns:
|
||||
bool: True if the plugin is loaded successfully, False otherwise.
|
||||
"""
|
||||
# Validate plugin name against whitelist
|
||||
if plugin_name not in ALLOWED_PLUGINS:
|
||||
logging.error(f"Plugin '{plugin_name}' is not whitelisted")
|
||||
return False
|
||||
|
||||
# Verify that the plugin file exists in the plugins directory
|
||||
plugin_path = os.path.join("plugins", f"{plugin_name}.py")
|
||||
if not os.path.isfile(plugin_path):
|
||||
logging.error(f"Plugin file not found: {plugin_path}")
|
||||
return False
|
||||
|
||||
try:
|
||||
# Import the plugin module
|
||||
module = importlib.import_module(f"plugins.{plugin_name}")
|
||||
# Create a mapping of whitelisted plugins to their module paths
|
||||
plugin_modules = {
|
||||
'ai': 'plugins.ai',
|
||||
'config': 'plugins.config',
|
||||
'cron': 'plugins.cron',
|
||||
'date': 'plugins.date',
|
||||
'fortune': 'plugins.fortune',
|
||||
'help': 'plugins.help',
|
||||
'isup': 'plugins.isup',
|
||||
'karma': 'plugins.karma',
|
||||
'loadplugin': 'plugins.loadplugin',
|
||||
'plugins': 'plugins.plugins',
|
||||
'proxy': 'plugins.proxy',
|
||||
'sd_text': 'plugins.sd_text',
|
||||
'stable-diffusion': 'plugins.stable-diffusion',
|
||||
'xkcd': 'plugins.xkcd',
|
||||
'youtube-preview': 'plugins.youtube-preview',
|
||||
'youtube-search': 'plugins.youtube-search',
|
||||
}
|
||||
|
||||
# Get the module path from the mapping
|
||||
module_path = plugin_modules.get(plugin_name)
|
||||
if not module_path:
|
||||
logging.error(f"Plugin '{plugin_name}' not found in plugin mapping")
|
||||
return False
|
||||
|
||||
# Import the plugin module using the validated module path
|
||||
module = importlib.import_module(module_path)
|
||||
# Add the plugin module to the PLUGINS dictionary
|
||||
PLUGINS[plugin_name] = module
|
||||
logging.info(f"Loaded plugin: {plugin_name}")
|
||||
@@ -112,4 +154,3 @@ async def handle_command(room, message, bot, prefix, config):
|
||||
else:
|
||||
# Send unauthorized message if the sender is not the admin
|
||||
await bot.api.send_text_message(room.room_id, "You are not authorized to unload plugins.")
|
||||
|
||||
|
@@ -23,7 +23,7 @@ PROXY_LIST_FILENAME = 'socks5.txt'
|
||||
PROXY_LIST_EXPIRATION = timedelta(hours=8)
|
||||
MAX_THREADS = 128
|
||||
PROXIES_DB_FILE = 'proxies.db'
|
||||
MAX_PROXIES_IN_DB = 500
|
||||
MAX_PROXIES_IN_DB = 10
|
||||
|
||||
# Setup verbose logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
95
plugins/sd_text.py
Normal file
95
plugins/sd_text.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""
|
||||
Plugin for generating text using Ollama's Mistral 7B Instruct model and sending it to a Matrix chat room.
|
||||
"""
|
||||
|
||||
import requests
|
||||
from asyncio import Queue
|
||||
import simplematrixbotlib as botlib
|
||||
import argparse
|
||||
|
||||
# Queue to store pending commands
|
||||
command_queue = Queue()
|
||||
|
||||
API_URL = "http://localhost:11434/api/generate"
|
||||
MODEL_NAME = "mistral:7b-instruct"
|
||||
|
||||
async def process_command(room, message, bot, prefix, config):
|
||||
"""
|
||||
Queue and process !text commands sequentially.
|
||||
"""
|
||||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||||
if match.prefix() and match.command("text"):
|
||||
if command_queue.empty():
|
||||
await handle_command(room, message, bot, prefix, config)
|
||||
else:
|
||||
await command_queue.put((room, message, bot, prefix, config))
|
||||
|
||||
async def handle_command(room, message, bot, prefix, config):
|
||||
"""
|
||||
Send the prompt to Ollama API and return the generated text.
|
||||
"""
|
||||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||||
if not (match.prefix() and match.command("text")):
|
||||
return
|
||||
|
||||
# Parse optional arguments
|
||||
parser = argparse.ArgumentParser(description='Generate text using Ollama API')
|
||||
parser.add_argument('--max_tokens', type=int, default=512, help='Maximum tokens to generate')
|
||||
parser.add_argument('--temperature', type=float, default=0.7, help='Temperature for generation')
|
||||
parser.add_argument('prompt', nargs='+', help='Prompt for the model')
|
||||
|
||||
try:
|
||||
args = parser.parse_args(message.body.split()[1:]) # Skip command itself
|
||||
prompt = ' '.join(args.prompt).strip()
|
||||
|
||||
if not prompt:
|
||||
await bot.api.send_text_message(room.room_id, "Usage: !text <your prompt here>")
|
||||
return
|
||||
|
||||
payload = {
|
||||
"model": MODEL_NAME,
|
||||
"prompt": prompt,
|
||||
"max_tokens": args.max_tokens,
|
||||
"temperature": args.temperature,
|
||||
"stream": False
|
||||
}
|
||||
|
||||
response = requests.post(API_URL, json=payload, timeout=60)
|
||||
response.raise_for_status()
|
||||
r = response.json()
|
||||
|
||||
generated_text = r.get("response", "").strip()
|
||||
if not generated_text:
|
||||
generated_text = "(No response from model)"
|
||||
|
||||
await bot.api.send_text_message(room.room_id, generated_text)
|
||||
|
||||
except argparse.ArgumentError as e:
|
||||
await bot.api.send_text_message(room.room_id, f"Argument error: {e}")
|
||||
except requests.exceptions.RequestException as e:
|
||||
await bot.api.send_text_message(room.room_id, f"Error connecting to Ollama API: {e}")
|
||||
except Exception as e:
|
||||
await bot.api.send_text_message(room.room_id, f"Unexpected error: {e}")
|
||||
finally:
|
||||
# Process next command from the queue, if any
|
||||
if not command_queue.empty():
|
||||
next_command = await command_queue.get()
|
||||
await handle_command(*next_command)
|
||||
|
||||
def print_help():
|
||||
"""
|
||||
Generates help text for the !text command.
|
||||
"""
|
||||
return """
|
||||
<p>Generate text using Ollama's Mistral 7B Instruct model</p>
|
||||
|
||||
<p>Usage:</p>
|
||||
<ul>
|
||||
<li>!text <prompt> - Basic prompt for the model</li>
|
||||
<li>Optional arguments:</li>
|
||||
<ul>
|
||||
<li>--max_tokens MAX_TOKENS - Maximum tokens to generate (default 512)</li>
|
||||
<li>--temperature TEMPERATURE - Sampling temperature (default 0.7)</li>
|
||||
</ul>
|
||||
</ul>
|
||||
"""
|
@@ -4,13 +4,18 @@ Plugin for generating images using self-hosted Stable Diffusion and sending them
|
||||
|
||||
import requests
|
||||
import base64
|
||||
import tempfile
|
||||
import os
|
||||
from asyncio import Queue
|
||||
import argparse
|
||||
import simplematrixbotlib as botlib
|
||||
import markdown2
|
||||
from slugify import slugify
|
||||
|
||||
def slugify_prompt(prompt):
|
||||
# Queue to store pending commands
|
||||
command_queue = Queue()
|
||||
|
||||
def slugify_prompt(prompt: str) -> str:
|
||||
"""
|
||||
Generates a URL-friendly slug from the given prompt.
|
||||
|
||||
@@ -22,10 +27,7 @@ def slugify_prompt(prompt):
|
||||
"""
|
||||
return slugify(prompt)
|
||||
|
||||
# Queue to store pending commands
|
||||
command_queue = Queue()
|
||||
|
||||
def markdown_to_html(markdown_text):
|
||||
def markdown_to_html(markdown_text: str) -> str:
|
||||
"""
|
||||
Converts Markdown text to HTML.
|
||||
|
||||
@@ -35,20 +37,18 @@ def markdown_to_html(markdown_text):
|
||||
Returns:
|
||||
str: The HTML version of the input Markdown text.
|
||||
"""
|
||||
html_content = markdown2.markdown(markdown_text)
|
||||
return html_content
|
||||
return markdown2.markdown(markdown_text)
|
||||
|
||||
async def process_command(room, message, bot, prefix, config):
|
||||
"""
|
||||
Asynchronously processes the commands received in the Matrix room.
|
||||
Processes !sd commands and queues them if already running.
|
||||
|
||||
Args:
|
||||
room (Room): The Matrix room where the command was received.
|
||||
message (Message): The message object containing the command.
|
||||
bot (MatrixBot): The Matrix bot instance.
|
||||
prefix (str): The command prefix.
|
||||
config (dict): The bot's configuration.
|
||||
|
||||
room: Matrix room object
|
||||
message: Matrix message object
|
||||
bot: Bot instance
|
||||
prefix: Command prefix
|
||||
config: Bot config object
|
||||
"""
|
||||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||||
if match.prefix() and match.command("sd"):
|
||||
@@ -56,82 +56,93 @@ async def process_command(room, message, bot, prefix, config):
|
||||
await handle_command(room, message, bot, prefix, config)
|
||||
else:
|
||||
await command_queue.put((room, message, bot, prefix, config))
|
||||
await bot.api.send_text_message(room.room_id, "Command queued. Please wait for the current image to finish.")
|
||||
|
||||
async def handle_command(room, message, bot, prefix, config):
|
||||
"""
|
||||
Asynchronously handles the 'sd' command, which generates images using Stable Diffusion.
|
||||
Handles !sd command: generates image using Stable Diffusion API.
|
||||
|
||||
Args:
|
||||
room (Room): The Matrix room where the command was received.
|
||||
message (Message): The message object containing the command.
|
||||
bot (MatrixBot): The Matrix bot instance.
|
||||
prefix (str): The command prefix.
|
||||
config (dict): The bot's configuration.
|
||||
room: Matrix room object
|
||||
message: Matrix message object
|
||||
bot: Bot instance
|
||||
prefix: Command prefix
|
||||
config: Bot config object
|
||||
"""
|
||||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||||
if match.prefix() and match.command("sd"):
|
||||
try:
|
||||
# Parse command-line arguments
|
||||
parser = argparse.ArgumentParser(description='Generate images using self-hosted Stable Diffusion')
|
||||
parser.add_argument('--steps', type=int, default=4, help='Number of steps, default=16')
|
||||
parser.add_argument('--cfg', type=int, default=1.25, help='CFG scale, default=7')
|
||||
parser.add_argument('--h', type=int, default=512, help='Height of the image, default=512')
|
||||
parser.add_argument('--w', type=int, default=512, help='Width of the image, default=512')
|
||||
parser.add_argument('--neg', type=str, default='((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))', nargs='+', help='Negative prompt, default=none')
|
||||
parser.add_argument('--sampler', type=str, nargs='*', default=['DPM++', 'SDE', 'Karras'], help='Sampler name, default=Euler a')
|
||||
parser.add_argument('prompt', type=str, nargs='*', help='Prompt for the image')
|
||||
if not (match.prefix() and match.command("sd")):
|
||||
return
|
||||
|
||||
args = parser.parse_args(message.body.split()[1:]) # Skip the command itself
|
||||
# Check if API is available
|
||||
try:
|
||||
health_check = requests.get("http://127.0.0.1:7860/docs", timeout=3)
|
||||
if health_check.status_code != 200:
|
||||
await bot.api.send_text_message(room.room_id, "Stable Diffusion API is not running!")
|
||||
return
|
||||
except Exception:
|
||||
await bot.api.send_text_message(room.room_id, "Could not reach Stable Diffusion API!")
|
||||
return
|
||||
|
||||
if not args.prompt:
|
||||
raise argparse.ArgumentError(None, "Prompt is required.")
|
||||
try:
|
||||
# Parse command-line arguments
|
||||
parser = argparse.ArgumentParser(description='Generate images using self-hosted Stable Diffusion')
|
||||
parser.add_argument('--steps', type=int, default=4, help='Number of steps, default=4')
|
||||
parser.add_argument('--cfg', type=int, default=2, help='CFG scale, default=2')
|
||||
parser.add_argument('--h', type=int, default=512, help='Height of the image, default=512')
|
||||
parser.add_argument('--w', type=int, default=512, help='Width of the image, default=512')
|
||||
parser.add_argument('--neg', type=str, nargs='+', default=['((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))'], help='Negative prompt')
|
||||
parser.add_argument('--sampler', type=str, nargs='*', default=['DPM++', 'SDE'], help='Sampler name, default=DPM++ SDE')
|
||||
parser.add_argument('prompt', type=str, nargs='*', help='Prompt for the image')
|
||||
|
||||
prompt = ' '.join(args.prompt)
|
||||
sampler_name = ' '.join(args.sampler)
|
||||
neg = ' '.join(args.neg)
|
||||
payload = {
|
||||
"prompt": prompt,
|
||||
"steps": args.steps,
|
||||
"negative_prompt": neg,
|
||||
"sampler_name": sampler_name,
|
||||
"cfg_scale": args.cfg,
|
||||
"width": args.w,
|
||||
"height": args.h,
|
||||
}
|
||||
url = "http://127.0.0.1:7860/sdapi/v1/txt2img"
|
||||
args = parser.parse_args(message.body.split()[1:]) # skip command prefix
|
||||
|
||||
# Send request to the Stable Diffusion API
|
||||
response = requests.post(url=url, json=payload)
|
||||
r = response.json()
|
||||
if not args.prompt:
|
||||
raise argparse.ArgumentError(None, "Prompt is required.")
|
||||
|
||||
# Slugify the prompt
|
||||
prompt_slug = slugify(prompt[:120])
|
||||
prompt = ' '.join(args.prompt)
|
||||
sampler_name = ' '.join(args.sampler)
|
||||
neg_prompt = ' '.join(args.neg)
|
||||
|
||||
# Construct filename
|
||||
filename = f"{prompt_slug}_{args.steps}_{args.h}x{args.w}_{sampler_name}_{args.cfg}.jpg"
|
||||
with open(f"/tmp/{filename}", 'wb') as f:
|
||||
f.write(base64.b64decode(r['images'][0]))
|
||||
payload = {
|
||||
"prompt": prompt,
|
||||
"steps": args.steps,
|
||||
"negative_prompt": neg_prompt,
|
||||
"sampler_name": sampler_name,
|
||||
"cfg_scale": args.cfg,
|
||||
"width": args.w,
|
||||
"height": args.h,
|
||||
}
|
||||
|
||||
# Send the generated image to the Matrix room
|
||||
await bot.api.send_image_message(room_id=room.room_id, image_filepath=f"/tmp/{filename}")
|
||||
url = "http://127.0.0.1:7860/sdapi/v1/txt2img"
|
||||
response = requests.post(url=url, json=payload, timeout=600)
|
||||
r = response.json()
|
||||
|
||||
# Format and send information about the generated image
|
||||
neg = neg.replace(" ", "")
|
||||
info_msg = f"""<details><summary>🍄⤵︎Image Info:⤵︎</summary><strong>Prompt:</strong> {prompt_slug}<br><strong>Steps:</strong> {args.steps}<br><strong>Dimensions:</strong> {args.h}x{args.w}<br><strong>Sampler:</strong> {sampler_name}<br><strong>CFG Scale:</strong> {args.cfg}<br><strong>Negative Prompt: {neg}</strong></details>"""
|
||||
await bot.api.send_markdown_message(room.room_id, info_msg)
|
||||
except argparse.ArgumentError as e:
|
||||
# Handle argument errors
|
||||
await bot.api.send_text_message(room.room_id, f"Error: {e}")
|
||||
await bot.api.send_markdown_message(room.room_id, "<details><summary>Stable Diffusion Help</summary>" + print_help() + "</details>")
|
||||
except Exception as e:
|
||||
# Handle general errors
|
||||
await bot.api.send_text_message(room.room_id, f"Error processing the command: {str(e)}")
|
||||
finally:
|
||||
# Process next command from the queue, if any
|
||||
if not command_queue.empty():
|
||||
next_command = await command_queue.get()
|
||||
await handle_command(*next_command)
|
||||
# Use secure temporary file
|
||||
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file:
|
||||
filename = temp_file.name
|
||||
temp_file.write(base64.b64decode(r['images'][0]))
|
||||
|
||||
# Send image to Matrix room
|
||||
await bot.api.send_image_message(room_id=room.room_id, image_filepath=filename)
|
||||
|
||||
# Optional: send info about generated image
|
||||
neg_prompt_clean = neg_prompt.replace(" ", "")
|
||||
info_msg = f"""<details><summary>🔍 Image Info</summary><strong>Prompt:</strong> {prompt[:100]}<br><strong>Steps:</strong> {args.steps}<br><strong>Dimensions:</strong> {args.h}x{args.w}<br><strong>Sampler:</strong> {sampler_name}<br><strong>CFG Scale:</strong> {args.cfg}<br><strong>Negative Prompt:</strong> {neg_prompt_clean}</details>"""
|
||||
# await bot.api.send_markdown_message(room.room_id, info_msg)
|
||||
|
||||
# Clean up temp file
|
||||
os.remove(filename)
|
||||
|
||||
except argparse.ArgumentError as e:
|
||||
await bot.api.send_text_message(room.room_id, f"Argument Error: {e}")
|
||||
await bot.api.send_markdown_message(room.room_id, "<details><summary>Stable Diffusion Help</summary>" + print_help() + "</details>")
|
||||
except Exception as e:
|
||||
await bot.api.send_text_message(room.room_id, f"Error processing the command: {str(e)}")
|
||||
finally:
|
||||
# Process next queued command
|
||||
if not command_queue.empty():
|
||||
next_command = await command_queue.get()
|
||||
await handle_command(*next_command)
|
||||
|
||||
def print_help():
|
||||
"""
|
||||
|
45
plugins/xkcd.py
Normal file
45
plugins/xkcd.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""
|
||||
Provides a command to fetch random xkcd comic
|
||||
"""
|
||||
|
||||
import requests
|
||||
import tempfile
|
||||
import random
|
||||
import simplematrixbotlib as botlib
|
||||
|
||||
# Define the XKCD API URL
|
||||
XKCD_API_URL = "https://xkcd.com/info.0.json"
|
||||
|
||||
async def handle_command(room, message, bot, prefix, config):
|
||||
match = botlib.MessageMatch(room, message, bot, prefix)
|
||||
if match.prefix() and match.command("xkcd"):
|
||||
# Fetch the latest comic number from XKCD API
|
||||
try:
|
||||
response = requests.get(XKCD_API_URL, timeout=10)
|
||||
response.raise_for_status() # Raise an exception for non-200 status codes
|
||||
latest_comic_num = response.json()["num"]
|
||||
# Choose a random comic number
|
||||
random_comic_num = random.randint(1, latest_comic_num)
|
||||
# Fetch the random comic data
|
||||
random_comic_url = f"https://xkcd.com/{random_comic_num}/info.0.json"
|
||||
comic_response = requests.get(random_comic_url, timeout=10)
|
||||
comic_response.raise_for_status()
|
||||
comic_data = comic_response.json()
|
||||
image_url = comic_data["img"]
|
||||
# Download the image
|
||||
image_response = requests.get(image_url, timeout=10)
|
||||
image_response.raise_for_status()
|
||||
|
||||
# Use secure temporary file
|
||||
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
|
||||
image_path = temp_file.name
|
||||
temp_file.write(image_response.content)
|
||||
|
||||
# Send the image to the room
|
||||
await bot.api.send_image_message(room_id=room.room_id, image_filepath=image_path)
|
||||
|
||||
# Clean up temp file
|
||||
import os
|
||||
os.remove(image_path)
|
||||
except Exception as e:
|
||||
await bot.api.send_text_message(room.room_id, f"Error fetching XKCD comic: {str(e)}")
|
@@ -28,7 +28,7 @@ async def handle_command(room, message, bot, PREFIX, config):
|
||||
else:
|
||||
search_terms = " ".join(args)
|
||||
logging.info(f"Performing YouTube search for: {search_terms}")
|
||||
results = YoutubeSearch(search_terms, max_results=10).to_dict()
|
||||
results = YoutubeSearch(search_terms, max_results=1).to_dict()
|
||||
if results:
|
||||
output = generate_output(results)
|
||||
await send_collapsible_message(room, bot, output)
|
||||
|
@@ -9,4 +9,6 @@ emoji
|
||||
python-slugify
|
||||
youtube_title_parse
|
||||
dnspython
|
||||
croniter
|
||||
schedule
|
||||
|
||||
|
Reference in New Issue
Block a user