From 8ba56e58c257b1e5aede9e0824fb8c0e429deff8 Mon Sep 17 00:00:00 2001 From: k4yt3x Date: Mon, 21 Mar 2022 03:28:19 +0000 Subject: [PATCH] added pause function --- video2x/decoder.py | 12 +++- video2x/encoder.py | 19 ++++-- video2x/interpolator.py | 15 +++-- video2x/upscaler.py | 15 +++-- video2x/video2x.py | 136 +++++++++++++++++++++++++++------------- 5 files changed, 138 insertions(+), 59 deletions(-) diff --git a/video2x/decoder.py b/video2x/decoder.py index 2b129b8..fc9a991 100755 --- a/video2x/decoder.py +++ b/video2x/decoder.py @@ -19,7 +19,7 @@ along with this program. If not, see . Name: Video Decoder Author: K4YT3X Date Created: June 17, 2021 -Last Modified: March 1, 2022 +Last Modified: March 20, 2022 """ import contextlib @@ -30,6 +30,8 @@ import queue import signal import subprocess import threading +import time +from multiprocessing.sharedctypes import Synchronized import ffmpeg from loguru import logger @@ -58,6 +60,7 @@ class VideoDecoder(threading.Thread): frame_rate: float, processing_queue: multiprocessing.Queue, processing_settings: tuple, + pause: Synchronized, ignore_max_image_pixels=True, ) -> None: threading.Thread.__init__(self) @@ -67,6 +70,7 @@ class VideoDecoder(threading.Thread): self.input_height = input_height self.processing_queue = processing_queue self.processing_settings = processing_settings + self.pause = pause # this disables the "possible DDoS" warning if ignore_max_image_pixels: @@ -109,6 +113,12 @@ class VideoDecoder(threading.Thread): # continue running until an exception occurs # or all frames have been decoded while self.running: + + # pause if pause flag is set + if self.pause.value is True: + time.sleep(0.1) + continue + try: buffer = self.decoder.stdout.read( 3 * self.input_width * self.input_height diff --git a/video2x/encoder.py b/video2x/encoder.py index eabf2f5..4b77c01 100755 --- a/video2x/encoder.py +++ b/video2x/encoder.py @@ -19,18 +19,17 @@ along with this program. If not, see . Name: Video Encoder Author: K4YT3X Date Created: June 17, 2021 -Last Modified: March 1, 2022 +Last Modified: March 20, 2022 """ -import multiprocessing -import multiprocessing.managers -import multiprocessing.sharedctypes import os import pathlib import signal import subprocess import threading import time +from multiprocessing.managers import ListProxy +from multiprocessing.sharedctypes import Synchronized import ffmpeg from loguru import logger @@ -58,8 +57,9 @@ class VideoEncoder(threading.Thread): output_width: int, output_height: int, total_frames: int, - processed_frames: multiprocessing.managers.ListProxy, - processed: multiprocessing.sharedctypes.Synchronized, + processed_frames: ListProxy, + processed: Synchronized, + pause: Synchronized, copy_audio: bool = True, copy_subtitle: bool = True, copy_data: bool = False, @@ -72,6 +72,7 @@ class VideoEncoder(threading.Thread): self.total_frames = total_frames self.processed_frames = processed_frames self.processed = processed + self.pause = pause # stores exceptions if the thread exits with errors self.exception = None @@ -140,6 +141,12 @@ class VideoEncoder(threading.Thread): self.running = True frame_index = 0 while self.running and frame_index < self.total_frames: + + # pause if pause flag is set + if self.pause.value is True: + time.sleep(0.1) + continue + try: image = self.processed_frames[frame_index] if image is None: diff --git a/video2x/interpolator.py b/video2x/interpolator.py index 3aeb153..c104f93 100755 --- a/video2x/interpolator.py +++ b/video2x/interpolator.py @@ -19,15 +19,15 @@ along with this program. If not, see . Name: Interpolator Author: K4YT3X Date Created: May 27, 2021 -Last Modified: February 28, 2022 +Last Modified: March 20, 2022 """ import multiprocessing -import multiprocessing.managers -import multiprocessing.sharedctypes import queue import signal import time +from multiprocessing.managers import ListProxy +from multiprocessing.sharedctypes import Synchronized from loguru import logger from PIL import ImageChops, ImageStat @@ -40,12 +40,14 @@ class Interpolator(multiprocessing.Process): def __init__( self, processing_queue: multiprocessing.Queue, - processed_frames: multiprocessing.managers.ListProxy, + processed_frames: ListProxy, + pause: Synchronized, ) -> None: multiprocessing.Process.__init__(self) self.running = False self.processing_queue = processing_queue self.processed_frames = processed_frames + self.pause = pause signal.signal(signal.SIGTERM, self._stop) @@ -57,6 +59,11 @@ class Interpolator(multiprocessing.Process): processor_objects = {} while self.running: try: + # pause if pause flag is set + if self.pause.value is True: + time.sleep(0.1) + continue + try: # get new job from queue ( diff --git a/video2x/upscaler.py b/video2x/upscaler.py index 2f221c3..edd5567 100755 --- a/video2x/upscaler.py +++ b/video2x/upscaler.py @@ -19,16 +19,16 @@ along with this program. If not, see . Name: Upscaler Author: K4YT3X Date Created: May 27, 2021 -Last Modified: March 19, 2022 +Last Modified: March 20, 2022 """ import math import multiprocessing -import multiprocessing.managers -import multiprocessing.sharedctypes import queue import signal import time +from multiprocessing.managers import ListProxy +from multiprocessing.sharedctypes import Synchronized from loguru import logger from PIL import Image, ImageChops, ImageStat @@ -58,12 +58,14 @@ class Upscaler(multiprocessing.Process): def __init__( self, processing_queue: multiprocessing.Queue, - processed_frames: multiprocessing.managers.ListProxy, + processed_frames: ListProxy, + pause: Synchronized, ) -> None: multiprocessing.Process.__init__(self) self.running = False self.processing_queue = processing_queue self.processed_frames = processed_frames + self.pause = pause signal.signal(signal.SIGTERM, self._stop) @@ -75,6 +77,11 @@ class Upscaler(multiprocessing.Process): processor_objects = {} while self.running: try: + # pause if pause flag is set + if self.pause.value is True: + time.sleep(0.1) + continue + try: # get new job from queue ( diff --git a/video2x/video2x.py b/video2x/video2x.py index 15ed6d5..c077b63 100755 --- a/video2x/video2x.py +++ b/video2x/video2x.py @@ -27,7 +27,7 @@ __ __ _ _ ___ __ __ Name: Video2X Creator: K4YT3X Date Created: February 24, 2018 -Last Modified: March 19, 2022 +Last Modified: March 20, 2022 Editor: BrianPetkovsek Last Modified: June 17, 2019 @@ -40,15 +40,19 @@ Last Modified: March 23, 2020 """ import argparse +import ctypes import math import multiprocessing import os import pathlib +import signal import sys import time +from typing import Union import cv2 import ffmpeg +import pynput from loguru import logger from rich import print from rich.console import Console @@ -150,6 +154,22 @@ class Video2X: return video_info["width"], video_info["height"], total_frames, frame_rate + def _toggle_pause(self, _signal_number: int = -1, _frame=None): + # print console messages and update the progress bar's status + if self.pause.value is False: + self.progress.update(self.task, description=self.description + " (paused)") + self.progress.stop_task(self.task) + logger.warning("Processing paused, press Ctrl+Alt+V again to resume") + + elif self.pause.value is True: + self.progress.update(self.task, description=self.description) + logger.warning("Resuming processing") + self.progress.start_task(self.task) + + # invert the value of the pause flag + with self.pause.get_lock(): + self.pause.value = not self.pause.value + def _run( self, input_path: pathlib.Path, @@ -186,6 +206,7 @@ class Video2X: self.processing_queue = multiprocessing.Queue(maxsize=processes * 10) processed_frames = multiprocessing.Manager().list([None] * total_frames) self.processed = multiprocessing.Value("I", 0) + self.pause = multiprocessing.Value(ctypes.c_bool, False) # set up and start decoder thread logger.info("Starting video decoder") @@ -196,6 +217,7 @@ class Video2X: frame_rate, self.processing_queue, processing_settings, + self.pause, ) self.decoder.start() @@ -210,84 +232,110 @@ class Video2X: total_frames, processed_frames, self.processed, + self.pause, ) self.encoder.start() # create processor processes for process_name in range(processes): - process = Processor(self.processing_queue, processed_frames) + process = Processor(self.processing_queue, processed_frames, self.pause) process.name = str(process_name) process.daemon = True process.start() self.processor_processes.append(process) + # create progress bar + self.progress = Progress( + "[progress.description]{task.description}", + BarColumn(complete_style="blue", finished_style="green"), + "[progress.percentage]{task.percentage:>3.0f}%", + "[color(240)]({task.completed}/{task.total})", + ProcessingSpeedColumn(), + TimeElapsedColumn(), + "<", + TimeRemainingColumn(), + console=console, + disable=True, + ) + + self.description = f"[cyan]{MODE_LABELS.get(self.mode, 'Unknown')}" + self.task = self.progress.add_task(self.description, total=total_frames) + + # allow sending SIGUSR1 to pause/resume processing + signal.signal(signal.SIGUSR1, self._toggle_pause) + + # create global pause hotkey + pause_hotkey = pynput.keyboard.HotKey( + pynput.keyboard.HotKey.parse("++v"), self._toggle_pause + ) + + # create global keyboard input listener + keyboard_listener = pynput.keyboard.Listener( + on_press=(lambda key: pause_hotkey.press(keyboard_listener.canonical(key))), + on_release=( + lambda key: pause_hotkey.release(keyboard_listener.canonical(key)) + ), + ) + + # start monitoring global key presses + keyboard_listener.start() + # a temporary variable that stores the exception exception = [] try: - # create progress bar - with Progress( - "[progress.description]{task.description}", - BarColumn(complete_style="blue", finished_style="green"), - "[progress.percentage]{task.percentage:>3.0f}%", - "[color(240)]({task.completed}/{task.total})", - ProcessingSpeedColumn(), - TimeElapsedColumn(), - "<", - TimeRemainingColumn(), - console=console, - disable=True, - ) as progress: - task = progress.add_task( - f"[cyan]{MODE_LABELS.get(mode, 'Unknown')}", total=total_frames - ) - # wait for jobs in queue to deplete - while self.processed.value < total_frames - 1: - time.sleep(0.5) + # wait for jobs in queue to deplete + while self.processed.value < total_frames - 1: + time.sleep(1) - # check processor health - for process in self.processor_processes: - if not process.is_alive(): - raise Exception("process died unexpectedly") + # check processor health + for process in self.processor_processes: + if not process.is_alive(): + raise Exception("process died unexpectedly") - # check decoder health - if ( - not self.decoder.is_alive() - and self.decoder.exception is not None - ): - raise Exception("decoder died unexpectedly") + # check decoder health + if not self.decoder.is_alive() and self.decoder.exception is not None: + raise Exception("decoder died unexpectedly") - # check encoder health - if ( - not self.encoder.is_alive() - and self.encoder.exception is not None - ): - raise Exception("encoder died unexpectedly") + # check encoder health + if not self.encoder.is_alive() and self.encoder.exception is not None: + raise Exception("encoder died unexpectedly") - # show progress bar when upscale starts - if progress.disable is True and self.processed.value > 0: - progress.disable = False - progress.start() + # show progress bar when upscale starts + if self.progress.disable is True and self.processed.value > 0: + self.progress.disable = False + self.progress.start() - # update progress - progress.update(task, completed=self.processed.value) + # update progress + if self.pause.value is False: + self.progress.update(self.task, completed=self.processed.value) - progress.update(task, completed=total_frames) + self.progress.update(self.task, completed=total_frames) + self.progress.stop() logger.info("Processing has completed") # if SIGTERM is received or ^C is pressed # TODO: pause and continue here except (SystemExit, KeyboardInterrupt) as e: + self.progress.stop() logger.warning("Exit signal received, exiting gracefully") logger.warning("Press ^C again to force terminate") exception.append(e) except Exception as e: + self.progress.stop() logger.exception(e) exception.append(e) finally: + # stop keyboard listener + keyboard_listener.stop() + keyboard_listener.join() + + # stop progress display + self.progress.stop() + # stop processor processes logger.info("Stopping processor processes") for process in self.processor_processes: