redirected subprocess outputs into Rich console

This commit is contained in:
k4yt3x 2022-02-28 01:54:52 +00:00
parent c0fe81bd2e
commit 4459f4d3be
4 changed files with 131 additions and 14 deletions

View File

@ -19,9 +19,12 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: Video Decoder Name: Video Decoder
Author: K4YT3X Author: K4YT3X
Date Created: June 17, 2021 Date Created: June 17, 2021
Last Modified: February 16, 2022 Last Modified: February 27, 2022
""" """
# local imports
from .pipe_printer import PipePrinter
# built-in imports # built-in imports
import contextlib import contextlib
import os import os
@ -87,10 +90,15 @@ class VideoDecoder(threading.Thread):
), ),
overwrite_output=True, overwrite_output=True,
), ),
env={"AV_LOG_FORCE_COLOR": "TRUE"},
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
# stderr=subprocess.DEVNULL, stderr=subprocess.PIPE,
) )
# start the PIPE printer to start printing FFmpeg logs
self.pipe_printer = PipePrinter(self.decoder.stderr)
self.pipe_printer.start()
def run(self) -> None: def run(self) -> None:
self.running = True self.running = True
@ -113,8 +121,8 @@ class VideoDecoder(threading.Thread):
# after the last frame has been decoded # after the last frame has been decoded
# read will return nothing # read will return nothing
if len(buffer) == 0: if len(buffer) == 0:
logger.debug("Decoding queue depleted") self.stop()
break continue
# convert raw bytes into image object # convert raw bytes into image object
image = Image.frombytes( image = Image.frombytes(
@ -140,6 +148,7 @@ class VideoDecoder(threading.Thread):
# most likely "not enough image data" # most likely "not enough image data"
except ValueError as e: except ValueError as e:
self.exception = e
# ignore queue closed # ignore queue closed
if not "is closed" in str(e): if not "is closed" in str(e):
@ -151,13 +160,26 @@ class VideoDecoder(threading.Thread):
self.exception = e self.exception = e
logger.exception(e) logger.exception(e)
break break
else:
logger.debug("Decoding queue depleted")
# flush the remaining data in STDOUT and close PIPE
self.decoder.stdout.flush()
self.decoder.stdout.close()
# flush the remaining data in STDERR and wait for it to be read
self.decoder.stderr.flush()
# send SIGINT (2) to FFmpeg # send SIGINT (2) to FFmpeg
# this instructs it to finalize and exit # this instructs it to finalize and exit
if self.decoder.poll() is None: self.decoder.send_signal(signal.SIGINT)
self.decoder.send_signal(signal.SIGTERM)
# ensure the decoder has exited # wait for process to terminate
self.pipe_printer.stop()
self.decoder.stderr.close()
# wait for processes and threads to stop
self.pipe_printer.join()
self.decoder.wait() self.decoder.wait()
logger.info("Decoder thread exiting") logger.info("Decoder thread exiting")

View File

@ -19,9 +19,12 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: Video Encoder Name: Video Encoder
Author: K4YT3X Author: K4YT3X
Date Created: June 17, 2021 Date Created: June 17, 2021
Last Modified: February 16, 2022 Last Modified: February 27, 2022
""" """
# local imports
from .pipe_printer import PipePrinter
# built-in imports # built-in imports
import multiprocessing import multiprocessing
import multiprocessing.managers import multiprocessing.managers
@ -74,6 +77,10 @@ class VideoEncoder(threading.Thread):
self.processed_frames = processed_frames self.processed_frames = processed_frames
self.processed = processed self.processed = processed
# stores exceptions if the thread exits with errors
self.exception = None
# create FFmpeg input for the original input video
self.original = ffmpeg.input(input_path) self.original = ffmpeg.input(input_path)
# define frames as input # define frames as input
@ -122,11 +129,15 @@ class VideoEncoder(threading.Thread):
), ),
overwrite_output=True, overwrite_output=True,
), ),
env={"AV_LOG_FORCE_COLOR": "TRUE"},
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
# stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
# stderr=subprocess.DEVNULL,
) )
# start the PIPE printer to start printing FFmpeg logs
self.pipe_printer = PipePrinter(self.encoder.stderr)
self.pipe_printer.start()
def run(self) -> None: def run(self) -> None:
self.running = True self.running = True
frame_index = 0 frame_index = 0
@ -150,19 +161,29 @@ class VideoEncoder(threading.Thread):
# send exceptions into the client connection pipe # send exceptions into the client connection pipe
except Exception as e: except Exception as e:
self.exception = e
logger.exception(e) logger.exception(e)
break break
else:
logger.debug("Encoding queue depleted")
# flush the remaining data in STDIN and close PIPE # flush the remaining data in STDIN and close PIPE
logger.debug("Encoding queue depleted")
self.encoder.stdin.flush() self.encoder.stdin.flush()
self.encoder.stdin.close() self.encoder.stdin.close()
# flush the remaining data in STDERR and wait for it to be read
self.encoder.stderr.flush()
# send SIGINT (2) to FFmpeg # send SIGINT (2) to FFmpeg
# this instructs it to finalize and exit # this instructs it to finalize and exit
self.encoder.send_signal(signal.SIGINT) self.encoder.send_signal(signal.SIGINT)
# wait for process to terminate # wait for process to terminate
self.pipe_printer.stop()
self.encoder.stderr.close()
# wait for processes and threads to stop
self.pipe_printer.join()
self.encoder.wait() self.encoder.wait()
logger.info("Encoder thread exiting") logger.info("Encoder thread exiting")

