diff --git a/.gitignore b/.gitignore index 6b3e407..6274c34 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.png .env .session -vscode/ \ No newline at end of file +vscode/ +venv/ diff --git a/dpic.oy.py b/dpic.oy.py new file mode 100644 index 0000000..6d033ba --- /dev/null +++ b/dpic.oy.py @@ -0,0 +1,163 @@ +from datetime import datetime +import urllib.request +import base64 +import json +import time +import os +url="pop-os.local" +webui_server_url = f'http://{url}:7860' + +out_dir = 'api_out' +out_dir_t2i = os.path.join(out_dir, 'txt2img') +out_dir_i2i = os.path.join(out_dir, 'img2img') +os.makedirs(out_dir_t2i, exist_ok=True) +os.makedirs(out_dir_i2i, exist_ok=True) + + +def timestamp(): + return datetime.fromtimestamp(time.time()).strftime("%Y%m%d-%H%M%S") + + +def encode_file_to_base64(path): + with open(path, 'rb') as file: + return base64.b64encode(file.read()).decode('utf-8') + + +def decode_and_save_base64(base64_str, save_path): + with open(save_path, "wb") as file: + file.write(base64.b64decode(base64_str)) + + +def call_api(api_endpoint, **payload): + data = json.dumps(payload).encode('utf-8') + request = urllib.request.Request( + f'{webui_server_url}/{api_endpoint}', + headers={'Content-Type': 'application/json'}, + data=data, + ) + response = urllib.request.urlopen(request) + return json.loads(response.read().decode('utf-8')) + + +def call_txt2img_api(**payload): + response = call_api('sdapi/v1/txt2img', **payload) + for index, image in enumerate(response.get('images')): + save_path = os.path.join(out_dir_t2i, f'txt2img-{timestamp()}-{index}.png') + decode_and_save_base64(image, save_path) + + +def call_img2img_api(**payload): + response = call_api('sdapi/v1/img2img', **payload) + for index, image in enumerate(response.get('images')): + save_path = os.path.join(out_dir_i2i, f'img2img-{timestamp()}-{index}.png') + decode_and_save_base64(image, save_path) + + +if __name__ == '__main__': + payload = { + "prompt": "masterpiece, (best quality:1.1), 1girl ", # extra networks also in prompts + "negative_prompt": "", + "seed": 1, + "steps": 20, + "width": 512, + "height": 512, + "cfg_scale": 7, + "sampler_name": "DPM++ 2M", + "n_iter": 1, + "batch_size": 1, + + # example args for x/y/z plot + # "script_name": "x/y/z plot", + # "script_args": [ + # 1, + # "10,20", + # [], + # 0, + # "", + # [], + # 0, + # "", + # [], + # True, + # True, + # False, + # False, + # 0, + # False + # ], + + # example args for Refiner and ControlNet + # "alwayson_scripts": { + # "ControlNet": { + # "args": [ + # { + # "batch_images": "", + # "control_mode": "Balanced", + # "enabled": True, + # "guidance_end": 1, + # "guidance_start": 0, + # "image": { + # "image": encode_file_to_base64(r"B:\path\to\control\img.png"), + # "mask": None # base64, None when not need + # }, + # "input_mode": "simple", + # "is_ui": True, + # "loopback": False, + # "low_vram": False, + # "model": "control_v11p_sd15_canny [d14c016b]", + # "module": "canny", + # "output_dir": "", + # "pixel_perfect": False, + # "processor_res": 512, + # "resize_mode": "Crop and Resize", + # "threshold_a": 100, + # "threshold_b": 200, + # "weight": 1 + # } + # ] + # }, + # "Refiner": { + # "args": [ + # True, + # "sd_xl_refiner_1.0", + # 0.5 + # ] + # } + # }, + # "enable_hr": True, + # "hr_upscaler": "R-ESRGAN 4x+ Anime6B", + # "hr_scale": 2, + # "denoising_strength": 0.5, + # "styles": ['style 1', 'style 2'], + # "override_settings": { + # 'sd_model_checkpoint': "sd_xl_base_1.0", # this can use to switch sd model + # }, + } + call_txt2img_api(**payload) + + init_images = [ + encode_file_to_base64(r"../stable-diffusion-webui/output/img2img-images/2024-05-15/00012-357584826.png"), + # encode_file_to_base64(r"B:\path\to\img_2.png"), + # "https://image.can/also/be/a/http/url.png", + ] + + batch_size = 2 + payload = { + "prompt": "1girl, blue hair", + "seed": 1, + "steps": 20, + "width": 512, + "height": 512, + "denoising_strength": 0.5, + "n_iter": 1, + "init_images": init_images, + "batch_size": batch_size if len(init_images) == 1 else len(init_images), + # "mask": encode_file_to_base64(r"B:\path\to\mask.png") + } + # if len(init_images) > 1 then batch_size should be == len(init_images) + # else if len(init_images) == 1 then batch_size can be any value int >= 1 + call_img2img_api(**payload) + + # there exist a useful extension that allows converting of webui calls to api payload + # particularly useful when you wish setup arguments of extensions and scripts + # https://github.com/huchenlei/sd-webui-api-payload-display diff --git a/main.py b/main.py index 2d8e260..36b8c2e 100644 --- a/main.py +++ b/main.py @@ -5,32 +5,38 @@ import re import os import uuid import base64 +from datetime import datetime from PIL import Image, PngImagePlugin from pyrogram import Client, filters -from pyrogram.types import * +from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from dotenv import load_dotenv -# Done! Congratulations on your new bot. You will find it at -# t.me/gootmornbot -# You can now add a description, about section and profile picture for your bot, see /help for a list of commands. -# By the way, when you've finished creating your cool bot, ping our Bot Support if you want a better username for it. Just make sure the bot is fully operational before you do this. - -# Use this token to access the HTTP API: -# Keep your token secure and store it safely, it can be used by anyone to control your bot. - -# For a description of the Bot API, see this page: https://core.telegram.org/bots/api - +# Load environment variables load_dotenv() API_ID = os.environ.get("API_ID", None) API_HASH = os.environ.get("API_HASH", None) TOKEN = os.environ.get("TOKEN", None) SD_URL = os.environ.get("SD_URL", None) -print(SD_URL) -app = Client("stable", api_id=API_ID, api_hash=API_HASH, bot_token=TOKEN) -# default params +app = Client("stable", api_id=API_ID, api_hash=API_HASH, bot_token=TOKEN) +IMAGE_PATH = 'images' # Do not leave a trailing / + +# Ensure IMAGE_PATH directory exists +os.makedirs(IMAGE_PATH, exist_ok=True) + +# Default params steps_value_default = 40 +def timestamp(): + return datetime.now().strftime("%Y%m%d-%H%M%S") + +def encode_file_to_base64(path): + with open(path, 'rb') as file: + return base64.b64encode(file.read()).decode('utf-8') + +def decode_and_save_base64(base64_str, save_path): + with open(save_path, "wb") as file: + file.write(base64.b64decode(base64_str)) def parse_input(input_string): default_payload = { @@ -49,7 +55,7 @@ def parse_input(input_string): "controlnet_guidance": 1, "controlnet_guessmode": True, "enable_hr": False, - "denoising_strength": 0.5, + "denoising_strength": 0.4, "hr_scale": 1.5, "hr_upscale": "Latent", "seed": -1, @@ -58,7 +64,7 @@ def parse_input(input_string): "sampler_index": "", "batch_size": 1, "n_iter": 1, - "steps": 20, + "steps": 35, "cfg_scale": 7, "width": 512, "height": 512, @@ -66,175 +72,157 @@ def parse_input(input_string): "override_settings": {}, "override_settings_restore_afterwards": True, } - # Initialize an empty payload with the 'prompt' key payload = {"prompt": ""} - prompt = [] - # Find all occurrences of keys (words ending with a colon) matches = re.finditer(r"(\w+):", input_string) last_index = 0 - # Iterate over the found keys for match in matches: - key = match.group(1).lower() # Convert the key to lowercase + key = match.group(1).lower() value_start_index = match.end() - # If there's text between the last key and the current key, add it to the prompt if last_index != match.start(): - prompt.append(input_string[last_index : match.start()].strip()) + prompt.append(input_string[last_index: match.start()].strip()) last_index = value_start_index - # Check if the key is in the default payload if key in default_payload: - # Extract the value for the current key - value_end_index = re.search( - r"(?=\s+\w+:|$)", input_string[value_start_index:] - ).start() - value = input_string[ - value_start_index : value_start_index + value_end_index - ].strip() + value_end_index = re.search(r"(?=\s+\w+:|$)", input_string[value_start_index:]).start() + value = input_string[value_start_index: value_start_index + value_end_index].strip() - # Check if the default value for the key is an integer if isinstance(default_payload[key], int): - # If the value is a valid integer, store it as an integer in the payload if value.isdigit(): - payload[key] = int(value) + payload[key] = idefault_payloadnt(value) else: - # If the default value for the key is not an integer, store the value as is in the payload payload[key] = value last_index += value_end_index else: - # If the key is not in the default payload, add it to the prompt prompt.append(f"{key}:") - # Join the prompt words and store it in the payload payload["prompt"] = " ".join(prompt) - # If the prompt is empty, set the input string as the prompt if not payload["prompt"]: payload["prompt"] = input_string.strip() - # Return the final payload return payload +def call_api(api_endpoint, payload): + response = requests.post(f'{SD_URL}/{api_endpoint}', json=payload) + response.raise_for_status() + return response.json() + +def process_images(images, user_id, user_name): + def generate_unique_name(): + unique_id = str(uuid.uuid4())[:7] + return f"{user_name}-{unique_id}" + + word = generate_unique_name() + + for i in images: + image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0]))) + + png_payload = {"image": "data:image/png;base64," + i} + response2 = requests.post(f"{SD_URL}/sdapi/v1/png-info", json=png_payload) + response2.raise_for_status() + + pnginfo = PngImagePlugin.PngInfo() + pnginfo.add_text("parameters", response2.json().get("info")) + image.save(f"{IMAGE_PATH}/{word}.png", pnginfo=pnginfo) + + return word, response2.json().get("info") @app.on_message(filters.command(["draw"])) def draw(client, message): msgs = message.text.split(" ", 1) if len(msgs) == 1: - message.reply_text( - "Format :\n/draw < text to image >\nng: < negative (optional) >\nsteps: < steps value (1-70, optional) >" - ) + message.reply_text("Format :\n/draw < text to image >\nng: < negative (optional) >\nsteps: < steps value (1-70, optional) >") return payload = parse_input(msgs[1]) print(payload) - # The rest of the draw function remains unchanged - - K = message.reply_text("Please Wait 10-15 Second") - r = requests.post(url=f"{SD_URL}/sdapi/v1/txt2img", json=payload).json() - - def genr(): - unique_id = str(uuid.uuid4())[:7] - return f"{message.from_user.first_name}-{unique_id}" - - word = genr() + K = message.reply_text("Please Wait 10-15 Seconds") + r = call_api('sdapi/v1/txt2img', payload) for i in r["images"]: - image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0]))) + word, info = process_images([i], message.from_user.id, message.from_user.first_name) - png_payload = {"image": "data:image/png;base64," + i} - response2 = requests.post(url=f"{SD_URL}/sdapi/v1/png-info", json=png_payload) - - pnginfo = PngImagePlugin.PngInfo() - pnginfo.add_text("parameters", response2.json().get("info")) - image.save(f"{word}.png", pnginfo=pnginfo) - - # Add a flag to check if the user provided a seed value - user_provided_seed = "seed" in payload - - info_dict = response2.json() - seed_value = info_dict['info'].split(", Seed: ")[1].split(",")[0] - # print(seed_value) - - caption = f"**[{message.from_user.first_name}-Kun](tg://user?id={message.from_user.id})**\n\n" + seed_value = info.split(", Seed: ")[1].split(",")[0] + caption = f"**[{message.from_user.first_name}](tg://user?id={message.from_user.id})**\n\n" for key, value in payload.items(): caption += f"{key.capitalize()} - **{value}**\n" caption += f"Seed - **{seed_value}**\n" - message.reply_photo( - photo=f"{word}.png", - caption=caption, - ) - - - # os.remove(f"{word}.png") + message.reply_photo(photo=f"{IMAGE_PATH}/{word}.png", caption=caption) K.delete() +@app.on_message(filters.command(["img"])) +def img2img(client, message): + if not message.reply_to_message or not message.reply_to_message.photo: + message.reply_text(""" + reply to an image with + /img ds:0-1.0 + ds is for Denoising_strength. Set that low (like 0.2) if you just want to slightly change things. defaults to 0.4 + """) + return + + msgs = message.text.split(" ", 1) + if len(msgs) == 1: + message.reply_text(""" + Format :\n/img + force: < 0.1-1.0, default 0.3 > + """) + return + + payload = parse_input(msgs[1]) + print(payload) + photo = message.reply_to_message.photo + photo_file = app.download_media(photo) + init_image = encode_file_to_base64(photo_file) + os.remove(photo_file) # Clean up downloaded image file + + payload["init_images"] = [init_image] + payload["denoising_strength"] = 0.3 # Set default denoising strength or customize as needed + + K = message.reply_text("Please Wait 10-15 Seconds") + r = call_api('sdapi/v1/img2img', payload) + + for i in r["images"]: + word, info = process_images([i], message.from_user.id, message.from_user.first_name) + + caption = f"**[{message.from_user.first_name}](tg://user?id={message.from_user.id})**\n\n" + prompt = payload["prompt"] + caption += f"**{prompt}**\n" + + message.reply_photo(photo=f"{IMAGE_PATH}/{word}.png", caption=caption) + K.delete() @app.on_message(filters.command(["getmodels"])) async def get_models(client, message): - response = requests.get(url=f"{SD_URL}/sdapi/v1/sd-models") - if response.status_code == 200: - models_json = response.json() - # create buttons for each model name - buttons = [] - for model in models_json: - buttons.append( - [ - InlineKeyboardButton( - model["title"], callback_data=model["model_name"] - ) - ] - ) - # send the message - await message.reply_text( - text="Select a model [checkpoint] to use", - reply_markup=InlineKeyboardMarkup(buttons), - ) + response = requests.get(f"{SD_URL}/sdapi/v1/sd-models") + response.raise_for_status() + models_json = response.json() + buttons = [ + [InlineKeyboardButton(model["title"], callback_data=model["model_name"])] + for model in models_json + ] + await message.reply_text("Select a model [checkpoint] to use", reply_markup=InlineKeyboardMarkup(buttons)) @app.on_callback_query() async def process_callback(client, callback_query): - # if a model button is clicked, set sd_model_checkpoint to the selected model's title sd_model_checkpoint = callback_query.data - - # The sd_model_checkpoint needs to be set to the title from /sdapi/v1/sd-models - # post using /sdapi/v1/options - options = {"sd_model_checkpoint": sd_model_checkpoint} - # post the options - response = requests.post(url=f"{SD_URL}/sdapi/v1/options", json=options) - if response.status_code == 200: - # if the post was successful, send a message - await callback_query.message.reply_text( - "checpoint set to " + sd_model_checkpoint - ) - else: - # if the post was unsuccessful, send an error message - await callback_query.message.reply_text("Error setting options") + response = requests.post(f"{SD_URL}/sdapi/v1/options", json=options) + response.raise_for_status() + await callback_query.message.reply_text(f"Checkpoint set to {sd_model_checkpoint}") @app.on_message(filters.command(["start"], prefixes=["/", "!"])) async def start(client, message): - # Photo = "https://i.imgur.com/79hHVX6.png" - - buttons = [ - [ - InlineKeyboardButton( - "Add to your group", url="https://t.me/gootmornbot?startgroup=true" - ) - ] - ] - await message.reply_text( - # photo=Photo, - text=f"Hello!\nask me to imagine anything\n\n/draw text to image", - reply_markup=InlineKeyboardMarkup(buttons), - ) - + buttons = [[InlineKeyboardButton("Add to your group", url="https://t.me/gootmornbot?startgroup=true")]] + await message.reply_text("Hello!\nAsk me to imagine anything\n\n/draw text to image", reply_markup=InlineKeyboardMarkup(buttons)) app.run()