diff --git a/video2x/decoder.py b/video2x/decoder.py index 5c90c6f..03709a5 100755 --- a/video2x/decoder.py +++ b/video2x/decoder.py @@ -22,10 +22,14 @@ Date Created: June 17, 2021 Last Modified: April 9, 2022 """ +import contextlib import os import pathlib import signal import subprocess +from multiprocessing import Queue +from queue import Full +from threading import Thread import ffmpeg from PIL import Image @@ -117,15 +121,13 @@ class VideoDecoder: # return this frame yield frame - def __del__(self): + # automatically self-join and clean up after iterations are done + self.join() - # flush the remaining data in STDOUT and STDERR - self.decoder.stdout.flush() - self.decoder.stderr.flush() + def kill(self): + self.decoder.send_signal(signal.SIGKILL) - # send SIGINT (2) to FFmpeg - # this instructs it to finalize and exit - self.decoder.send_signal(signal.SIGINT) + def join(self): # close PIPEs to prevent process from getting stuck self.decoder.stdout.close() @@ -137,3 +139,39 @@ class VideoDecoder: # wait for PIPE printer to exit self.pipe_printer.stop() self.pipe_printer.join() + + +class VideoDecoderThread(Thread): + def __init__( + self, tasks_queue: Queue, decoder: VideoDecoder, processing_settings: tuple + ): + super().__init__() + + self.tasks_queue = tasks_queue + self.decoder = decoder + self.processing_settings = processing_settings + self.running = False + + def run(self): + self.running = True + previous_frame = None + for frame_index, frame in enumerate(self.decoder): + + while True: + + # check for the stop signal + if self.running is False: + self.decoder.join() + return + + with contextlib.suppress(Full): + self.tasks_queue.put( + (frame_index, previous_frame, frame, self.processing_settings), + timeout=0.1, + ) + break + + previous_frame = frame + + def stop(self): + self.running = False