diff --git a/video2x/__init__.py b/video2x/__init__.py index 86a0b32..72abf0a 100755 --- a/video2x/__init__.py +++ b/video2x/__init__.py @@ -31,4 +31,4 @@ __version__ = "5.0.0-beta5" # generated by the following lines from .interpolator import Interpolator from .upscaler import Upscaler -from .video2x import Video2X, main +from .video2x import Video2X diff --git a/video2x/__main__.py b/video2x/__main__.py index 51bfcb3..0c6e510 100755 --- a/video2x/__main__.py +++ b/video2x/__main__.py @@ -22,9 +22,218 @@ Date Created: July 3, 2021 Last Modified: February 26, 2022 """ +import argparse +import os +import pathlib import sys -from .video2x import main +from loguru import logger +from rich import print as rich_print + +from . import __version__ +from .video2x import LOGURU_FORMAT, Video2X + +LEGAL_INFO = f"""Video2X\t\t{__version__} +Author:\t\tK4YT3X +License:\tGNU AGPL v3 +Github Page:\thttps://github.com/k4yt3x/video2x +Contact:\ti@k4yt3x.com""" + +# algorithms available for upscaling tasks +UPSCALING_ALGORITHMS = [ + "waifu2x", + "srmd", + "realsr", + "realcugan", +] + +# algorithms available for frame interpolation tasks +INTERPOLATION_ALGORITHMS = ["rife"] + + +def parse_arguments() -> argparse.Namespace: + """ + parse command line arguments + + :rtype argparse.Namespace: command parsing results + """ + parser = argparse.ArgumentParser( + prog="video2x", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--version", help="show version information and exit", action="store_true" + ) + parser.add_argument( + "-i", + "--input", + type=pathlib.Path, + help="input file/directory path", + required=True, + ) + parser.add_argument( + "-o", + "--output", + type=pathlib.Path, + help="output file/directory path", + required=True, + ) + parser.add_argument( + "-p", "--processes", type=int, help="number of processes to launch", default=1 + ) + parser.add_argument( + "-l", + "--loglevel", + choices=["trace", "debug", "info", "success", "warning", "error", "critical"], + default="info", + ) + + # upscaler arguments + action = parser.add_subparsers( + help="action to perform", dest="action", required=True + ) + + upscale = action.add_parser( + "upscale", + help="upscale a file", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + add_help=False, + ) + upscale.add_argument( + "--help", action="help", help="show this help message and exit" + ) + upscale.add_argument("-w", "--width", type=int, help="output width") + upscale.add_argument("-h", "--height", type=int, help="output height") + upscale.add_argument("-n", "--noise", type=int, help="denoise level", default=3) + upscale.add_argument( + "-a", + "--algorithm", + choices=UPSCALING_ALGORITHMS, + help="algorithm to use for upscaling", + default=UPSCALING_ALGORITHMS[0], + ) + upscale.add_argument( + "-t", + "--threshold", + type=float, + help=( + "skip if the percent difference between two adjacent frames is below this" + " value; set to 0 to process all frames" + ), + default=0, + ) + + # interpolator arguments + interpolate = action.add_parser( + "interpolate", + help="interpolate frames for file", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + add_help=False, + ) + interpolate.add_argument( + "--help", action="help", help="show this help message and exit" + ) + interpolate.add_argument( + "-a", + "--algorithm", + choices=UPSCALING_ALGORITHMS, + help="algorithm to use for upscaling", + default=INTERPOLATION_ALGORITHMS[0], + ) + interpolate.add_argument( + "-t", + "--threshold", + type=float, + help=( + "skip if the percent difference between two adjacent frames exceeds this" + " value; set to 100 to interpolate all frames" + ), + default=10, + ) + + return parser.parse_args() + + +def main() -> int: + """ + command line entrypoint for direct CLI invocation + + :rtype int: 0 if completed successfully, else other int + """ + + try: + # display version and lawful informaition + if "--version" in sys.argv: + rich_print(LEGAL_INFO) + return 0 + + # parse command line arguments + args = parse_arguments() + + # check input/output file paths + if not args.input.exists(): + logger.critical(f"Cannot find input file: {args.input}") + return 1 + if not args.input.is_file(): + logger.critical("Input path is not a file") + return 1 + if not args.output.parent.exists(): + logger.critical(f"Output directory does not exist: {args.output.parent}") + return 1 + + # set logger level + if os.environ.get("LOGURU_LEVEL") is None: + os.environ["LOGURU_LEVEL"] = args.loglevel.upper() + + # remove default handler + logger.remove() + + # add new sink with custom handler + logger.add(sys.stderr, colorize=True, format=LOGURU_FORMAT) + + # print package version and copyright notice + logger.opt(colors=True).info(f"Video2X {__version__}") + logger.opt(colors=True).info( + "Copyright (C) 2018-2022 K4YT3X and contributors." + ) + + # initialize video2x object + video2x = Video2X() + + if args.action == "upscale": + video2x.upscale( + args.input, + args.output, + args.width, + args.height, + args.noise, + args.processes, + args.threshold, + args.algorithm, + ) + + elif args.action == "interpolate": + video2x.interpolate( + args.input, + args.output, + args.processes, + args.threshold, + args.algorithm, + ) + + # don't print the traceback for manual terminations + except KeyboardInterrupt: + return 2 + + except Exception as error: + logger.exception(error) + return 1 + + # if no exceptions were produced + else: + logger.success("Processing completed successfully") + return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/video2x/decoder.py b/video2x/decoder.py index c0eb47a..cd333a7 100755 --- a/video2x/decoder.py +++ b/video2x/decoder.py @@ -19,22 +19,19 @@ along with this program. If not, see . Name: Video Decoder Author: K4YT3X Date Created: June 17, 2021 -Last Modified: March 21, 2022 +Last Modified: April 9, 2022 """ import contextlib -import multiprocessing import os import pathlib -import queue import signal import subprocess -import threading -import time -from multiprocessing.sharedctypes import Synchronized +from multiprocessing import Queue +from queue import Full +from threading import Thread import ffmpeg -from loguru import logger from PIL import Image from .pipe_printer import PipePrinter @@ -51,36 +48,38 @@ LOGURU_FFMPEG_LOGLEVELS = { } -class VideoDecoder(threading.Thread): +class VideoDecoder: + """ + A video decoder that generates frames read from FFmpeg. + + :param input_path pathlib.Path: the input file's path + :param input_width int: the input file's width + :param input_height int: the input file's height + :param frame_rate float: the input file's frame rate + :param pil_ignore_max_image_pixels bool: setting this to True + disables PIL's "possible DDoS" warning + """ + def __init__( self, input_path: pathlib.Path, input_width: int, input_height: int, frame_rate: float, - processing_queue: multiprocessing.Queue, - processing_settings: tuple, - pause: Synchronized, - ignore_max_image_pixels=True, + pil_ignore_max_image_pixels: bool = True, ) -> None: - threading.Thread.__init__(self) - self.running = False self.input_path = input_path self.input_width = input_width self.input_height = input_height - self.processing_queue = processing_queue - self.processing_settings = processing_settings - self.pause = pause # this disables the "possible DDoS" warning - if ignore_max_image_pixels: + if pil_ignore_max_image_pixels is True: Image.MAX_IMAGE_PIXELS = None - self.exception = None self.decoder = subprocess.Popen( ffmpeg.compile( ffmpeg.input(input_path, r=frame_rate)["v"] - .output("pipe:1", format="rawvideo", pix_fmt="rgb24", vsync="cfr") + .output("pipe:1", format="rawvideo", pix_fmt="rgb24", fps_mode="cfr") .global_args("-hide_banner") .global_args("-nostats") .global_args("-nostdin") @@ -102,83 +101,33 @@ class VideoDecoder(threading.Thread): self.pipe_printer = PipePrinter(self.decoder.stderr) self.pipe_printer.start() - def run(self) -> None: - self.running = True + def __iter__(self): - # the index of the frame - frame_index = 0 - - # create placeholder for previous frame - # used in interpolate mode - previous_image = None - - # continue running until an exception occurs - # or all frames have been decoded - while self.running is True: - - # pause if pause flag is set - if self.pause.value is True: - time.sleep(0.1) - continue - - try: - buffer = self.decoder.stdout.read( + # continue yielding while FFmpeg continues to produce output + while ( + len( + buffer := self.decoder.stdout.read( 3 * self.input_width * self.input_height ) + ) + > 0 + ): - # source depleted (decoding finished) - # after the last frame has been decoded - # read will return nothing - if len(buffer) == 0: - self.stop() - continue + # convert raw bytes into image object + frame = Image.frombytes( + "RGB", (self.input_width, self.input_height), buffer + ) - # convert raw bytes into image object - image = Image.frombytes( - "RGB", (self.input_width, self.input_height), buffer - ) + # return this frame + yield frame - # keep checking if the running flag is set to False - # while waiting to put the next image into the queue - while self.running is True: - with contextlib.suppress(queue.Full): - self.processing_queue.put( - ( - frame_index, - (previous_image, image), - self.processing_settings, - ), - timeout=0.1, - ) - break + # automatically self-join and clean up after iterations are done + self.join() - previous_image = image - frame_index += 1 + def kill(self): + self.decoder.send_signal(signal.SIGKILL) - # most likely "not enough image data" - except ValueError as error: - self.exception = error - - # ignore queue closed - if "is closed" not in str(error): - logger.exception(error) - break - - # send exceptions into the client connection pipe - except Exception as error: - self.exception = error - logger.exception(error) - break - else: - logger.debug("Decoding queue depleted") - - # flush the remaining data in STDOUT and STDERR - self.decoder.stdout.flush() - self.decoder.stderr.flush() - - # send SIGINT (2) to FFmpeg - # this instructs it to finalize and exit - self.decoder.send_signal(signal.SIGINT) + def join(self): # close PIPEs to prevent process from getting stuck self.decoder.stdout.close() @@ -191,8 +140,38 @@ class VideoDecoder(threading.Thread): self.pipe_printer.stop() self.pipe_printer.join() - logger.info("Decoder thread exiting") - return super().run() - def stop(self) -> None: +class VideoDecoderThread(Thread): + def __init__( + self, tasks_queue: Queue, decoder: VideoDecoder, processing_settings: tuple + ): + super().__init__() + + self.tasks_queue = tasks_queue + self.decoder = decoder + self.processing_settings = processing_settings + self.running = False + + def run(self): + self.running = True + previous_frame = None + for frame_index, frame in enumerate(self.decoder): + + while True: + + # check for the stop signal + if self.running is False: + self.decoder.join() + return + + with contextlib.suppress(Full): + self.tasks_queue.put( + (frame_index, previous_frame, frame, self.processing_settings), + timeout=0.1, + ) + break + + previous_frame = frame + + def stop(self): self.running = False diff --git a/video2x/encoder.py b/video2x/encoder.py index bba4319..20c364d 100755 --- a/video2x/encoder.py +++ b/video2x/encoder.py @@ -19,20 +19,16 @@ along with this program. If not, see . Name: Video Encoder Author: K4YT3X Date Created: June 17, 2021 -Last Modified: March 20, 2022 +Last Modified: August 28, 2022 """ import os import pathlib import signal import subprocess -import threading -import time -from multiprocessing.managers import ListProxy -from multiprocessing.sharedctypes import Synchronized import ffmpeg -from loguru import logger +from PIL import Image from .pipe_printer import PipePrinter @@ -48,7 +44,7 @@ LOGURU_FFMPEG_LOGLEVELS = { } -class VideoEncoder(threading.Thread): +class VideoEncoder: def __init__( self, input_path: pathlib.Path, @@ -56,36 +52,20 @@ class VideoEncoder(threading.Thread): output_path: pathlib.Path, output_width: int, output_height: int, - total_frames: int, - processed_frames: ListProxy, - processed: Synchronized, - pause: Synchronized, copy_audio: bool = True, copy_subtitle: bool = True, copy_data: bool = False, copy_attachments: bool = False, ) -> None: - threading.Thread.__init__(self) - self.running = False - self.input_path = input_path - self.output_path = output_path - self.total_frames = total_frames - self.processed_frames = processed_frames - self.processed = processed - self.pause = pause - - # stores exceptions if the thread exits with errors - self.exception = None # create FFmpeg input for the original input video - self.original = ffmpeg.input(input_path) + original = ffmpeg.input(input_path) # define frames as input frames = ffmpeg.input( "pipe:0", format="rawvideo", pix_fmt="rgb24", - vsync="cfr", s=f"{output_width}x{output_height}", r=frame_rate, ) @@ -93,11 +73,11 @@ class VideoEncoder(threading.Thread): # copy additional streams from original file # https://ffmpeg.org/ffmpeg.html#Stream-specifiers-1 additional_streams = [ - # self.original["1:v?"], - self.original["a?"] if copy_audio is True else None, - self.original["s?"] if copy_subtitle is True else None, - self.original["d?"] if copy_data is True else None, - self.original["t?"] if copy_attachments is True else None, + # original["1:v?"], + original["a?"] if copy_audio is True else None, + original["s?"] if copy_subtitle is True else None, + original["d?"] if copy_data is True else None, + original["t?"] if copy_attachments is True else None, ] # run FFmpeg and produce final output @@ -106,10 +86,10 @@ class VideoEncoder(threading.Thread): ffmpeg.output( frames, *[s for s in additional_streams if s is not None], - str(self.output_path), + str(output_path), vcodec="libx264", scodec="copy", - vsync="cfr", + fps_mode="cfr", pix_fmt="yuv420p", crf=17, preset="veryslow", @@ -138,49 +118,26 @@ class VideoEncoder(threading.Thread): self.pipe_printer = PipePrinter(self.encoder.stderr) self.pipe_printer.start() - def run(self) -> None: - self.running = True - frame_index = 0 - while self.running and frame_index < self.total_frames: + def kill(self): + self.encoder.send_signal(signal.SIGKILL) - # pause if pause flag is set - if self.pause.value is True: - time.sleep(0.1) - continue + def write(self, frame: Image.Image) -> None: + """ + write a frame into FFmpeg encoder's STDIN - try: - image = self.processed_frames[frame_index] - if image is None: - time.sleep(0.1) - continue - - # send the image to FFmpeg for encoding - self.encoder.stdin.write(image.tobytes()) - - # remove the image from memory - self.processed_frames[frame_index] = None - - with self.processed.get_lock(): - self.processed.value += 1 - - frame_index += 1 - - # send exceptions into the client connection pipe - except Exception as error: - self.exception = error - logger.exception(error) - break - else: - logger.debug("Encoding queue depleted") + :param frame Image.Image: the Image object to use for writing + """ + self.encoder.stdin.write(frame.tobytes()) + def join(self) -> None: + """ + signal the encoder that all frames have been sent and the FFmpeg + should be instructed to wrap-up the processing + """ # flush the remaining data in STDIN and STDERR self.encoder.stdin.flush() self.encoder.stderr.flush() - # send SIGINT (2) to FFmpeg - # this instructs it to finalize and exit - self.encoder.send_signal(signal.SIGINT) - # close PIPEs to prevent process from getting stuck self.encoder.stdin.close() self.encoder.stderr.close() @@ -191,9 +148,3 @@ class VideoEncoder(threading.Thread): # wait for PIPE printer to exit self.pipe_printer.stop() self.pipe_printer.join() - - logger.info("Encoder thread exiting") - return super().run() - - def stop(self) -> None: - self.running = False diff --git a/video2x/interpolator.py b/video2x/interpolator.py index 92514ce..41e55d0 100755 --- a/video2x/interpolator.py +++ b/video2x/interpolator.py @@ -44,11 +44,13 @@ class Interpolator(multiprocessing.Process): pause: Synchronized, ) -> None: multiprocessing.Process.__init__(self) - self.running = False self.processing_queue = processing_queue self.processed_frames = processed_frames self.pause = pause + self.running = False + self.processor_objects = {} + signal.signal(signal.SIGTERM, self._stop) def run(self) -> None: @@ -56,7 +58,6 @@ class Interpolator(multiprocessing.Process): logger.opt(colors=True).info( f"Interpolator process {self.name} initiating" ) - processor_objects = {} while self.running is True: try: # pause if pause flag is set @@ -80,6 +81,7 @@ class Interpolator(multiprocessing.Process): if image0 is None: continue + # calculate the %diff between the current frame and the previous frame difference = ImageChops.difference(image0, image1) difference_stat = ImageStat.Stat(difference) difference_ratio = ( @@ -92,10 +94,10 @@ class Interpolator(multiprocessing.Process): # select a processor object with the required settings # create a new object if none are available - processor_object = processor_objects.get(algorithm) + processor_object = self.processor_objects.get(algorithm) if processor_object is None: processor_object = ALGORITHM_CLASSES[algorithm](0) - processor_objects[algorithm] = processor_object + self.processor_objects[algorithm] = processor_object interpolated_image = processor_object.process(image0, image1) # if the difference is greater than threshold diff --git a/video2x/processor.py b/video2x/processor.py new file mode 100755 index 0000000..5e8569a --- /dev/null +++ b/video2x/processor.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Copyright (C) 2018-2022 K4YT3X and contributors. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . + +Name: Processor Abstract Class +Author: K4YT3X +Date Created: April 9, 2022 +Last Modified: April 9, 2022 +""" + +from abc import ABC, abstractmethod +from multiprocessing import Queue +from multiprocessing.managers import DictProxy +from multiprocessing.sharedctypes import Synchronized + +from PIL import Image, ImageChops, ImageStat + + +class Processor(ABC): + def __init__( + self, tasks_queue: Queue, processed_frames: DictProxy, pause_flag: Synchronized + ) -> None: + self.tasks_queue = tasks_queue + self.processed_frames = processed_frames + self.pause_flag = pause_flag + + @abstractmethod + def process(self): + raise NotImplementedError + + @staticmethod + def get_image_diff(image0: Image.Image, image1: Image.Image) -> float: + """ + get the percentage difference between two images + + :param image0 Image.Image: the image to compare + :param image1 Image.Image: the image to compare against + :rtype float: precentage difference between two frames + """ + difference_stat = ImageStat.Stat(ImageChops.difference(image0, image1)) + return sum(difference_stat.mean) / (len(difference_stat.mean) * 255) * 100 + + """ + def run( + self, + ) -> None: + self.running = True + while self.running is True: + self.process() + self.running = False + return super().run() + + def stop(self, _signal_number, _frame) -> None: + self.running = False + """ diff --git a/video2x/upscaler.py b/video2x/upscaler.py index 8e84eac..c108db9 100755 --- a/video2x/upscaler.py +++ b/video2x/upscaler.py @@ -19,189 +19,184 @@ along with this program. If not, see . Name: Upscaler Author: K4YT3X Date Created: May 27, 2021 -Last Modified: March 20, 2022 +Last Modified: April 10, 2022 """ import math -import multiprocessing -import queue -import signal import time -from multiprocessing.managers import ListProxy -from multiprocessing.sharedctypes import Synchronized -from loguru import logger -from PIL import Image, ImageChops, ImageStat +from PIL import Image from realcugan_ncnn_vulkan_python import Realcugan from realsr_ncnn_vulkan_python import Realsr from srmd_ncnn_vulkan_python import Srmd from waifu2x_ncnn_vulkan_python import Waifu2x -# fixed scaling ratios supported by the algorithms -# that only support certain fixed scale ratios -ALGORITHM_FIXED_SCALING_RATIOS = { - "waifu2x": [1, 2], - "srmd": [2, 3, 4], - "realsr": [4], - "realcugan": [1, 2, 3, 4], -} - -ALGORITHM_CLASSES = { - "waifu2x": Waifu2x, - "srmd": Srmd, - "realsr": Realsr, - "realcugan": Realcugan, -} +from .processor import Processor -class Upscaler(multiprocessing.Process): - def __init__( - self, - processing_queue: multiprocessing.Queue, - processed_frames: ListProxy, - pause: Synchronized, - ) -> None: - multiprocessing.Process.__init__(self) - self.running = False - self.processing_queue = processing_queue - self.processed_frames = processed_frames - self.pause = pause +class Upscaler: + # fixed scaling ratios supported by the algorithms + # that only support certain fixed scale ratios + ALGORITHM_FIXED_SCALING_RATIOS = { + "waifu2x": [1, 2], + "srmd": [2, 3, 4], + "realsr": [4], + "realcugan": [1, 2, 3, 4], + } - signal.signal(signal.SIGTERM, self._stop) + ALGORITHM_CLASSES = { + "waifu2x": Waifu2x, + "srmd": Srmd, + "realsr": Realsr, + "realcugan": Realcugan, + } - def run(self) -> None: - self.running = True - logger.opt(colors=True).info( - f"Upscaler process {self.name} initiating" + processor_objects = {} + + @staticmethod + def _get_scaling_tasks( + input_width: int, + input_height: int, + output_width: int, + output_height: int, + algorithm: str, + ) -> list: + """ + Get the required tasks for upscaling the image until it is larger than + or equal to the desired output dimensions. For example, SRMD only supports + 2x, 3x, and 4x, so upsclaing an image from 320x240 to 3840x2160 will + require the SRMD to run 3x then 4x. In this case, this function will + return [3, 4]. + + :param input_width int: input image width + :param input_height int: input image height + :param output_width int: desired output image width + :param output_height int: desired output image size + :param algorithm str: upsclaing algorithm + :rtype list: the list of upsclaing tasks required + """ + # calculate required minimum scale ratio + output_scale = max(output_width / input_width, output_height / input_height) + + # select the optimal algorithm scaling ratio to use + supported_scaling_ratios = sorted( + Upscaler.ALGORITHM_FIXED_SCALING_RATIOS[algorithm] ) - processor_objects = {} - while self.running is True: + + remaining_scaling_ratio = math.ceil(output_scale) + + # if the scaling ratio is 1.0 + # apply the smallest scaling ratio available + if remaining_scaling_ratio == 1: + return [supported_scaling_ratios[0]] + + scaling_jobs = [] + while remaining_scaling_ratio > 1: + for ratio in supported_scaling_ratios: + if ratio >= remaining_scaling_ratio: + scaling_jobs.append(ratio) + remaining_scaling_ratio /= ratio + break + + else: + found = False + for i in supported_scaling_ratios: + for j in supported_scaling_ratios: + if i * j >= remaining_scaling_ratio: + scaling_jobs.extend([i, j]) + remaining_scaling_ratio /= i * j + found = True + break + if found is True: + break + + if found is False: + scaling_jobs.append(supported_scaling_ratios[-1]) + remaining_scaling_ratio /= supported_scaling_ratios[-1] + return scaling_jobs + + def upscale_image( + self, + image: Image.Image, + output_width: int, + output_height: int, + algorithm: str, + noise: int, + ) -> Image.Image: + """ + upscale an image + + :param image Image.Image: the image to upscale + :param output_width int: the desired output width + :param output_height int: the desired output height + :param algorithm str: the algorithm to use + :param noise int: the noise level (available only for some algorithms) + :rtype Image.Image: the upscaled image + """ + width, height = image.size + + for task in self._get_scaling_tasks( + width, height, output_width, output_height, algorithm + ): + + # select a processor object with the required settings + # create a new object if none are available + processor_object = self.processor_objects.get((algorithm, task)) + if processor_object is None: + processor_object = self.ALGORITHM_CLASSES[algorithm]( + noise=noise, scale=task + ) + self.processor_objects[(algorithm, task)] = processor_object + + # process the image with the selected algorithm + image = processor_object.process(image) + + # downscale the image to the desired output size and + # save the image to disk + return image.resize((output_width, output_height), Image.Resampling.LANCZOS) + + +class UpscalerProcessor(Processor, Upscaler): + def process(self) -> None: + + task = self.tasks_queue.get() + while task is not None: + try: - # pause if pause flag is set - if self.pause.value is True: + + if self.pause_flag.value is True: time.sleep(0.1) continue - try: - # get new job from queue - ( - frame_index, - (image0, image1), - ( - output_width, - output_height, - noise, - difference_threshold, - algorithm, - ), - ) = self.processing_queue.get(False) - - # destructure settings - except queue.Empty: - time.sleep(0.1) - continue + # unpack the task's values + ( + frame_index, + previous_frame, + current_frame, + (output_width, output_height, algorithm, noise, threshold), + ) = task + # calculate the %diff between the current frame and the previous frame difference_ratio = 0 - if image0 is not None: - difference = ImageChops.difference(image0, image1) - difference_stat = ImageStat.Stat(difference) - difference_ratio = ( - sum(difference_stat.mean) - / (len(difference_stat.mean) * 255) - * 100 + if previous_frame is not None: + difference_ratio = self.get_image_diff( + previous_frame, current_frame ) - # if the difference is lower than threshold - # skip this frame - if difference_ratio < difference_threshold: - - # make sure the previous frame has been processed - if frame_index > 0: - while self.processed_frames[frame_index - 1] is None: - time.sleep(0.1) + # if the difference is lower than threshold, skip this frame + if difference_ratio < threshold: # make the current image the same as the previous result - self.processed_frames[frame_index] = self.processed_frames[ - frame_index - 1 - ] + self.processed_frames[frame_index] = True # if the difference is greater than threshold # process this frame else: - width, height = image1.size - - # calculate required minimum scale ratio - output_scale = max(output_width / width, output_height / height) - - # select the optimal algorithm scaling ratio to use - supported_scaling_ratios = sorted( - ALGORITHM_FIXED_SCALING_RATIOS[algorithm] + self.processed_frames[frame_index] = self.upscale_image( + current_frame, output_width, output_height, algorithm, noise ) - remaining_scaling_ratio = math.ceil(output_scale) - scaling_jobs = [] + task = self.tasks_queue.get() - # if the scaling ratio is 1.0 - # apply the smallest scaling ratio available - if remaining_scaling_ratio == 1: - scaling_jobs.append(supported_scaling_ratios[0]) - else: - while remaining_scaling_ratio > 1: - for ratio in supported_scaling_ratios: - if ratio >= remaining_scaling_ratio: - scaling_jobs.append(ratio) - remaining_scaling_ratio /= ratio - break - - else: - found = False - for i in supported_scaling_ratios: - for j in supported_scaling_ratios: - if i * j >= remaining_scaling_ratio: - scaling_jobs.extend([i, j]) - remaining_scaling_ratio /= i * j - found = True - break - if found is True: - break - - if found is False: - scaling_jobs.append(supported_scaling_ratios[-1]) - remaining_scaling_ratio /= supported_scaling_ratios[ - -1 - ] - - for job in scaling_jobs: - - # select a processor object with the required settings - # create a new object if none are available - processor_object = processor_objects.get((algorithm, job)) - if processor_object is None: - processor_object = ALGORITHM_CLASSES[algorithm]( - noise=noise, scale=job - ) - processor_objects[(algorithm, job)] = processor_object - - # process the image with the selected algorithm - image1 = processor_object.process(image1) - - # downscale the image to the desired output size and - # save the image to disk - image1 = image1.resize((output_width, output_height), Image.LANCZOS) - self.processed_frames[frame_index] = image1 - - # send exceptions into the client connection pipe - except (SystemExit, KeyboardInterrupt): + except KeyboardInterrupt: break - - except Exception as error: - logger.exception(error) - break - - logger.opt(colors=True).info( - f"Upscaler process {self.name} terminating" - ) - return super().run() - - def _stop(self, _signal_number, _frame) -> None: - self.running = False diff --git a/video2x/video2x.py b/video2x/video2x.py index 27a0c7a..e67abb4 100755 --- a/video2x/video2x.py +++ b/video2x/video2x.py @@ -27,7 +27,7 @@ __ __ _ _ ___ __ __ Name: Video2X Creator: K4YT3X Date Created: February 24, 2018 -Last Modified: April 5, 2022 +Last Modified: August 28, 2022 Editor: BrianPetkovsek Last Modified: June 17, 2019 @@ -39,20 +39,18 @@ Editor: 28598519a Last Modified: March 23, 2020 """ -import argparse import ctypes import math -import multiprocessing -import os -import pathlib import signal import sys import time +from enum import Enum +from multiprocessing import Manager, Pool, Queue, Value +from pathlib import Path import ffmpeg from cv2 import cv2 from loguru import logger -from rich import print as rich_print from rich.console import Console from rich.file_proxy import FileProxy from rich.progress import ( @@ -65,41 +63,23 @@ from rich.progress import ( ) from rich.text import Text +from video2x.processor import Processor + from . import __version__ -from .decoder import VideoDecoder +from .decoder import VideoDecoder, VideoDecoderThread from .encoder import VideoEncoder from .interpolator import Interpolator -from .upscaler import Upscaler +from .upscaler import UpscalerProcessor # for desktop environments only # if pynput can be loaded, enable global pause hotkey support try: - import pynput + from pynput.keyboard import HotKey, Listener except ImportError: ENABLE_HOTKEY = False else: ENABLE_HOTKEY = True -LEGAL_INFO = f"""Video2X\t\t{__version__} -Author:\t\tK4YT3X -License:\tGNU AGPL v3 -Github Page:\thttps://github.com/k4yt3x/video2x -Contact:\ti@k4yt3x.com""" - -# algorithms available for upscaling tasks -UPSCALING_ALGORITHMS = [ - "waifu2x", - "srmd", - "realsr", - "realcugan", -] - -# algorithms available for frame interpolation tasks -INTERPOLATION_ALGORITHMS = ["rife"] - -# progress bar labels for different modes -MODE_LABELS = {"upscale": "Upscaling", "interpolate": "Interpolating"} - # format string for Loguru loggers LOGURU_FORMAT = ( "{time:HH:mm:ss.SSSSSS!UTC} | " @@ -119,6 +99,11 @@ class ProcessingSpeedColumn(ProgressColumn): ) +class ProcessingMode(Enum): + UPSCALE = {"label": "Upscaling", "processor": UpscalerProcessor} + INTERPOLATE = {"label": "Interpolating", "processor": Interpolator} + + class Video2X: """ Video2X class @@ -132,11 +117,11 @@ class Video2X: self.version = __version__ @staticmethod - def _get_video_info(path: pathlib.Path) -> tuple: + def _get_video_info(path: Path) -> tuple: """ get video file information with FFmpeg - :param path pathlib.Path: video file path + :param path Path: video file path :raises RuntimeError: raised when video stream isn't found """ # probe video file info @@ -160,34 +145,17 @@ class Video2X: return video_info["width"], video_info["height"], total_frames, frame_rate - def _toggle_pause(self, _signal_number: int = -1, _frame=None): - # print console messages and update the progress bar's status - if self.pause.value is False: - self.progress.update(self.task, description=self.description + " (paused)") - self.progress.stop_task(self.task) - logger.warning("Processing paused, press Ctrl+Alt+V again to resume") - - elif self.pause.value is True: - self.progress.update(self.task, description=self.description) - logger.warning("Resuming processing") - self.progress.start_task(self.task) - - # invert the value of the pause flag - with self.pause.get_lock(): - self.pause.value = not self.pause.value - def _run( self, - input_path: pathlib.Path, + input_path: Path, width: int, height: int, total_frames: int, frame_rate: float, - output_path: pathlib.Path, + output_path: Path, output_width: int, output_height: int, - Processor: object, - mode: str, + mode: ProcessingMode, processes: int, processing_settings: tuple, ) -> None: @@ -207,51 +175,40 @@ class Video2X: logger.remove() logger.add(sys.stderr, colorize=True, format=LOGURU_FORMAT) - # initialize values - self.processor_processes = [] - self.processing_queue = multiprocessing.Queue(maxsize=processes * 10) - processed_frames = multiprocessing.Manager().list([None] * total_frames) - self.processed = multiprocessing.Value("I", 0) - self.pause = multiprocessing.Value(ctypes.c_bool, False) + # TODO: add docs + tasks_queue = Queue(maxsize=processes * 10) + processed_frames = Manager().dict() + pause_flag = Value(ctypes.c_bool, False) # set up and start decoder thread logger.info("Starting video decoder") - self.decoder = VideoDecoder( + decoder = VideoDecoder( input_path, width, height, frame_rate, - self.processing_queue, - processing_settings, - self.pause, ) - self.decoder.start() + decoder_thread = VideoDecoderThread(tasks_queue, decoder, processing_settings) + decoder_thread.start() # set up and start encoder thread logger.info("Starting video encoder") - self.encoder = VideoEncoder( + encoder = VideoEncoder( input_path, frame_rate * 2 if mode == "interpolate" else frame_rate, output_path, output_width, output_height, - total_frames, - processed_frames, - self.processed, - self.pause, ) - self.encoder.start() - # create processor processes - for process_name in range(processes): - process = Processor(self.processing_queue, processed_frames, self.pause) - process.name = str(process_name) - process.daemon = True - process.start() - self.processor_processes.append(process) + # create a pool of processor processes to process the queue + processor: Processor = mode.value["processor"]( + tasks_queue, processed_frames, pause_flag + ) + processor_pool = Pool(processes, processor.process) # create progress bar - self.progress = Progress( + progress = Progress( "[progress.description]{task.description}", BarColumn(complete_style="blue", finished_style="green"), "[progress.percentage]{task.percentage:>3.0f}%", @@ -264,23 +221,42 @@ class Video2X: speed_estimate_period=300.0, disable=True, ) + task = progress.add_task(f"[cyan]{mode.value['label']}", total=total_frames) - self.description = f"[cyan]{MODE_LABELS.get(mode, 'Unknown')}" - self.task = self.progress.add_task(self.description, total=total_frames) + def _toggle_pause(_signal_number: int = -1, _frame=None): + + # allow the closure to modify external immutable flag + nonlocal pause_flag + + # print console messages and update the progress bar's status + if pause_flag.value is False: + progress.update( + task, description=f"[cyan]{mode.value['label']} (paused)" + ) + progress.stop_task(task) + logger.warning("Processing paused, press Ctrl+Alt+V again to resume") + + # the lock is already acquired + elif pause_flag.value is True: + progress.update(task, description=f"[cyan]{mode.value['label']}") + logger.warning("Resuming processing") + progress.start_task(task) + + # invert the flag + with pause_flag.get_lock(): + pause_flag.value = not pause_flag.value # allow sending SIGUSR1 to pause/resume processing - signal.signal(signal.SIGUSR1, self._toggle_pause) + signal.signal(signal.SIGUSR1, _toggle_pause) # enable global pause hotkey if it's supported if ENABLE_HOTKEY is True: # create global pause hotkey - pause_hotkey = pynput.keyboard.HotKey( - pynput.keyboard.HotKey.parse("++v"), self._toggle_pause - ) + pause_hotkey = HotKey(HotKey.parse("++v"), _toggle_pause) # create global keyboard input listener - keyboard_listener = pynput.keyboard.Listener( + keyboard_listener = Listener( on_press=( lambda key: pause_hotkey.press(keyboard_listener.canonical(key)) ), @@ -293,51 +269,52 @@ class Video2X: keyboard_listener.start() # a temporary variable that stores the exception - exception = [] + exceptions = [] try: - # wait for jobs in queue to deplete - while self.processed.value < total_frames - 1: - time.sleep(1) + # let the context manager automatically stop the progress bar + with progress: - # check processor health - for process in self.processor_processes: - if not process.is_alive(): - raise Exception("process died unexpectedly") + frame_index = 0 + while frame_index < total_frames: - # check decoder health - if not self.decoder.is_alive() and self.decoder.exception is not None: - raise Exception("decoder died unexpectedly") + current_frame = processed_frames.get(frame_index) - # check encoder health - if not self.encoder.is_alive() and self.encoder.exception is not None: - raise Exception("encoder died unexpectedly") + if pause_flag.value is True or current_frame is None: + time.sleep(0.1) + continue - # show progress bar when upscale starts - if self.progress.disable is True and self.processed.value > 0: - self.progress.disable = False - self.progress.start() + # show the progress bar after the processing starts + # reduces speed estimation inaccuracies and print overlaps + if frame_index == 0: + progress.disable = False + progress.start() - # update progress - if self.pause.value is False: - self.progress.update(self.task, completed=self.processed.value) + if current_frame is True: + encoder.write(processed_frames.get(frame_index - 1)) - self.progress.update(self.task, completed=total_frames) - self.progress.stop() - logger.info("Processing has completed") + else: + encoder.write(current_frame) + + if frame_index > 0: + del processed_frames[frame_index - 1] + + progress.update(task, completed=frame_index + 1) + frame_index += 1 # if SIGTERM is received or ^C is pressed except (SystemExit, KeyboardInterrupt) as error: - self.progress.stop() logger.warning("Exit signal received, exiting gracefully") logger.warning("Press ^C again to force terminate") - exception.append(error) + exceptions.append(error) except Exception as error: - self.progress.stop() logger.exception(error) - exception.append(error) + exceptions.append(error) + + else: + logger.info("Processing has completed") finally: @@ -346,31 +323,28 @@ class Video2X: keyboard_listener.stop() keyboard_listener.join() - # stop progress display - self.progress.stop() + # if errors have occurred, kill the FFmpeg processes + if len(exceptions) > 0: + decoder.kill() + encoder.kill() - # stop processor processes - logger.info("Stopping processor processes") - for process in self.processor_processes: - process.terminate() + # stop the decoder + decoder_thread.stop() + decoder_thread.join() - # wait for processes to finish - for process in self.processor_processes: - process.join() + # clear queue and signal processors to exit + # multiprocessing.Queue has no Queue.queue.clear + while tasks_queue.empty() is not True: + tasks_queue.get() + for _ in range(processes): + tasks_queue.put(None) - # stop encoder and decoder - logger.info("Stopping decoder and encoder threads") - self.decoder.stop() - self.encoder.stop() - self.decoder.join() - self.encoder.join() + # close and join the process pool + processor_pool.close() + processor_pool.join() - # mark processing queue as closed - self.processing_queue.close() - - # raise the error if there is any - if len(exception) > 0: - raise exception[0] + # stop the encoder + encoder.join() # restore original STDOUT and STDERR sys.stdout = original_stdout @@ -380,10 +354,14 @@ class Video2X: logger.remove() logger.add(sys.stderr, colorize=True, format=LOGURU_FORMAT) + # raise the first collected error + if len(exceptions) > 0: + raise exceptions[0] + def upscale( self, - input_path: pathlib.Path, - output_path: pathlib.Path, + input_path: Path, + output_path: Path, output_width: int, output_height: int, noise: int, @@ -416,22 +394,21 @@ class Video2X: output_path, output_width, output_height, - Upscaler, - "upscale", + ProcessingMode.UPSCALE, processes, ( output_width, output_height, + algorithm, noise, threshold, - algorithm, ), ) def interpolate( self, - input_path: pathlib.Path, - output_path: pathlib.Path, + input_path: Path, + output_path: Path, processes: int, threshold: float, algorithm: str, @@ -453,192 +430,7 @@ class Video2X: output_path, width, height, - Interpolator, - "interpolate", + ProcessingMode.INTERPOLATE, processes, (threshold, algorithm), - ) - - -def parse_arguments() -> argparse.Namespace: - """ - parse command line arguments - - :rtype argparse.Namespace: command parsing results - """ - parser = argparse.ArgumentParser( - prog="video2x", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - parser.add_argument( - "--version", help="show version information and exit", action="store_true" - ) - parser.add_argument( - "-i", - "--input", - type=pathlib.Path, - help="input file/directory path", - required=True, - ) - parser.add_argument( - "-o", - "--output", - type=pathlib.Path, - help="output file/directory path", - required=True, - ) - parser.add_argument( - "-p", "--processes", type=int, help="number of processes to launch", default=1 - ) - parser.add_argument( - "-l", - "--loglevel", - choices=["trace", "debug", "info", "success", "warning", "error", "critical"], - default="info", - ) - - # upscaler arguments - action = parser.add_subparsers( - help="action to perform", dest="action", required=True - ) - - upscale = action.add_parser( - "upscale", - help="upscale a file", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - add_help=False, - ) - upscale.add_argument( - "--help", action="help", help="show this help message and exit" - ) - upscale.add_argument("-w", "--width", type=int, help="output width") - upscale.add_argument("-h", "--height", type=int, help="output height") - upscale.add_argument("-n", "--noise", type=int, help="denoise level", default=3) - upscale.add_argument( - "-a", - "--algorithm", - choices=UPSCALING_ALGORITHMS, - help="algorithm to use for upscaling", - default=UPSCALING_ALGORITHMS[0], - ) - upscale.add_argument( - "-t", - "--threshold", - type=float, - help=( - "skip if the percent difference between two adjacent frames is below this" - " value; set to 0 to process all frames" - ), - default=0, - ) - - # interpolator arguments - interpolate = action.add_parser( - "interpolate", - help="interpolate frames for file", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - add_help=False, - ) - interpolate.add_argument( - "--help", action="help", help="show this help message and exit" - ) - interpolate.add_argument( - "-a", - "--algorithm", - choices=INTERPOLATION_ALGORITHMS, - help="algorithm to use for upscaling", - default=INTERPOLATION_ALGORITHMS[0], - ) - interpolate.add_argument( - "-t", - "--threshold", - type=float, - help=( - "skip if the percent difference between two adjacent frames exceeds this" - " value; set to 100 to interpolate all frames" - ), - default=10, - ) - - return parser.parse_args() - - -def main() -> int: - """ - command line entrypoint for direct CLI invocation - - :rtype int: 0 if completed successfully, else other int - """ - - try: - # display version and lawful informaition - if "--version" in sys.argv: - rich_print(LEGAL_INFO) - return 0 - - # parse command line arguments - args = parse_arguments() - - # check input/output file paths - if not args.input.exists(): - logger.critical(f"Cannot find input file: {args.input}") - return 1 - if not args.input.is_file(): - logger.critical("Input path is not a file") - return 1 - if not args.output.parent.exists(): - logger.critical(f"Output directory does not exist: {args.output.parent}") - return 1 - - # set logger level - if os.environ.get("LOGURU_LEVEL") is None: - os.environ["LOGURU_LEVEL"] = args.loglevel.upper() - - # remove default handler - logger.remove() - - # add new sink with custom handler - logger.add(sys.stderr, colorize=True, format=LOGURU_FORMAT) - - # print package version and copyright notice - logger.opt(colors=True).info(f"Video2X {__version__}") - logger.opt(colors=True).info( - "Copyright (C) 2018-2022 K4YT3X and contributors." - ) - - # initialize video2x object - video2x = Video2X() - - if args.action == "upscale": - video2x.upscale( - args.input, - args.output, - args.width, - args.height, - args.noise, - args.processes, - args.threshold, - args.algorithm, - ) - - elif args.action == "interpolate": - video2x.interpolate( - args.input, - args.output, - args.processes, - args.threshold, - args.algorithm, - ) - - # don't print the traceback for manual terminations - except KeyboardInterrupt: - return 2 - - except Exception as error: - logger.exception(error) - return 1 - - # if no exceptions were produced - else: - logger.success("Processing completed successfully") - return 0 + ) \ No newline at end of file