diff --git a/video2x/decoder.py b/video2x/decoder.py index c0eb47a..5c90c6f 100755 --- a/video2x/decoder.py +++ b/video2x/decoder.py @@ -19,22 +19,15 @@ along with this program. If not, see . Name: Video Decoder Author: K4YT3X Date Created: June 17, 2021 -Last Modified: March 21, 2022 +Last Modified: April 9, 2022 """ -import contextlib -import multiprocessing import os import pathlib -import queue import signal import subprocess -import threading -import time -from multiprocessing.sharedctypes import Synchronized import ffmpeg -from loguru import logger from PIL import Image from .pipe_printer import PipePrinter @@ -51,32 +44,34 @@ LOGURU_FFMPEG_LOGLEVELS = { } -class VideoDecoder(threading.Thread): +class VideoDecoder: + """ + A video decoder that generates frames read from FFmpeg. + + :param input_path pathlib.Path: the input file's path + :param input_width int: the input file's width + :param input_height int: the input file's height + :param frame_rate float: the input file's frame rate + :param pil_ignore_max_image_pixels bool: setting this to True + disables PIL's "possible DDoS" warning + """ + def __init__( self, input_path: pathlib.Path, input_width: int, input_height: int, frame_rate: float, - processing_queue: multiprocessing.Queue, - processing_settings: tuple, - pause: Synchronized, - ignore_max_image_pixels=True, + pil_ignore_max_image_pixels: bool = True, ) -> None: - threading.Thread.__init__(self) - self.running = False self.input_path = input_path self.input_width = input_width self.input_height = input_height - self.processing_queue = processing_queue - self.processing_settings = processing_settings - self.pause = pause # this disables the "possible DDoS" warning - if ignore_max_image_pixels: + if pil_ignore_max_image_pixels is True: Image.MAX_IMAGE_PIXELS = None - self.exception = None self.decoder = subprocess.Popen( ffmpeg.compile( ffmpeg.input(input_path, r=frame_rate)["v"] @@ -102,75 +97,27 @@ class VideoDecoder(threading.Thread): self.pipe_printer = PipePrinter(self.decoder.stderr) self.pipe_printer.start() - def run(self) -> None: - self.running = True + def __iter__(self): - # the index of the frame - frame_index = 0 - - # create placeholder for previous frame - # used in interpolate mode - previous_image = None - - # continue running until an exception occurs - # or all frames have been decoded - while self.running is True: - - # pause if pause flag is set - if self.pause.value is True: - time.sleep(0.1) - continue - - try: - buffer = self.decoder.stdout.read( + # continue yielding while FFmpeg continues to produce output + while ( + len( + buffer := self.decoder.stdout.read( 3 * self.input_width * self.input_height ) + ) + > 0 + ): - # source depleted (decoding finished) - # after the last frame has been decoded - # read will return nothing - if len(buffer) == 0: - self.stop() - continue + # convert raw bytes into image object + frame = Image.frombytes( + "RGB", (self.input_width, self.input_height), buffer + ) - # convert raw bytes into image object - image = Image.frombytes( - "RGB", (self.input_width, self.input_height), buffer - ) + # return this frame + yield frame - # keep checking if the running flag is set to False - # while waiting to put the next image into the queue - while self.running is True: - with contextlib.suppress(queue.Full): - self.processing_queue.put( - ( - frame_index, - (previous_image, image), - self.processing_settings, - ), - timeout=0.1, - ) - break - - previous_image = image - frame_index += 1 - - # most likely "not enough image data" - except ValueError as error: - self.exception = error - - # ignore queue closed - if "is closed" not in str(error): - logger.exception(error) - break - - # send exceptions into the client connection pipe - except Exception as error: - self.exception = error - logger.exception(error) - break - else: - logger.debug("Decoding queue depleted") + def __del__(self): # flush the remaining data in STDOUT and STDERR self.decoder.stdout.flush() @@ -190,9 +137,3 @@ class VideoDecoder(threading.Thread): # wait for PIPE printer to exit self.pipe_printer.stop() self.pipe_printer.join() - - logger.info("Decoder thread exiting") - return super().run() - - def stop(self) -> None: - self.running = False