mirror of
https://github.com/5shekel/stable-diffusion-telegram-bot.git
synced 2024-05-22 19:33:14 +03:00
191 lines
6.3 KiB
Python
191 lines
6.3 KiB
Python
import json
|
|
import requests
|
|
import io
|
|
import os
|
|
import uuid
|
|
import base64
|
|
from PIL import Image, PngImagePlugin
|
|
from pyrogram import Client, filters
|
|
from pyrogram.types import *
|
|
from dotenv import load_dotenv
|
|
|
|
# Done! Congratulations on your new bot. You will find it at
|
|
# t.me/gootmornbot
|
|
# You can now add a description, about section and profile picture for your bot, see /help for a list of commands. By the way, when you've finished creating your cool bot, ping our Bot Support if you want a better username for it. Just make sure the bot is fully operational before you do this.
|
|
|
|
# Use this token to access the HTTP API:
|
|
# Keep your token secure and store it safely, it can be used by anyone to control your bot.
|
|
|
|
# For a description of the Bot API, see this page: https://core.telegram.org/bots/api
|
|
|
|
# Pull settings from a .env file into the environment before reading them.
load_dotenv()

# Telegram credentials and the base URL of the Stable Diffusion HTTP API.
# Each value is None when the corresponding variable is not set.
API_ID = os.getenv("API_ID")
API_HASH = os.getenv("API_HASH")
TOKEN = os.getenv("TOKEN")
SD_URL = os.getenv("SD_URL")
print(SD_URL)

app = Client("stable", api_id=API_ID, api_hash=API_HASH, bot_token=TOKEN)

# Step count substituted when a requested "steps:" value falls outside 1-70.
steps_value_default = 40
|
|
|
|
def process_input_string(string):
    """Split a /draw argument string into prompt, negative prompt and steps.

    Two optional markers are recognised, in either order: ``ng:`` introduces
    the negative prompt and ``steps:`` introduces the sampling step count.
    Everything before the first marker is the positive prompt; each marker's
    section runs up to the next marker or the end of the string.

    Args:
        string: Raw text that followed the ``/draw`` command.

    Returns:
        A ``(positive, negative, steps_value)`` tuple.  ``negative`` is
        ``None`` when there is no ``ng:`` marker.  ``steps_value`` is ``None``
        when no integer follows ``steps:``, the parsed integer when it lies
        in 1..70, and ``steps_value_default`` when it is out of that range.
    """
    ng_delimiter = "ng:"
    steps_delimiter = "steps:"

    ng_index = string.find(ng_delimiter)
    steps_index = string.find(steps_delimiter)
    has_ng = ng_index != -1
    has_steps = steps_index != -1

    # Offsets of the first character after each marker.
    ng_start = ng_index + len(ng_delimiter)
    steps_start = steps_index + len(steps_delimiter)

    negative = None
    steps_text = None
    if has_ng and has_steps:
        if ng_index < steps_index:
            positive = string[:ng_index]
            negative = string[ng_start:steps_index]
            steps_text = string[steps_start:]
        else:
            # "steps:" comes first: its value sits between the two markers
            # and the negative prompt is whatever follows "ng:".
            positive = string[:steps_index]
            steps_text = string[steps_start:ng_index]
            negative = string[ng_start:]
    elif has_ng:
        positive = string[:ng_index]
        negative = string[ng_start:]
    elif has_steps:
        positive = string[:steps_index]
        steps_text = string[steps_start:]
    else:
        positive = string

    positive = positive.strip()
    if negative is not None:
        negative = negative.strip()

    # Only the first whitespace-separated token after "steps:" is considered;
    # an empty section (bare "steps:") simply yields no steps value.
    tokens = steps_text.split() if steps_text is not None else []
    try:
        steps_value = int(tokens[0])
    except (IndexError, ValueError):
        steps_value = None
    else:
        # Limit steps to the supported range.
        if not 1 <= steps_value <= 70:
            steps_value = steps_value_default

    return positive, negative, steps_value