56
video2x/pipe_printer.py Executable file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (C) 2018-2022 K4YT3X and contributors.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: PIPE Printer
Author: K4YT3X
Date Created: February 27, 2022
Last Modified: February 27, 2022
"""
# built-in imports
from typing import IO
import os
import sys
import threading
import time
class PipePrinter(threading.Thread):
def __init__(self, stderr: IO[bytes]) -> None:
threading.Thread.__init__(self)
self.stderr = stderr
self.running = False
# set read mode to non-blocking
os.set_blocking(self.stderr.fileno(), False)
def run(self) -> None:
self.running = True
# keep printing contents in the PIPE
while self.running:
time.sleep(0.5)
output = self.stderr.read()
if output is not None:
print(output.decode(), file=sys.stderr)
return super().run()
def stop(self) -> None:
self.running = False

View File

@ -248,10 +248,26 @@ class Video2X:
# wait for jobs in queue to deplete # wait for jobs in queue to deplete
while self.processed.value < total_frames - 1: while self.processed.value < total_frames - 1:
time.sleep(0.5) time.sleep(0.5)
# check processor health
for process in self.processor_processes: for process in self.processor_processes:
if not process.is_alive(): if not process.is_alive():
raise Exception("process died unexpectedly") raise Exception("process died unexpectedly")
# check decoder health
if (
not self.decoder.is_alive()
and self.decoder.exception is not None
):
raise Exception("decoder died unexpectedly")
# check encoder health
if (
not self.encoder.is_alive()
and self.encoder.exception is not None
):
raise Exception("encoder died unexpectedly")
# show progress bar when upscale starts # show progress bar when upscale starts
if progress.disable is True and self.processed.value > 0: if progress.disable is True and self.processed.value > 0:
progress.disable = False progress.disable = False
@ -266,7 +282,8 @@ class Video2X:
# if SIGTERM is received or ^C is pressed # if SIGTERM is received or ^C is pressed
# TODO: pause and continue here # TODO: pause and continue here
except (SystemExit, KeyboardInterrupt) as e: except (SystemExit, KeyboardInterrupt) as e:
logger.warning("Exit signal received, terminating") logger.warning("Exit signal received, exiting gracefully")
logger.warning("Press ^C again to force terminate")
exception.append(e) exception.append(e)
except Exception as e: except Exception as e:
@ -281,8 +298,8 @@ class Video2X:
# mark processing queue as closed # mark processing queue as closed
self.processing_queue.close() self.processing_queue.close()
# stop upscaler processes # stop processor processes
logger.info("Stopping upscaler processes") logger.info("Stopping processor processes")
for process in self.processor_processes: for process in self.processor_processes:
process.terminate() process.terminate()
@ -291,6 +308,7 @@ class Video2X:
process.join() process.join()
# ensure both the decoder and the encoder have exited # ensure both the decoder and the encoder have exited
logger.info("Stopping decoder and encoder threads")
self.decoder.stop() self.decoder.stop()
self.encoder.stop() self.encoder.stop()
self.decoder.join() self.decoder.join()