"""Batch-convert images from ``input/`` into VP9 WebM files in ``output/``.

Each supported image is resampled to 25 fps, scaled and padded to a square
canvas (512 px for stickers, 100 px for emoji), and re-encoded at
progressively lower bitrates until the result is at most 256 KB.
"""

from logging import getLogger
from pathlib import Path
from time import time

import ffmpeg
from ffmpeg.nodes import FilterableStream
from rich.prompt import Prompt

from utils.log import configure_logging

PATH = Path(__file__).parent
INPUT_DIR = PATH / 'input'
OUTPUT_DIR = PATH / 'output'
OUTPUT_ARGS = {
    'vcodec': 'libvpx-vp9',
    'pix_fmt': 'yuva420p',
    'row-mt': 1,
    'format': 'webm',
    'loglevel': 'error',
    'an': None,
    'y': None,
}

# Input extensions accepted by main(); compared case-insensitively.
SUPPORTED_SUFFIXES = ('.png', '.jpg', '.webp', '.gif')
# Largest output file that is accepted without another encoding pass.
MAX_FILE_SIZE_KB = 256
# GIFs longer than this are sped up so that they last exactly this long.
MAX_DURATION_SECONDS = 3
# Frame rate every output is resampled to.
OUTPUT_FPS = 25

logger = getLogger(__name__)


def compress(
    input_file: Path,
    output_file: Path,
    sticker_width: int,
    bitrate: int | None = None,
) -> None:
    """Encode ``input_file`` to a WebM at ``output_file``, shrinking until it fits.

    The image is resampled to ``OUTPUT_FPS``, scaled to ``sticker_width`` and
    padded onto a ``sticker_width`` x ``sticker_width`` canvas. GIFs longer
    than ``MAX_DURATION_SECONDS`` are sped up to that duration. If the encoded
    file exceeds ``MAX_FILE_SIZE_KB`` the function calls itself again with a
    proportionally lower bitrate.

    Args:
        input_file: Source image to convert.
        output_file: Destination path of the encoded WebM.
        sticker_width: Side length, in pixels, of the square output canvas.
        bitrate: Target video bitrate in kbit/s; ``None`` lets the encoder
            pick its default.

    Raises:
        Exception: If ffmpeg does not return a filterable input stream.
        SystemExit: With code 1 if the ffmpeg invocation fails.
    """
    file_input = ffmpeg.input(input_file)
    if not isinstance(file_input, FilterableStream):
        logger.error(f'Failed to process file: {input_file}')
        raise Exception('Failed to process file')

    stream = ffmpeg.filter(file_input, 'fps', fps=OUTPUT_FPS, round='up')
    stream = ffmpeg.filter(
        stream, 'scale', sticker_width, -1, force_original_aspect_ratio='decrease'
    )
    # Centre the scaled frame on the square canvas.
    stream = ffmpeg.filter(
        stream,
        'pad',
        sticker_width,
        sticker_width,
        '(ow-iw)/2',
        '(oh-ih)/2',
        color='0x00000000',
    )
    # NOTE(review): positional args are passed straight to ffmpeg's `loop`
    # filter -- confirm they map to the intended loop/size options.
    stream = ffmpeg.filter(stream, 'loop', 0, 1)

    if input_file.suffix.lower() == '.gif':
        duration = float(ffmpeg.probe(input_file)['streams'][0]['duration'])
        if duration > MAX_DURATION_SECONDS:
            speed_factor = duration / MAX_DURATION_SECONDS
        else:
            speed_factor = 1
        stream = ffmpeg.filter(stream, 'setpts', f'PTS/{speed_factor}')

    # Copy the shared defaults: writing the bitrate into OUTPUT_ARGS itself
    # would leak one file's bitrate into every later encode.
    args = dict(OUTPUT_ARGS)
    if bitrate:
        args['video_bitrate'] = f'{bitrate}k'

    output = ffmpeg.output(stream, filename=output_file, **args)
    try:
        ffmpeg.run(output)
    except Exception:
        # Keep the original "abort with exit code 1" behaviour, but record
        # what went wrong instead of exiting silently.
        logger.exception(f'ffmpeg failed while encoding {input_file}')
        raise SystemExit(1)

    file_size = output_file.stat().st_size
    if file_size / 1024 <= MAX_FILE_SIZE_KB:
        return None

    if not bitrate:
        # First pass ran without an explicit bitrate: start from what the
        # encoder actually produced (bit/s -> kbit/s).
        bitrate = int(int(ffmpeg.probe(output_file)['format']['bit_rate']) / 1000)

    # Scale the bitrate by target_size / actual_size. Integer arithmetic
    # avoids dividing by zero when the probed bitrate is 0.
    bitrate = bitrate * MAX_FILE_SIZE_KB * 1024 // file_size
    if bitrate < 1:
        # A zero bitrate would be treated as "unset" and restart the whole
        # cycle, recursing forever.
        logger.warning(
            f'Unable to reduce {output_file.name} below {MAX_FILE_SIZE_KB} KB '
            f'[{int(file_size / 1024)} KB]'
        )
        return None

    logger.info(
        f'File size is too high [{int(file_size / 1024)} KB], reducing to {bitrate}k...'
    )
    return compress(input_file, output_file, sticker_width, bitrate)


def main() -> None:
    """Convert every supported image in ``INPUT_DIR`` into ``OUTPUT_DIR``.

    Creates both directories if needed, removes existing files from the
    output directory, asks whether stickers (512 px) or emoji (100 px) are
    wanted, and then runs ``compress`` on each supported input file.
    """
    INPUT_DIR.mkdir(parents=True, exist_ok=True)
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    for stale in OUTPUT_DIR.iterdir():
        # unlink() cannot remove a real directory; leave those untouched
        # rather than crashing before any work is done.
        if stale.is_dir() and not stale.is_symlink():
            continue
        stale.unlink()

    sticker_type = Prompt.ask(
        'Do you want to create stickers or emoji?', choices=['s', 'e']
    )
    sticker_width = 100 if sticker_type == 'e' else 512

    started_at = time()

    for file in INPUT_DIR.iterdir():
        logger.info(f'Processing {file.name}...')

        if file.suffix.lower() not in SUPPORTED_SUFFIXES:
            logger.warning(f'Unsupported file type: {file.suffix}')
            continue

        # with_suffix() swaps only the trailing extension, unlike
        # str.replace(), which rewrites every occurrence in the name.
        file_output = OUTPUT_DIR / file.with_suffix('.webm').name
        compress(file, file_output, sticker_width)

    logger.info(f'Finished in {round(time() - started_at, 2)} seconds')


if __name__ == '__main__':
    configure_logging()
    main()