|
|
|
|
@app.on_message(filters.command(["draw"]))
def draw(client, message):
    """Handle ``/draw``: generate an image for the prompt and reply with it.

    The text after the command is parsed by ``process_input_string`` into a
    prompt, an optional negative prompt and an optional step count, which are
    posted to ``{SD_URL}/sdapi/v1/txt2img``.  Each returned image is tagged
    with the generation parameters reported by ``/sdapi/v1/png-info``, saved
    as a PNG in the working directory and sent back as a photo reply.

    Args:
        client: The Pyrogram client that received the update (unused).
        message: The incoming command message.
    """
    msgs = message.text.split(" ", 1)
    if len(msgs) == 1:
        # No prompt supplied: show the usage text instead.
        message.reply_text(
            "Format :\n/draw < text to image >\nng: < negative (optional) >\nsteps: < steps value (1-70, optional) >"
        )
        return

    positive, negative, steps_value = process_input_string(msgs[1])
    payload = {"prompt": positive}
    # Optional fields are only sent when the user supplied them, so the
    # backend applies its own defaults otherwise.
    if negative is not None:
        payload["negative_prompt"] = negative
    if steps_value is not None:
        payload["steps"] = steps_value

    print(payload)

    wait_message = message.reply_text("Please Wait 10-15 Second")
    try:
        try:
            response = requests.post(url=f"{SD_URL}/sdapi/v1/txt2img", json=payload)
            response.raise_for_status()
            images = response.json()["images"]
        except (requests.RequestException, ValueError, KeyError) as exc:
            # Backend unreachable, HTTP error, non-JSON body or no "images"
            # field: tell the user instead of failing silently.
            print(f"txt2img request failed: {exc!r}")
            message.reply_text("Error generating image")
            return

        # The first name is user-controlled; neutralise path separators so the
        # file always lands in the working directory.
        first_name = message.from_user.first_name
        safe_name = "".join("_" if ch in "/\\" else ch for ch in first_name)
        unique_id = str(uuid.uuid4())[:7]
        file_name = f"{safe_name}-{unique_id}.png"

        # No explicit steps, or the bot default, are both reported as "Default".
        if steps_value is None or steps_value == steps_value_default:
            steps_text = "Default"
        else:
            steps_text = steps_value

        for encoded in images:
            image = Image.open(
                io.BytesIO(base64.b64decode(encoded.split(",", 1)[0]))
            )

            # Ask the backend for the generation parameters and embed them in
            # the PNG as a "parameters" text chunk.
            png_payload = {"image": "data:image/png;base64," + encoded}
            info_response = requests.post(
                url=f"{SD_URL}/sdapi/v1/png-info", json=png_payload
            )
            pnginfo = PngImagePlugin.PngInfo()
            pnginfo.add_text("parameters", info_response.json().get("info"))
            image.save(file_name, pnginfo=pnginfo)

            message.reply_photo(
                photo=file_name,
                caption=(
                    f"Prompt - **{positive}**\n"
                    f"Negative Prompt - **{negative if negative is not None else 'None'}**\n"
                    f"Steps - **{steps_text}**\n"
                    f"**[{message.from_user.first_name}-Kun](tg://user?id={message.from_user.id})**"
                ),
            )
            # NOTE: the saved PNG is left on disk; the original cleanup call
            # (os.remove) was commented out, so that behaviour is kept.
    finally:
        # Always clear the "please wait" notice, even when generation failed.
        wait_message.delete()
|
|
|
|
|
|
@app.on_message(filters.command(["getmodels"]))
async def get_models(client, message):
    """Handle ``/getmodels``: offer the backend's checkpoints as inline buttons.

    Fetches ``{SD_URL}/sdapi/v1/sd-models`` and replies with one button per
    entry, labelled with the entry's ``title`` and carrying its
    ``model_name`` as callback data.  A non-200 response is reported to the
    user instead of being ignored.

    Args:
        client: The Pyrogram client that received the update (unused).
        message: The incoming command message.
    """
    # NOTE(review): requests is synchronous, so this call blocks the handler
    # until the backend answers.
    response = requests.get(url=f"{SD_URL}/sdapi/v1/sd-models")
    if response.status_code != 200:
        await message.reply_text("Error fetching models")
        return

    # One single-button row per model.
    buttons = [
        [InlineKeyboardButton(model["title"], callback_data=model["model_name"])]
        for model in response.json()
    ]
    await message.reply_text(
        text="Select a model [checkpoint] to use",
        reply_markup=InlineKeyboardMarkup(buttons),
    )
|
|
|
|
|
|
@app.on_callback_query()
async def process_callback(client, callback_query):
    """Handle an inline-button press by switching the active checkpoint.

    The pressed button's callback data is posted to
    ``{SD_URL}/sdapi/v1/options`` as ``sd_model_checkpoint`` and the outcome
    is reported in the chat.

    Args:
        client: The Pyrogram client that received the update (unused).
        callback_query: The callback query produced by the button press.
    """
    sd_model_checkpoint = callback_query.data

    # Acknowledge the press right away; Telegram clients keep showing a
    # progress indicator on the button until the query is answered.
    await callback_query.answer()

    options = {"sd_model_checkpoint": sd_model_checkpoint}
    response = requests.post(url=f"{SD_URL}/sdapi/v1/options", json=options)

    if response.status_code == 200:
        await callback_query.message.reply_text(
            "checkpoint set to " + sd_model_checkpoint
        )
    else:
        await callback_query.message.reply_text("Error setting options")
|
|
|
|
|
|
@app.on_message(filters.command(["start"], prefixes=["/", "!"]))
async def start(client, message):
    """Greet the user on ``/start`` or ``!start`` with an add-to-group button.

    Args:
        client: The Pyrogram client that received the update (unused).
        message: The incoming command message.
    """
    invite_button = InlineKeyboardButton(
        "Add to your group", url="https://t.me/gootmornbot?startgroup=true"
    )
    keyboard = InlineKeyboardMarkup([[invite_button]])
    greeting = "Hello!\nask me to imagine anything\n\n/draw text to image"
    await message.reply_text(text=greeting, reply_markup=keyboard)
|
|
|
|
|
|
# Hand control to the Pyrogram client so the handlers registered above are served.
app.run()
|