diff --git a/video2x/decoder.py b/video2x/decoder.py index e2a074d..8dce4f7 100755 --- a/video2x/decoder.py +++ b/video2x/decoder.py @@ -19,9 +19,12 @@ along with this program. If not, see . Name: Video Decoder Author: K4YT3X Date Created: June 17, 2021 -Last Modified: February 16, 2022 +Last Modified: February 27, 2022 """ +# local imports +from .pipe_printer import PipePrinter + # built-in imports import contextlib import os @@ -87,10 +90,15 @@ class VideoDecoder(threading.Thread): ), overwrite_output=True, ), + env={"AV_LOG_FORCE_COLOR": "TRUE"}, stdout=subprocess.PIPE, - # stderr=subprocess.DEVNULL, + stderr=subprocess.PIPE, ) + # start the PIPE printer to start printing FFmpeg logs + self.pipe_printer = PipePrinter(self.decoder.stderr) + self.pipe_printer.start() + def run(self) -> None: self.running = True @@ -113,8 +121,8 @@ class VideoDecoder(threading.Thread): # after the last frame has been decoded # read will return nothing if len(buffer) == 0: - logger.debug("Decoding queue depleted") - break + self.stop() + continue # convert raw bytes into image object image = Image.frombytes( @@ -140,6 +148,7 @@ class VideoDecoder(threading.Thread): # most likely "not enough image data" except ValueError as e: + self.exception = e # ignore queue closed if not "is closed" in str(e): @@ -151,13 +160,26 @@ class VideoDecoder(threading.Thread): self.exception = e logger.exception(e) break + else: + logger.debug("Decoding queue depleted") + + # flush the remaining data in STDOUT and close PIPE + self.decoder.stdout.flush() + self.decoder.stdout.close() + + # flush the remaining data in STDERR and wait for it to be read + self.decoder.stderr.flush() # send SIGINT (2) to FFmpeg # this instructs it to finalize and exit - if self.decoder.poll() is None: - self.decoder.send_signal(signal.SIGTERM) + self.decoder.send_signal(signal.SIGINT) - # ensure the decoder has exited + # wait for process to terminate + self.pipe_printer.stop() + self.decoder.stderr.close() + + # wait for processes and threads to stop + self.pipe_printer.join() self.decoder.wait() logger.info("Decoder thread exiting") diff --git a/video2x/encoder.py b/video2x/encoder.py index b006c90..9176f56 100755 --- a/video2x/encoder.py +++ b/video2x/encoder.py @@ -19,9 +19,12 @@ along with this program. If not, see . Name: Video Encoder Author: K4YT3X Date Created: June 17, 2021 -Last Modified: February 16, 2022 +Last Modified: February 27, 2022 """ +# local imports +from .pipe_printer import PipePrinter + # built-in imports import multiprocessing import multiprocessing.managers @@ -74,6 +77,10 @@ class VideoEncoder(threading.Thread): self.processed_frames = processed_frames self.processed = processed + # stores exceptions if the thread exits with errors + self.exception = None + + # create FFmpeg input for the original input video self.original = ffmpeg.input(input_path) # define frames as input @@ -122,11 +129,15 @@ class VideoEncoder(threading.Thread): ), overwrite_output=True, ), + env={"AV_LOG_FORCE_COLOR": "TRUE"}, stdin=subprocess.PIPE, - # stdout=subprocess.DEVNULL, - # stderr=subprocess.DEVNULL, + stderr=subprocess.PIPE, ) + # start the PIPE printer to start printing FFmpeg logs + self.pipe_printer = PipePrinter(self.encoder.stderr) + self.pipe_printer.start() + def run(self) -> None: self.running = True frame_index = 0 @@ -150,19 +161,29 @@ class VideoEncoder(threading.Thread): # send exceptions into the client connection pipe except Exception as e: + self.exception = e logger.exception(e) break + else: + logger.debug("Encoding queue depleted") # flush the remaining data in STDIN and close PIPE - logger.debug("Encoding queue depleted") self.encoder.stdin.flush() self.encoder.stdin.close() + # flush the remaining data in STDERR and wait for it to be read + self.encoder.stderr.flush() + # send SIGINT (2) to FFmpeg # this instructs it to finalize and exit self.encoder.send_signal(signal.SIGINT) # wait for process to terminate + self.pipe_printer.stop() + self.encoder.stderr.close() + + # wait for processes and threads to stop + self.pipe_printer.join() self.encoder.wait() logger.info("Encoder thread exiting") diff --git a/video2x/pipe_printer.py b/video2x/pipe_printer.py new file mode 100755 index 0000000..c650ed0 --- /dev/null +++ b/video2x/pipe_printer.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Copyright (C) 2018-2022 K4YT3X and contributors. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . + +Name: PIPE Printer +Author: K4YT3X +Date Created: February 27, 2022 +Last Modified: February 27, 2022 +""" + +# built-in imports +from typing import IO +import os +import sys +import threading +import time + + +class PipePrinter(threading.Thread): + def __init__(self, stderr: IO[bytes]) -> None: + threading.Thread.__init__(self) + self.stderr = stderr + self.running = False + + # set read mode to non-blocking + os.set_blocking(self.stderr.fileno(), False) + + def run(self) -> None: + self.running = True + + # keep printing contents in the PIPE + while self.running: + time.sleep(0.5) + + output = self.stderr.read() + if output is not None: + print(output.decode(), file=sys.stderr) + + return super().run() + + def stop(self) -> None: + self.running = False diff --git a/video2x/video2x.py b/video2x/video2x.py index 349688b..0dabaa9 100755 --- a/video2x/video2x.py +++ b/video2x/video2x.py @@ -248,10 +248,26 @@ class Video2X: # wait for jobs in queue to deplete while self.processed.value < total_frames - 1: time.sleep(0.5) + + # check processor health for process in self.processor_processes: if not process.is_alive(): raise Exception("process died unexpectedly") + # check decoder health + if ( + not self.decoder.is_alive() + and self.decoder.exception is not None + ): + raise Exception("decoder died unexpectedly") + + # check encoder health + if ( + not self.encoder.is_alive() + and self.encoder.exception is not None + ): + raise Exception("encoder died unexpectedly") + # show progress bar when upscale starts if progress.disable is True and self.processed.value > 0: progress.disable = False @@ -266,7 +282,8 @@ class Video2X: # if SIGTERM is received or ^C is pressed # TODO: pause and continue here except (SystemExit, KeyboardInterrupt) as e: - logger.warning("Exit signal received, terminating") + logger.warning("Exit signal received, exiting gracefully") + logger.warning("Press ^C again to force terminate") exception.append(e) except Exception as e: @@ -281,8 +298,8 @@ class Video2X: # mark processing queue as closed self.processing_queue.close() - # stop upscaler processes - logger.info("Stopping upscaler processes") + # stop processor processes + logger.info("Stopping processor processes") for process in self.processor_processes: process.terminate() @@ -291,6 +308,7 @@ class Video2X: process.join() # ensure both the decoder and the encoder have exited + logger.info("Stopping decoder and encoder threads") self.decoder.stop() self.encoder.stop() self.decoder.join()