added pause function

This commit is contained in:
k4yt3x 2022-03-21 03:28:19 +00:00
parent fa485b6cdd
commit 8ba56e58c2
5 changed files with 138 additions and 59 deletions

View File

@ -19,7 +19,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: Video Decoder Name: Video Decoder
Author: K4YT3X Author: K4YT3X
Date Created: June 17, 2021 Date Created: June 17, 2021
Last Modified: March 1, 2022 Last Modified: March 20, 2022
""" """
import contextlib import contextlib
@ -30,6 +30,8 @@ import queue
import signal import signal
import subprocess import subprocess
import threading import threading
import time
from multiprocessing.sharedctypes import Synchronized
import ffmpeg import ffmpeg
from loguru import logger from loguru import logger
@ -58,6 +60,7 @@ class VideoDecoder(threading.Thread):
frame_rate: float, frame_rate: float,
processing_queue: multiprocessing.Queue, processing_queue: multiprocessing.Queue,
processing_settings: tuple, processing_settings: tuple,
pause: Synchronized,
ignore_max_image_pixels=True, ignore_max_image_pixels=True,
) -> None: ) -> None:
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -67,6 +70,7 @@ class VideoDecoder(threading.Thread):
self.input_height = input_height self.input_height = input_height
self.processing_queue = processing_queue self.processing_queue = processing_queue
self.processing_settings = processing_settings self.processing_settings = processing_settings
self.pause = pause
# this disables the "possible DDoS" warning # this disables the "possible DDoS" warning
if ignore_max_image_pixels: if ignore_max_image_pixels:
@ -109,6 +113,12 @@ class VideoDecoder(threading.Thread):
# continue running until an exception occurs # continue running until an exception occurs
# or all frames have been decoded # or all frames have been decoded
while self.running: while self.running:
# pause if pause flag is set
if self.pause.value is True:
time.sleep(0.1)
continue
try: try:
buffer = self.decoder.stdout.read( buffer = self.decoder.stdout.read(
3 * self.input_width * self.input_height 3 * self.input_width * self.input_height

View File

@ -19,18 +19,17 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: Video Encoder Name: Video Encoder
Author: K4YT3X Author: K4YT3X
Date Created: June 17, 2021 Date Created: June 17, 2021
Last Modified: March 1, 2022 Last Modified: March 20, 2022
""" """
import multiprocessing
import multiprocessing.managers
import multiprocessing.sharedctypes
import os import os
import pathlib import pathlib
import signal import signal
import subprocess import subprocess
import threading import threading
import time import time
from multiprocessing.managers import ListProxy
from multiprocessing.sharedctypes import Synchronized
import ffmpeg import ffmpeg
from loguru import logger from loguru import logger
@ -58,8 +57,9 @@ class VideoEncoder(threading.Thread):
output_width: int, output_width: int,
output_height: int, output_height: int,
total_frames: int, total_frames: int,
processed_frames: multiprocessing.managers.ListProxy, processed_frames: ListProxy,
processed: multiprocessing.sharedctypes.Synchronized, processed: Synchronized,
pause: Synchronized,
copy_audio: bool = True, copy_audio: bool = True,
copy_subtitle: bool = True, copy_subtitle: bool = True,
copy_data: bool = False, copy_data: bool = False,
@ -72,6 +72,7 @@ class VideoEncoder(threading.Thread):
self.total_frames = total_frames self.total_frames = total_frames
self.processed_frames = processed_frames self.processed_frames = processed_frames
self.processed = processed self.processed = processed
self.pause = pause
# stores exceptions if the thread exits with errors # stores exceptions if the thread exits with errors
self.exception = None self.exception = None
@ -140,6 +141,12 @@ class VideoEncoder(threading.Thread):
self.running = True self.running = True
frame_index = 0 frame_index = 0
while self.running and frame_index < self.total_frames: while self.running and frame_index < self.total_frames:
# pause if pause flag is set
if self.pause.value is True:
time.sleep(0.1)
continue
try: try:
image = self.processed_frames[frame_index] image = self.processed_frames[frame_index]
if image is None: if image is None:

View File

@ -19,15 +19,15 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: Interpolator Name: Interpolator
Author: K4YT3X Author: K4YT3X
Date Created: May 27, 2021 Date Created: May 27, 2021
Last Modified: February 28, 2022 Last Modified: March 20, 2022
""" """
import multiprocessing import multiprocessing
import multiprocessing.managers
import multiprocessing.sharedctypes
import queue import queue
import signal import signal
import time import time
from multiprocessing.managers import ListProxy
from multiprocessing.sharedctypes import Synchronized
from loguru import logger from loguru import logger
from PIL import ImageChops, ImageStat from PIL import ImageChops, ImageStat
@ -40,12 +40,14 @@ class Interpolator(multiprocessing.Process):
def __init__( def __init__(
self, self,
processing_queue: multiprocessing.Queue, processing_queue: multiprocessing.Queue,
processed_frames: multiprocessing.managers.ListProxy, processed_frames: ListProxy,
pause: Synchronized,
) -> None: ) -> None:
multiprocessing.Process.__init__(self) multiprocessing.Process.__init__(self)
self.running = False self.running = False
self.processing_queue = processing_queue self.processing_queue = processing_queue
self.processed_frames = processed_frames self.processed_frames = processed_frames
self.pause = pause
signal.signal(signal.SIGTERM, self._stop) signal.signal(signal.SIGTERM, self._stop)
@ -57,6 +59,11 @@ class Interpolator(multiprocessing.Process):
processor_objects = {} processor_objects = {}
while self.running: while self.running:
try: try:
# pause if pause flag is set
if self.pause.value is True:
time.sleep(0.1)
continue
try: try:
# get new job from queue # get new job from queue
( (

View File

@ -19,16 +19,16 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: Upscaler Name: Upscaler
Author: K4YT3X Author: K4YT3X
Date Created: May 27, 2021 Date Created: May 27, 2021
Last Modified: March 19, 2022 Last Modified: March 20, 2022
""" """
import math import math
import multiprocessing import multiprocessing
import multiprocessing.managers
import multiprocessing.sharedctypes
import queue import queue
import signal import signal
import time import time
from multiprocessing.managers import ListProxy
from multiprocessing.sharedctypes import Synchronized
from loguru import logger from loguru import logger
from PIL import Image, ImageChops, ImageStat from PIL import Image, ImageChops, ImageStat
@ -58,12 +58,14 @@ class Upscaler(multiprocessing.Process):
def __init__( def __init__(
self, self,
processing_queue: multiprocessing.Queue, processing_queue: multiprocessing.Queue,
processed_frames: multiprocessing.managers.ListProxy, processed_frames: ListProxy,
pause: Synchronized,
) -> None: ) -> None:
multiprocessing.Process.__init__(self) multiprocessing.Process.__init__(self)
self.running = False self.running = False
self.processing_queue = processing_queue self.processing_queue = processing_queue
self.processed_frames = processed_frames self.processed_frames = processed_frames
self.pause = pause
signal.signal(signal.SIGTERM, self._stop) signal.signal(signal.SIGTERM, self._stop)
@ -75,6 +77,11 @@ class Upscaler(multiprocessing.Process):
processor_objects = {} processor_objects = {}
while self.running: while self.running:
try: try:
# pause if pause flag is set
if self.pause.value is True:
time.sleep(0.1)
continue
try: try:
# get new job from queue # get new job from queue
( (

View File

@ -27,7 +27,7 @@ __ __ _ _ ___ __ __
Name: Video2X Name: Video2X
Creator: K4YT3X Creator: K4YT3X
Date Created: February 24, 2018 Date Created: February 24, 2018
Last Modified: March 19, 2022 Last Modified: March 20, 2022
Editor: BrianPetkovsek Editor: BrianPetkovsek
Last Modified: June 17, 2019 Last Modified: June 17, 2019
@ -40,15 +40,19 @@ Last Modified: March 23, 2020
""" """
import argparse import argparse
import ctypes
import math import math
import multiprocessing import multiprocessing
import os import os
import pathlib import pathlib
import signal
import sys import sys
import time import time
from typing import Union
import cv2 import cv2
import ffmpeg import ffmpeg
import pynput
from loguru import logger from loguru import logger
from rich import print from rich import print
from rich.console import Console from rich.console import Console
@ -150,6 +154,22 @@ class Video2X:
return video_info["width"], video_info["height"], total_frames, frame_rate return video_info["width"], video_info["height"], total_frames, frame_rate
def _toggle_pause(self, _signal_number: int = -1, _frame=None):
# print console messages and update the progress bar's status
if self.pause.value is False:
self.progress.update(self.task, description=self.description + " (paused)")
self.progress.stop_task(self.task)
logger.warning("Processing paused, press Ctrl+Alt+V again to resume")
elif self.pause.value is True:
self.progress.update(self.task, description=self.description)
logger.warning("Resuming processing")
self.progress.start_task(self.task)
# invert the value of the pause flag
with self.pause.get_lock():
self.pause.value = not self.pause.value
def _run( def _run(
self, self,
input_path: pathlib.Path, input_path: pathlib.Path,
@ -186,6 +206,7 @@ class Video2X:
self.processing_queue = multiprocessing.Queue(maxsize=processes * 10) self.processing_queue = multiprocessing.Queue(maxsize=processes * 10)
processed_frames = multiprocessing.Manager().list([None] * total_frames) processed_frames = multiprocessing.Manager().list([None] * total_frames)
self.processed = multiprocessing.Value("I", 0) self.processed = multiprocessing.Value("I", 0)
self.pause = multiprocessing.Value(ctypes.c_bool, False)
# set up and start decoder thread # set up and start decoder thread
logger.info("Starting video decoder") logger.info("Starting video decoder")
@ -196,6 +217,7 @@ class Video2X:
frame_rate, frame_rate,
self.processing_queue, self.processing_queue,
processing_settings, processing_settings,
self.pause,
) )
self.decoder.start() self.decoder.start()
@ -210,84 +232,110 @@ class Video2X:
total_frames, total_frames,
processed_frames, processed_frames,
self.processed, self.processed,
self.pause,
) )
self.encoder.start() self.encoder.start()
# create processor processes # create processor processes
for process_name in range(processes): for process_name in range(processes):
process = Processor(self.processing_queue, processed_frames) process = Processor(self.processing_queue, processed_frames, self.pause)
process.name = str(process_name) process.name = str(process_name)
process.daemon = True process.daemon = True
process.start() process.start()
self.processor_processes.append(process) self.processor_processes.append(process)
# create progress bar
self.progress = Progress(
"[progress.description]{task.description}",
BarColumn(complete_style="blue", finished_style="green"),
"[progress.percentage]{task.percentage:>3.0f}%",
"[color(240)]({task.completed}/{task.total})",
ProcessingSpeedColumn(),
TimeElapsedColumn(),
"<",
TimeRemainingColumn(),
console=console,
disable=True,
)
self.description = f"[cyan]{MODE_LABELS.get(self.mode, 'Unknown')}"
self.task = self.progress.add_task(self.description, total=total_frames)
# allow sending SIGUSR1 to pause/resume processing
signal.signal(signal.SIGUSR1, self._toggle_pause)
# create global pause hotkey
pause_hotkey = pynput.keyboard.HotKey(
pynput.keyboard.HotKey.parse("<ctrl>+<alt>+v"), self._toggle_pause
)
# create global keyboard input listener
keyboard_listener = pynput.keyboard.Listener(
on_press=(lambda key: pause_hotkey.press(keyboard_listener.canonical(key))),
on_release=(
lambda key: pause_hotkey.release(keyboard_listener.canonical(key))
),
)
# start monitoring global key presses
keyboard_listener.start()
# a temporary variable that stores the exception # a temporary variable that stores the exception
exception = [] exception = []
try: try:
# create progress bar
with Progress(
"[progress.description]{task.description}",
BarColumn(complete_style="blue", finished_style="green"),
"[progress.percentage]{task.percentage:>3.0f}%",
"[color(240)]({task.completed}/{task.total})",
ProcessingSpeedColumn(),
TimeElapsedColumn(),
"<",
TimeRemainingColumn(),
console=console,
disable=True,
) as progress:
task = progress.add_task(
f"[cyan]{MODE_LABELS.get(mode, 'Unknown')}", total=total_frames
)
# wait for jobs in queue to deplete # wait for jobs in queue to deplete
while self.processed.value < total_frames - 1: while self.processed.value < total_frames - 1:
time.sleep(0.5) time.sleep(1)
# check processor health # check processor health
for process in self.processor_processes: for process in self.processor_processes:
if not process.is_alive(): if not process.is_alive():
raise Exception("process died unexpectedly") raise Exception("process died unexpectedly")
# check decoder health # check decoder health
if ( if not self.decoder.is_alive() and self.decoder.exception is not None:
not self.decoder.is_alive() raise Exception("decoder died unexpectedly")
and self.decoder.exception is not None
):
raise Exception("decoder died unexpectedly")
# check encoder health # check encoder health
if ( if not self.encoder.is_alive() and self.encoder.exception is not None:
not self.encoder.is_alive() raise Exception("encoder died unexpectedly")
and self.encoder.exception is not None
):
raise Exception("encoder died unexpectedly")
# show progress bar when upscale starts # show progress bar when upscale starts
if progress.disable is True and self.processed.value > 0: if self.progress.disable is True and self.processed.value > 0:
progress.disable = False self.progress.disable = False
progress.start() self.progress.start()
# update progress # update progress
progress.update(task, completed=self.processed.value) if self.pause.value is False:
self.progress.update(self.task, completed=self.processed.value)
progress.update(task, completed=total_frames) self.progress.update(self.task, completed=total_frames)
self.progress.stop()
logger.info("Processing has completed") logger.info("Processing has completed")
# if SIGTERM is received or ^C is pressed # if SIGTERM is received or ^C is pressed
# TODO: pause and continue here # TODO: pause and continue here
except (SystemExit, KeyboardInterrupt) as e: except (SystemExit, KeyboardInterrupt) as e:
self.progress.stop()
logger.warning("Exit signal received, exiting gracefully") logger.warning("Exit signal received, exiting gracefully")
logger.warning("Press ^C again to force terminate") logger.warning("Press ^C again to force terminate")
exception.append(e) exception.append(e)
except Exception as e: except Exception as e:
self.progress.stop()
logger.exception(e) logger.exception(e)
exception.append(e) exception.append(e)
finally: finally:
# stop keyboard listener
keyboard_listener.stop()
keyboard_listener.join()
# stop progress display
self.progress.stop()
# stop processor processes # stop processor processes
logger.info("Stopping processor processes") logger.info("Stopping processor processes")
for process in self.processor_processes: for process in self.processor_processes: