Compare commits

..

7 Commits

Author SHA1 Message Date
tami-p40
8fd4b07559 steps default 2024-06-09 11:33:02 +03:00
tami-p40
9f8bff3540 cfg for XL 2024-06-09 10:39:34 +03:00
tami-p40
ec900759a1 get model 2024-06-03 14:14:25 +03:00
tami-p40
49c9f6337a size limit 2024-06-03 13:35:14 +03:00
tami-p40
f62d6a6dbc jaguarnut 2024-05-29 09:28:23 +03:00
tami-p40
5baffd360b info 2024-05-26 09:05:27 +03:00
tami-p40
814a779b47 info 2024-05-26 09:00:32 +03:00
3 changed files with 186 additions and 125 deletions

1
.gitignore vendored
View File

@@ -6,3 +6,4 @@ venv/
*.session-journal *.session-journal
logs/stable_diff_telegram_bot.log logs/stable_diff_telegram_bot.log
*.session *.session
images/

View File

@@ -1,41 +0,0 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"this came from upstream, but it is not yet fixed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install -q https://github.com/camenduru/stable-diffusion-webui-colab/releases/download/0.0.15/xformers-0.0.15+e163309.d20230103-cp38-cp38-linux_x86_64.whl\n",
"\n",
"!git clone https://github.com/camenduru/stable-diffusion-webui\n",
"!git clone https://github.com/deforum-art/deforum-for-automatic1111-webui /content/stable-diffusion-webui/extensions/deforum-for-automatic1111-webui\n",
"!git clone https://github.com/yfszzx/stable-diffusion-webui-images-browser /content/stable-diffusion-webui/extensions/stable-diffusion-webui-images-browser\n",
"!git clone https://github.com/camenduru/stable-diffusion-webui-huggingface /content/stable-diffusion-webui/extensions/stable-diffusion-webui-huggingface\n",
"!git clone https://github.com/Vetchems/sd-civitai-browser /content/stable-diffusion-webui/extensions/sd-civitai-browser\n",
"%cd /content/stable-diffusion-webui\n",
"\n",
"!wget https://huggingface.co/Linaqruf/anything-v3.0/resolve/main/Anything-V3.0-pruned.ckpt -O /content/stable-diffusion-webui/models/Stable-diffusion/Anything-V3.0-pruned.ckpt\n",
"!wget https://huggingface.co/Linaqruf/anything-v3.0/resolve/main/Anything-V3.0.vae.pt -O /content/stable-diffusion-webui/models/Stable-diffusion/Anything-V3.0-pruned.vae.pt\n",
"\n",
"!python launch.py --share --xformers --api\n"
]
}
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

237
main.py
View File

@@ -3,6 +3,7 @@ import re
import io import io
import uuid import uuid
import base64 import base64
import json
import requests import requests
from datetime import datetime from datetime import datetime
from PIL import Image, PngImagePlugin from PIL import Image, PngImagePlugin
@@ -17,37 +18,44 @@ API_HASH = os.environ.get("API_HASH")
TOKEN = os.environ.get("TOKEN_givemtxt2img") TOKEN = os.environ.get("TOKEN_givemtxt2img")
SD_URL = os.environ.get("SD_URL") SD_URL = os.environ.get("SD_URL")
# Ensure all required environment variables are loaded
if not all([API_ID, API_HASH, TOKEN, SD_URL]):
raise EnvironmentError("Missing one or more required environment variables: API_ID, API_HASH, TOKEN, SD_URL")
app = Client("stable", api_id=API_ID, api_hash=API_HASH, bot_token=TOKEN) app = Client("stable", api_id=API_ID, api_hash=API_HASH, bot_token=TOKEN)
IMAGE_PATH = 'images' IMAGE_PATH = 'images'
# Ensure IMAGE_PATH directory exists # Ensure IMAGE_PATH directory exists
os.makedirs(IMAGE_PATH, exist_ok=True) os.makedirs(IMAGE_PATH, exist_ok=True)
# Model-specific emmbedings for negative prompts
# see civit.ai model page for specific emmbedings recommnded for each model
model_negative_prompts = {
"Anything-Diffusion": "",
"Deliberate": "",
"Dreamshaper": "",
"DreamShaperXL_Lightning": "",
"icbinp": "",
"realisticVisionV60B1_v51VAE": "realisticvision-negative-embedding",
"v1-5-pruned-emaonly": ""
}
def get_current_model_name():
try:
response = requests.get(f"{SD_URL}/sdapi/v1/options")
response.raise_for_status()
options = response.json()
current_model_name = options.get("sd_model_checkpoint", "Unknown")
return current_model_name
except requests.RequestException as e:
print(f"API call failed: {e}")
return None
# Fetch the current model name at the start
current_model_name = get_current_model_name()
if current_model_name:
print(f"Current model name: {current_model_name}")
else:
print("Failed to fetch the current model name.")
def encode_file_to_base64(path): def encode_file_to_base64(path):
with open(path, 'rb') as file: with open(path, 'rb') as file:
return base64.b64encode(file.read()).decode('utf-8') return base64.b64encode(file.read()).decode('utf-8')
def decode_and_save_base64(base64_str, save_path): def decode_and_save_base64(base64_str, save_path):
with open(save_path, "wb") as file: with open(save_path, "wb") as file:
file.write(base64.b64decode(base64_str)) file.write(base64.b64decode(base64_str))
# Set default payload values
# Set default payload values
default_payload = { default_payload = {
"prompt": "", "prompt": "",
"seed": -1, # Random seed "seed": -1, # Random seed
@@ -64,24 +72,72 @@ default_payload = {
"restore_faces": False, "restore_faces": False,
"override_settings": {}, "override_settings": {},
"override_settings_restore_afterwards": True, "override_settings_restore_afterwards": True,
} }
# Model-specific embeddings for negative prompts
model_negative_prompts = {
"coloringPage_v10": "fake",
"Anything-Diffusion": "",
"Deliberate": "",
"Dreamshaper": "",
"DreamShaperXL_Lightning": "",
"realisticVisionV60B1_v51VAE": "realisticvision-negative-embedding",
"v1-5-pruned-emaonly": "",
"Juggernaut-XL_v9_RunDiffusionPhoto_v2": "bad eyes, cgi, airbrushed, plastic, watermark"
}
def update_negative_prompt(model_name): def update_negative_prompt(model_name):
"""Update the negative prompt for a given model."""
if model_name in model_negative_prompts: if model_name in model_negative_prompts:
suffix = model_negative_prompts[model_name] suffix = model_negative_prompts[model_name]
default_payload["negative_prompt"] += f", {suffix}" default_payload["negative_prompt"] += f", {suffix}"
print(f"Updated negative prompt to: {default_payload['negative_prompt']}")
def update_resolution(model_name):
"""Update resolution based on the selected model."""
if model_name == "Juggernaut-XL_v9_RunDiffusionPhoto_v2":
default_payload["width"] = 832
default_payload["height"] = 1216
else:
default_payload["width"] = 512
default_payload["height"] = 512
print(f"Updated resolution to {default_payload['width']}x{default_payload['height']}")
def update_steps(model_name):
"""Update CFG scale based on the selected model."""
if model_name == "Juggernaut-XL_v9_RunDiffusionPhoto_v2":
default_payload["steps"] = 15
else:
default_payload["steps"] = 35
print(f"Updated steps to {default_payload['cfg_scale']}")
def update_cfg_scale(model_name):
"""Update CFG scale based on the selected model."""
if model_name == "Juggernaut-XL_v9_RunDiffusionPhoto_v2":
default_payload["cfg_scale"] = 2.5
else:
default_payload["cfg_scale"] = 7
print(f"Updated CFG scale to {default_payload['cfg_scale']}")
# Update configurations based on the current model name
if current_model_name:
update_negative_prompt(current_model_name)
update_resolution(current_model_name)
update_cfg_scale(current_model_name)
update_steps(current_model_name)
else:
print("Failed to update configurations as the current model name is not available.")
def parse_input(input_string): def parse_input(input_string):
"""Parse the input string and create a payload."""
payload = default_payload.copy() payload = default_payload.copy()
prompt = [] prompt = []
include_info = "info:" in input_string
input_string = input_string.replace("info:", "").strip()
matches = re.finditer(r"(\w+):", input_string) matches = re.finditer(r"(\w+):", input_string)
last_index = 0 last_index = 0
script_args = [0, "", [], 0, "", [], 0, "", [], True, False, False, False, False, False, False, 0, False]
script_name = None
script_args = [0, "", [], 0, "", [], 0, "", [], True, False, False, False, False, False, False, 0, False] script_args = [0, "", [], 0, "", [], 0, "", [], True, False, False, False, False, False, False, 0, False]
script_name = None script_name = None
@@ -144,53 +200,48 @@ def parse_input(input_string):
if script_name: if script_name:
payload["script_name"] = script_name payload["script_name"] = script_name
payload["script_args"] = script_args payload["script_args"] = script_args
print(f"Generated payload: {payload}")
return payload, include_info
return payload def create_caption(payload, user_name, user_id, info, include_info):
"""Create a caption for the generated image."""
def create_caption(payload, user_name, user_id, info):
caption = f"**[{user_name}](tg://user?id={user_id})**\n\n" caption = f"**[{user_name}](tg://user?id={user_id})**\n\n"
prompt = payload["prompt"] prompt = payload["prompt"]
print(payload["prompt"])
print(info)
# Steps: 3, Sampler: Euler, CFG scale: 7.0, Seed: 4094161400, Size: 512x512, Model hash: 15012c538f, Model: realisticVisionV60B1_v51VAE, Denoising strength: 0.35, Version: v1.8.0-1-g20cdc7c
# Define a regular expression pattern to match the seed value
seed_pattern = r"Seed: (\d+)" seed_pattern = r"Seed: (\d+)"
# Search for the pattern in the info string
match = re.search(seed_pattern, info) match = re.search(seed_pattern, info)
# Check if a match was found and extract the seed value
if match: if match:
seed_value = match.group(1) seed_value = match.group(1)
print(f"Seed value: {seed_value}")
caption += f"**{seed_value}**\n" caption += f"**{seed_value}**\n"
else: else:
print("Seed value not found in the info string.") print("Seed value not found in the info string.")
caption += f"**{prompt}**\n" caption += f"**{prompt}**\n"
if include_info:
caption += f"\nFull Payload:\n`{payload}`\n"
if len(caption) > 1024: if len(caption) > 1024:
caption = caption[:1021] + "..." caption = caption[:1021] + "..."
return caption return caption
def call_api(api_endpoint, payload): def call_api(api_endpoint, payload):
"""Call the API with the provided payload."""
try: try:
response = requests.post(f'{SD_URL}/{api_endpoint}', json=payload) response = requests.post(f'{SD_URL}/{api_endpoint}', json=payload)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
except requests.RequestException as e: except requests.RequestException as e:
print(f"API call failed: {e}") print(f"API call failed: {e}")
return None return {"error": str(e)}
def process_images(images, user_id, user_name): def process_images(images, user_id, user_name):
"""Process and save generated images."""
def generate_unique_name(): def generate_unique_name():
unique_id = str(uuid.uuid4())[:7] unique_id = str(uuid.uuid4())[:7]
return f"{user_name}-{unique_id}" date = datetime.now().strftime("%Y-%m-%d-%H-%M")
return f"{date}-{user_name}-{unique_id}"
word = generate_unique_name() word = generate_unique_name()
@@ -200,23 +251,30 @@ def process_images(images, user_id, user_name):
response2 = requests.post(f"{SD_URL}/sdapi/v1/png-info", json=png_payload) response2 = requests.post(f"{SD_URL}/sdapi/v1/png-info", json=png_payload)
response2.raise_for_status() response2.raise_for_status()
# Write response2 json next to the image
with open(f"{IMAGE_PATH}/{word}.json", "w") as json_file:
json.dump(response2.json(), json_file)
pnginfo = PngImagePlugin.PngInfo() pnginfo = PngImagePlugin.PngInfo()
pnginfo.add_text("parameters", response2.json().get("info")) pnginfo.add_text("parameters", response2.json().get("info"))
image.save(f"{IMAGE_PATH}/{word}.png", pnginfo=pnginfo) image.save(f"{IMAGE_PATH}/{word}.png", pnginfo=pnginfo)
# Save as JPG
jpg_path = f"{IMAGE_PATH}/{word}.jpg"
image.convert("RGB").save(jpg_path, "JPEG")
return word, response2.json().get("info") return word, response2.json().get("info")
@app.on_message(filters.command(["draw"])) @app.on_message(filters.command(["draw"]))
def draw(client, message): def draw(client, message):
"""Handle /draw command to generate images from text prompts."""
msgs = message.text.split(" ", 1) msgs = message.text.split(" ", 1)
if len(msgs) == 1: if len(msgs) == 1:
message.reply_text("Format :\n/draw < text to image >\nng: < negative (optional) >\nsteps: < steps value (1-70, optional) >") message.reply_text("Format :\n/draw < text to image >\nng: < negative (optional) >\nsteps: < steps value (1-70, optional) >")
return return
payload = parse_input(msgs[1]) payload, include_info = parse_input(msgs[1])
print(payload)
# Check if xds is used in the payload
if "xds" in msgs[1].lower(): if "xds" in msgs[1].lower():
message.reply_text("`xds` key cannot be used in the `/draw` command. Use `/img` instead.") message.reply_text("`xds` key cannot be used in the `/draw` command. Use `/img` instead.")
return return
@@ -224,34 +282,31 @@ def draw(client, message):
K = message.reply_text("Please Wait 10-15 Seconds") K = message.reply_text("Please Wait 10-15 Seconds")
r = call_api('sdapi/v1/txt2img', payload) r = call_api('sdapi/v1/txt2img', payload)
if r: if r and "images" in r:
for i in r["images"]: for i in r["images"]:
word, info = process_images([i], message.from_user.id, message.from_user.first_name) word, info = process_images([i], message.from_user.id, message.from_user.first_name)
caption = create_caption(payload, message.from_user.first_name, message.from_user.id, info) caption = create_caption(payload, message.from_user.first_name, message.from_user.id, info, include_info)
message.reply_photo(photo=f"{IMAGE_PATH}/{word}.png", caption=caption) message.reply_photo(photo=f"{IMAGE_PATH}/{word}.jpg", caption=caption)
K.delete() K.delete()
else: else:
message.reply_text("Failed to generate image. Please try again later.") error_message = r.get("error", "Failed to generate image. Please try again later.")
message.reply_text(error_message)
K.delete() K.delete()
@app.on_message(filters.command(["img"])) @app.on_message(filters.command(["img"]))
def img2img(client, message): def img2img(client, message):
"""Handle /img command to generate images from existing images."""
if not message.reply_to_message or not message.reply_to_message.photo: if not message.reply_to_message or not message.reply_to_message.photo:
message.reply_text("Reply to an image with\n`/img < prompt > ds:0-1.0`\n\nds stands for `Denoising_strength` parameter. Set that low (like 0.2) if you just want to slightly change things. defaults to 0.35\n\nExample: `/img murder on the dance floor ds:0.2`") message.reply_text("Reply to an image with\n`/img < prompt > ds:0-1.0`\n\nds stands for `Denoising_strength` parameter. Set that low (like 0.2) if you just want to slightly change things. defaults to 0.35\n\nExample: `/img murder on the dance floor ds:0.2`")
return return
msgs = message.text.split(" ", 1) msgs = message.text.split(" ", 1)
if len(msgs) == 1: if len(msgs) == 1:
message.reply_text("dont FAIL in life") message.reply_text("Don't FAIL in life")
return return
payload = parse_input(msgs[1]) payload, include_info = parse_input(msgs[1])
print(f"input:\n{payload}")
photo = message.reply_to_message.photo photo = message.reply_to_message.photo
# prompt_from_reply = message.reply_to_message.
# orginal_prompt = app.reply_to_message.message
# print(orginal_prompt)
photo_file = app.download_media(photo) photo_file = app.download_media(photo)
init_image = encode_file_to_base64(photo_file) init_image = encode_file_to_base64(photo_file)
os.remove(photo_file) # Clean up downloaded image file os.remove(photo_file) # Clean up downloaded image file
@@ -261,24 +316,24 @@ def img2img(client, message):
K = message.reply_text("Please Wait 10-15 Seconds") K = message.reply_text("Please Wait 10-15 Seconds")
r = call_api('sdapi/v1/img2img', payload) r = call_api('sdapi/v1/img2img', payload)
if r: if r and "images" in r:
for i in r["images"]: for i in r["images"]:
word, info = process_images([i], message.from_user.id, message.from_user.first_name) word, info = process_images([i], message.from_user.id, message.from_user.first_name)
caption = create_caption(payload, message.from_user.first_name, message.from_user.id, info) caption = create_caption(payload, message.from_user.first_name, message.from_user.id, info, include_info)
message.reply_photo(photo=f"{IMAGE_PATH}/{word}.png", caption=caption) message.reply_photo(photo=f"{IMAGE_PATH}/{word}.jpg", caption=caption)
K.delete() K.delete()
else: else:
message.reply_text("Failed to process image. Please try again later.") error_message = r.get("error", "Failed to process image. Please try again later.")
message.reply_text(error_message)
K.delete() K.delete()
@app.on_message(filters.command(["getmodels"])) @app.on_message(filters.command(["getmodels"]))
async def get_models(client, message): async def get_models(client, message):
"""Handle /getmodels command to list available models."""
try: try:
response = requests.get(f"{SD_URL}/sdapi/v1/sd-models") response = requests.get(f"{SD_URL}/sdapi/v1/sd-models")
response.raise_for_status() response.raise_for_status()
models_json = response.json() models_json = response.json()
print(models_json)
buttons = [ buttons = [
[InlineKeyboardButton(model["title"], callback_data=model["model_name"])] [InlineKeyboardButton(model["title"], callback_data=model["model_name"])]
for model in models_json for model in models_json
@@ -287,9 +342,9 @@ async def get_models(client, message):
except requests.RequestException as e: except requests.RequestException as e:
await message.reply_text(f"Failed to get models: {e}") await message.reply_text(f"Failed to get models: {e}")
@app.on_callback_query() @app.on_callback_query()
async def process_callback(client, callback_query): async def process_callback(client, callback_query):
"""Process model selection from callback queries."""
sd_model_checkpoint = callback_query.data sd_model_checkpoint = callback_query.data
options = {"sd_model_checkpoint": sd_model_checkpoint} options = {"sd_model_checkpoint": sd_model_checkpoint}
@@ -297,27 +352,73 @@ async def process_callback(client, callback_query):
response = requests.post(f"{SD_URL}/sdapi/v1/options", json=options) response = requests.post(f"{SD_URL}/sdapi/v1/options", json=options)
response.raise_for_status() response.raise_for_status()
# Update the negative prompt based on the selected model
update_negative_prompt(sd_model_checkpoint) update_negative_prompt(sd_model_checkpoint)
update_resolution(sd_model_checkpoint)
update_cfg_scale(sd_model_checkpoint)
await callback_query.message.reply_text(f"Checkpoint set to {sd_model_checkpoint}") await callback_query.message.reply_text(f"Checkpoint set to {sd_model_checkpoint}")
except requests.RequestException as e: except requests.RequestException as e:
await callback_query.message.reply_text(f"Failed to set checkpoint: {e}") await callback_query.message.reply_text(f"Failed to set checkpoint: {e}")
print(f"Error setting checkpoint: {e}") print(f"Error setting checkpoint: {e}")
@app.on_message(filters.command(["info_sd_bot"])) @app.on_message(filters.command(["info_sd_bot"]))
async def info(client, message): async def info(client, message):
"""Provide information about the bot's commands and options."""
await message.reply_text(""" await message.reply_text("""
now support for xyz scripts, see [sd wiki](https://github.com/AUTOMATIC1111/stable-diffusion-webui/wiki/Features#xyz-plot) ! **Stable Diffusion Bot Commands and Options:**
currently supported
`xsr` - search replace text/emoji in the prompt, more info [here](https://github.com/AUTOMATIC1111/stable-diffusion-webui/wiki/Features#prompt-sr)
`xds` - denoise strength, only valid for img2img
`xsteps` - steps
**note** limit the overall `steps:` to lower value (10-20) for big xyz plots
aside from that you can use the usual `ng`, `ds`, `cfg`, `steps` for single image generation. 1. **/draw <prompt> [options]**
- Generates an image based on the provided text prompt.
- **Options:**
- `ng:<negative_prompt>` - Add a negative prompt to avoid specific features.
- `steps:<value>` - Number of steps for generation (1-70).
- `ds:<value>` - Denoising strength (0-1.0).
- `cfg:<value>` - CFG scale (1-30).
- `width:<value>` - Width of the generated image.
- `height:<value>` - Height of the generated image.
- `info:` - Include full payload information in the caption.
**Example:** `/draw beautiful sunset ng:ugly steps:30 ds:0.5 info:`
2. **/img <prompt> [options]**
- Generates an image based on an existing image and the provided text prompt.
- **Options:**
- `ds:<value>` - Denoising strength (0-1.0).
- `steps:<value>` - Number of steps for generation (1-70).
- `cfg:<value>` - CFG scale (1-30).
- `width:<value>` - Width of the generated image.
- `height:<value>` - Height of the generated image.
- `info:` - Include full payload information in the caption.
**Example:** Reply to an image with `/img modern art ds:0.2 info:`
3. **/getmodels**
- Retrieves and lists all available models for the user to select.
- User can then choose a model to set as the current model for image generation.
4. **/info_sd_bot**
- Provides detailed information about the bot's commands and options.
**Additional Options for Advanced Users:**
- **x/y/z plot options** for advanced generation:
- `xsr:<value>` - Search and replace text/emoji in the prompt.
- `xsteps:<value>` - Steps value for x/y/z plot.
- `xds:<value>` - Denoising strength for x/y/z plot.
- `xcfg:<value>` - CFG scale for x/y/z plot.
- `nl:` - No legend in x/y/z plot.
- `ks:` - Keep sub-images in x/y/z plot.
- `rs:` - Set random seed for sub-images in x/y/z plot.
**Notes:**
- Use lower step values (10-20) for large x/y/z plots to avoid long processing times.
- Use `info:` option to include full payload details in the caption of generated images for better troubleshooting and analysis.
**Example for Advanced Users:** `/draw beautiful landscape xsteps:10 xds:0.5 xcfg:7 nl: ks: rs: info:`
For the bot code visit: [Stable Diffusion Bot](https://git.telavivmakers.space/ro/stable-diffusion-telegram-bot)
For more details, visit the [Stable Diffusion Wiki](https://github.com/AUTOMATIC1111/stable-diffusion-webui/wiki/Features#xyz-plot).
Enjoy creating with Stable Diffusion Bot!
""", disable_web_page_preview=True) """, disable_web_page_preview=True)
app.run() app.run()