Source code for giga_images

import html
import logging
import os
from io import BytesIO

from PIL import Image
from aiogram import types, Bot, Router, F
from aiogram.types.input_file import FSInputFile
from aiogram.fsm.context import FSMContext
from aiogram.utils.keyboard import InlineKeyboardBuilder

from manager_cw_bot_api.buttons import Buttons
from manager_cw_bot_api.giga_request import VersionAIImagePro
from manager_cw_bot_api.fsm_handler import GigaImage
from manager_cw_bot_api.handler_db_sub_operations import HandlerDB

# Router on which GigaCreator registers its message and callback-query
# handlers for the AI image generation flow.
router_ai_img: Router = Router()


class GigaCreator:
    """
    Class of create image for premium-user.

    Flow:

    1. :meth:`get_query` switches the user to the ``GigaImage.request`` FSM
       state and asks for a prompt.
    2. The next text message in that state is stored as *that user's* prompt
       and a confirmation keyboard is shown.
    3. A callback query whose data is ``"generate"`` sends the stored prompt
       to ``VersionAIImagePro.request`` and delivers the result.

    The handlers are registered on ``router_ai_img`` once per process, no
    matter how many instances are created or how many messages arrive, so
    the router's handler list does not grow over time.
    """

    # Pending prompts, keyed by Telegram user id. Kept on the class (not on
    # the instance) because the registered handlers are bound to the first
    # instance only, and keyed per user so that one user's prompt can never
    # be overwritten by another user's message.
    __queries: dict = {}

    # Guards the one-time handler registration performed in ``__init__``.
    __handlers_registered: bool = False

    # Maximum length of a Telegram text message.
    __TEXT_LIMIT: int = 4096

    __GENERIC_ERROR: str = (
        "🤔 Oh, something wrong! 👌🏻 Please, don't worry! "
        "Write ticket or EMail: help@cwr.su."
    )

    def __init__(self, bot: Bot) -> None:
        """
        Remember the bot and register the handlers (first instance only).

        :param bot: Bot used for every Telegram API call made by this class.
        """
        self.__bot: Bot = bot
        if not GigaCreator.__handlers_registered:
            router_ai_img.message.register(
                self.__check_generate_or_cancel, GigaImage.request
            )
            router_ai_img.callback_query.register(
                self.__handler_query, F.data == "generate"
            )
            GigaCreator.__handlers_registered = True

    async def get_query(self, call: types.CallbackQuery, state: FSMContext) -> None:
        """
        Func of get query from premium-user for create image.

        Sets the ``GigaImage.request`` state and replaces the text of the
        message that carried the pressed button with the prompt request.

        :param call: Callback Query.
        :param state: FSM.

        :return: None.
        """
        await state.set_state(GigaImage.request)
        await self.__bot.edit_message_text(
            chat_id=call.message.chat.id,
            text="📸 Enter your query...",
            message_id=call.message.message_id,
        )

    @staticmethod
    def __as_text(payload) -> str:
        """
        Convert a service reply into text that is safe to send to the user.

        :param payload: Value returned by the image service (str, bytes or
            anything else).

        :return: The text cut to the Telegram message limit, or an empty
            string when the payload is not usable text (undecodable bytes,
            blank text, another type).
        """
        if isinstance(payload, bytes):
            try:
                payload = payload.decode("utf-8")
            except UnicodeDecodeError:
                return ""
        if not isinstance(payload, str) or not payload.strip():
            return ""
        return payload[:GigaCreator.__TEXT_LIMIT]

    @staticmethod
    def __build_caption(first_name: str, query: str) -> str:
        """
        Build the HTML caption of the generated document.

        The user-controlled parts are HTML-escaped: the caption is sent with
        ``parse_mode="HTML"``, so a raw ``<`` or ``&`` in the name or in the
        prompt would otherwise make Telegram reject the whole message.

        :param first_name: First name of the user who asked for the image.
        :param query: Prompt the image was generated from.

        :return: Caption text.
        """
        safe_name: str = html.escape(first_name, quote=False)
        safe_query: str = html.escape(query, quote=False)
        return (
            f"<b>{safe_name}</b>, the new photo has been generated"
            f" according to your request: <blockquote>{safe_query}"
            f"</blockquote>\n\n✨ There are still generations left: ♾.\n\nThe "
            f"photo is attached to the message as a file. Download it by clicking "
            f"on the button above.\n\n"
            f"#AI_Photo\nDeveloper: @aleksandr_twitt."
        )

    async def __edit_or_send(self, call: types.CallbackQuery, text: str,
                             markup=None) -> None:
        """
        Edit the message under the pressed button; send a new one on failure.

        :param call: Call-Query whose message should show ``text``.
        :param text: Text to show.
        :param markup: Optional reply markup.

        :return: None.
        """
        try:
            await self.__bot.edit_message_text(
                chat_id=call.from_user.id,
                text=text,
                message_id=call.message.message_id,
                reply_markup=markup,
            )
        except Exception as ex:
            logging.warning("The exception has arisen: %s.", ex)
            await self.__bot.send_message(
                chat_id=call.from_user.id,
                text=text,
                reply_markup=markup,
            )

    async def __check_generate_or_cancel(self, message: types.Message,
                                         state: FSMContext) -> None:
        """
        Check generation.

        Stores the prompt for the sending user and asks for confirmation.
        A message without text (photo, sticker, ...) keeps the FSM state and
        asks for the prompt again instead of storing ``None``.

        :param message: Query from user.
        :param state: FSM.

        :return: None
        """
        if not message.text:
            await self.__bot.send_message(
                chat_id=message.chat.id,
                text="📸 Enter your query...",
            )
            return

        await state.clear()
        keyboard: InlineKeyboardBuilder = await Buttons.generate_image()
        GigaCreator.__queries[message.from_user.id] = message.text

        # HTML with an escaped name: special characters in a first name must
        # not break entity parsing of the confirmation message.
        safe_name: str = html.escape(message.from_user.first_name, quote=False)
        await self.__bot.send_message(
            chat_id=message.chat.id,
            text=f"{safe_name}, are you want to <b>generate image</b>? "
                 f"Are you sure?",
            reply_markup=keyboard.as_markup(),
            parse_mode="HTML",
        )

        # The reaction is decorative, so a failure is logged and must not
        # abort the flow. The message lives in ``message.chat``, hence that
        # chat id; the reaction's ``type`` field is left at its default.
        try:
            await self.__bot.set_message_reaction(
                chat_id=message.chat.id,
                message_id=message.message_id,
                reaction=[types.ReactionTypeEmoji(emoji="🫡")],
            )
        except Exception as ex:
            logging.warning("The exception has arisen: %s.", ex)

    async def __deliver_image(self, call: types.CallbackQuery, query: str,
                              image_data: bytes) -> None:
        """
        Re-encode the generated image as JPEG and send it to the user.

        The image is kept in memory, so concurrent generations do not share
        a file on disk and nothing is left behind when sending fails.

        :param call: Call-Query of the user who confirmed the generation.
        :param query: Prompt the image was generated from.
        :param image_data: Raw image bytes returned by the service.

        :return: None.
        """
        image: Image.Image = Image.open(BytesIO(image_data))
        encoded: BytesIO = BytesIO()
        image.save(encoded, "JPEG")

        await self.__bot.send_document(
            chat_id=call.from_user.id,
            document=types.BufferedInputFile(
                encoded.getvalue(),
                filename="AI_Photo_By_CW_PREMIUM_Version.jpg",
            ),
            caption=GigaCreator.__build_caption(call.from_user.first_name, query),
            parse_mode="HTML",
            message_effect_id='5104841245755180586',
        )

        keyboard: InlineKeyboardBuilder = await Buttons.back_on_main()
        await self.__bot.send_message(
            chat_id=call.from_user.id,
            text="Return to main-menu:",
            reply_markup=keyboard.as_markup(),
        )
        await HandlerDB.update_analytic_datas_count_ai_queries()
        await self.__bot.delete_message(
            chat_id=call.from_user.id,
            message_id=call.message.message_id,
        )

    async def __handler_query(self, call: types.CallbackQuery) -> None:
        """
        Manage of query from premium-user for create image.

        Looks up the prompt stored for the calling user, requests the image
        and delivers it. A textual reply from the service is shown to the
        user as is; any other failure ends with a generic error message.

        :param call: Call-Query.

        :return: None.
        """
        query = GigaCreator.__queries.get(call.from_user.id)
        if not query:
            # E.g. an old "generate" button pressed after a restart.
            keyboard: InlineKeyboardBuilder = await Buttons.back_on_main()
            await self.__edit_or_send(
                call,
                "🤔 I can't find your request. Please, enter it again.",
                keyboard.as_markup(),
            )
            return

        await self.__edit_or_send(call, "💫 Please, wait! I'm generating... ⏳")

        image_data: str | bytes = await VersionAIImagePro.request(query)

        if isinstance(image_data, str):
            # The service answered with a message instead of an image.
            failure_text: str = GigaCreator.__as_text(image_data)
        else:
            try:
                await self.__deliver_image(call, query, image_data)
                return
            except Exception as ex:
                logging.warning("The exception has arisen: %s.", ex)
                failure_text = ""
                if "cannot identify image file" in str(ex):
                    # Not an image: the payload may be a textual reply.
                    failure_text = GigaCreator.__as_text(image_data)

        keyboard = await Buttons.back_on_main()
        await self.__edit_or_send(
            call,
            failure_text or GigaCreator.__GENERIC_ERROR,
            keyboard.as_markup(),
        )