"""
|
||
Plugin for generating text using Infermatic AI API and sending it to a Matrix chat room.
|
||
"""
|
||
|
||
import os
|
||
import requests
|
||
import argparse
|
||
import json
|
||
import simplematrixbotlib as botlib
|
||
from asyncio import Queue
|
||
from dotenv import load_dotenv
|
||
import re
|
||
|
||

# Load environment variables from the .env file in the parent directory
plugin_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(plugin_dir)
dotenv_path = os.path.join(parent_dir, '.env')
load_dotenv(dotenv_path)
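# Illustrative .env contents (the key value is a placeholder; the variable
# names match those read below):
#   INFERMATIC_API=your-api-key-here
#   INFERMATIC_MODEL=Sao10K-L3.1-70B-Hanami-x1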

# Infermatic AI API configuration
INFERMATIC_API_KEY = os.getenv("INFERMATIC_API", "")
DEFAULT_MODEL = os.getenv("INFERMATIC_MODEL", "Sao10K-L3.1-70B-Hanami-x1")
INFERMATIC_API_BASE = "https://api.totalgpt.ai/v1"
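# The base URL is assumed to expose OpenAI-compatible endpoints; this plugin
# calls /models and /chat/completions below.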

# Queue for commands that arrive while a request is already in flight
command_queue = Queue()

# True while a command is being processed; the queue alone cannot distinguish
# an in-flight request from an idle bot.
_processing = False

async def process_command(room, message, bot, prefix, config):
    """Queue and process !text commands sequentially."""
    global _processing
    match = botlib.MessageMatch(room, message, bot, prefix)
    if match.prefix() and match.command("text"):
        if not _processing:
            _processing = True
            try:
                await handle_command(room, message, bot, prefix, config)
            finally:
                _processing = False
        else:
            await command_queue.put((room, message, bot, prefix, config))
            await bot.api.send_text_message(room.room_id, "Command queued. Please wait for the current request to finish.")

async def handle_command(room, message, bot, prefix, config):
    """Handle the !text command: generate text with the Infermatic AI API."""
    match = botlib.MessageMatch(room, message, bot, prefix)

    if not (match.prefix() and match.command("text")):
        return

    # Check that the API key is configured
    if not INFERMATIC_API_KEY:
        await bot.api.send_text_message(
            room.room_id,
            "Infermatic API key not configured. Please set the INFERMATIC_API environment variable."
        )
        return

    # Parse command arguments
    args = match.args()

    if len(args) < 1:
        await show_usage(room, bot)
        return

    # Handle the --list-models command
    if args[0] == "--list-models":
        await list_models(room, bot)
        return

    # Parse the remaining arguments
    try:
        # Extract options manually, since argparse does not handle mixed
        # positional and optional arguments well
        temperature = 0.9
        max_tokens = 512
        custom_model = None
        prompt_parts = []

        i = 0
        while i < len(args):
            if args[i] == "--temperature" and i + 1 < len(args):
                temperature = float(args[i + 1])
                i += 2
            elif args[i] == "--max-tokens" and i + 1 < len(args):
                max_tokens = int(args[i + 1])
                i += 2
            elif args[i] == "--use-model" and i + 1 < len(args):
                custom_model = args[i + 1]
                i += 2
            else:
                prompt_parts.append(args[i])
                i += 1

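        # Example parse (illustrative model name): "--use-model foo --temperature 0.7
        # explain DNS" yields custom_model="foo", temperature=0.7, prompt="explain DNS".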
        prompt = ' '.join(prompt_parts).strip()

        if not prompt:
            await show_usage(room, bot)
            return

        model = custom_model or DEFAULT_MODEL

        await generate_text(room, bot, prompt, model, temperature, max_tokens)

    except ValueError as e:
        await bot.api.send_text_message(room.room_id, f"Invalid parameter value: {e}")
    except Exception as e:
        await bot.api.send_text_message(room.room_id, f"Error processing command: {e}")

async def show_usage(room, bot):
    """Display command usage information."""
    usage = """
<strong>📄 Infermatic Text Generation Usage:</strong>

<strong>Basic:</strong>
• <code>!text &lt;prompt&gt;</code> - Generate text using the default model

<strong>Commands:</strong>
• <code>!text --list-models</code> - List all available models
• <code>!text --use-model &lt;model&gt; &lt;prompt&gt;</code> - Use a specific model

<strong>Parameters:</strong>
• <code>--temperature &lt;0.0-1.0&gt;</code> - Set temperature (default: 0.9)
• <code>--max-tokens &lt;number&gt;</code> - Set max tokens (default: 512)

<strong>Examples:</strong>
• <code>!text write a python function to calculate fibonacci</code>
• <code>!text --list-models</code>
• <code>!text --use-model llama-v3-8b-instruct explain quantum computing</code>
• <code>!text --temperature 0.7 write a haiku about AI</code>
"""
    await bot.api.send_markdown_message(room.room_id, usage)

async def list_models(room, bot):
    """List all available models from Infermatic AI."""
    try:
        await bot.api.send_text_message(room.room_id, "🔍 Fetching available models...")

        url = f"{INFERMATIC_API_BASE}/models"
        headers = {
            "Authorization": f"Bearer {INFERMATIC_API_KEY}",
            "Content-Type": "application/json"
        }

        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        data = response.json()
        models = data.get('data', [])
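        # Each entry is assumed to follow the OpenAI-style /models schema, e.g.
        #   {"id": "...", "name": "...", "context_length": ..., "pricing": {...}};
        # the .get() defaults below cover any missing fields.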

        if not models:
            await bot.api.send_text_message(room.room_id, "No models found or error in response.")
            return

        # Format the model list (the <details> summary below carries the title)
        output = ""

        for model in models:
            model_id = model.get('id', 'Unknown')
            model_name = model.get('name', model_id)
            context_length = model.get('context_length', 'Unknown')
            pricing = model.get('pricing', {})

            output += f"<strong>• {model_name}</strong><br>"
            output += f"  └─ ID: <code>{model_id}</code><br>"
            output += f"  └─ Context: {context_length}<br>"

            if pricing:
                prompt_price = pricing.get('prompt', '0')
                completion_price = pricing.get('completion', '0')
                output += f"  └─ Price: ${prompt_price}/${completion_price} per 1M tokens<br>"

            output += f"  └─ <strong>Usage:</strong> <code>!text --use-model {model_id} &lt;prompt&gt;</code><br><br>"

        # Wrap in a collapsible <details> block, since the list can be long
        output = f"<details><summary><strong>🔧 Available Models (Click to expand)</strong></summary>{output}</details>"

        await bot.api.send_markdown_message(room.room_id, output)

    except requests.exceptions.RequestException as e:
        await bot.api.send_text_message(room.room_id, f"❌ Error fetching models: {e}")
    except Exception as e:
        await bot.api.send_text_message(room.room_id, f"❌ Unexpected error: {e}")

async def generate_text(room, bot, prompt, model, temperature, max_tokens):
    """Generate text using the Infermatic AI API."""
    try:
        await bot.api.send_text_message(room.room_id, "📝 Generating text...")

        url = f"{INFERMATIC_API_BASE}/chat/completions"
        headers = {
            "Authorization": f"Bearer {INFERMATIC_API_KEY}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": model,
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "temperature": temperature,
            "max_tokens": max_tokens
        }

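        # Note: requests is synchronous, so this call blocks the event loop
        # until the API responds (or the 120-second timeout expires).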
        response = requests.post(url, headers=headers, json=payload, timeout=120)
        response.raise_for_status()

        data = response.json()
        generated_text = data.get('choices', [{}])[0].get('message', {}).get('content', '').strip()
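        # The parse above assumes the OpenAI chat-completions response shape:
        #   {"choices": [{"message": {"content": "..."}}], ...}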

        if not generated_text:
            await bot.api.send_text_message(room.room_id, "No response generated.")
            return

        # ---- Clean up blank lines that break list rendering ----
        # Remove blank lines directly before a list item (number-dot or hyphen).
        generated_text = re.sub(r'\n\n(\d+\.)', r'\n\1', generated_text)
        generated_text = re.sub(r'\n\n(- )', r'\n\1', generated_text)
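        # Example: "Steps:\n\n1. First" becomes "Steps:\n1. First", so numbered
        # and hyphen lists render as lists instead of being split by blank lines.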

        # Build a pure Markdown message (no HTML)
        output = f"**Model:** `{model}`\n\n**Prompt:** {prompt}\n\n**Response:**\n\n{generated_text}"
        await bot.api.send_markdown_message(room.room_id, output)

    except requests.exceptions.Timeout:
        await bot.api.send_text_message(room.room_id, "❌ Request timed out. The model is taking too long to respond.")
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            await bot.api.send_text_message(room.room_id, "❌ Authentication failed. Please check your INFERMATIC_API key.")
        elif e.response.status_code == 429:
            await bot.api.send_text_message(room.room_id, "❌ Rate limit exceeded. Please try again later.")
        else:
            await bot.api.send_text_message(room.room_id, f"❌ API error: HTTP {e.response.status_code}")
    except Exception as e:
        await bot.api.send_text_message(room.room_id, f"❌ Error generating text: {e}")
    finally:
        # Hand the next queued command, if any, back to handle_command
        if not command_queue.empty():
            next_command = await command_queue.get()
            await handle_command(*next_command)


# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------

__version__ = "1.0.2"
__author__ = "Funguy Bot"
__description__ = "AI text generation via Infermatic API (pure Markdown output)"
__help__ = """
<details>
<summary><strong>!text</strong> – AI text generation (Infermatic)</summary>
<ul>
<li><code>!text &lt;prompt&gt;</code> – Generate text using the default model</li>
<li><code>!text --list-models</code> – List available models</li>
<li><code>!text --use-model &lt;model&gt; &lt;prompt&gt;</code> – Use a specific model</li>
<li><code>--temperature &lt;0.0-1.0&gt;</code> – Set creativity (default 0.9)</li>
<li><code>--max-tokens &lt;number&gt;</code> – Max output length (default 512)</li>
</ul>
<p>Requires the <strong>INFERMATIC_API</strong> env var.</p>
</details>
"""