Merge pull request #434 from k4yt3x/black

Formatted code with Black and updated README
This commit is contained in:
K4YT3X 2020-12-21 01:58:19 +00:00 committed by GitHub
commit 095d40b688
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 2118 additions and 1141 deletions

View File

@ -8,7 +8,7 @@
![GitHub All Releases](https://img.shields.io/github/downloads/k4yt3x/video2x/total?style=flat-square) ![GitHub All Releases](https://img.shields.io/github/downloads/k4yt3x/video2x/total?style=flat-square)
![GitHub](https://img.shields.io/github/license/k4yt3x/video2x?style=flat-square) ![GitHub](https://img.shields.io/github/license/k4yt3x/video2x?style=flat-square)
![Platforms](https://img.shields.io/badge/Platforms-Windows%20%7C%20Linux%20%7C%20macOS-blue?style=flat-square) ![Platforms](https://img.shields.io/badge/Platforms-Windows%20%7C%20Linux%20%7C%20macOS-blue?style=flat-square)
![Become A Patron!](https://img.shields.io/endpoint.svg?url=https%3A%2F%2Fshieldsio-patreon.herokuapp.com%2Fk4yt3x&style=flat-square) ![Become A Patron!](https://img.shields.io/badge/dynamic/json?color=%23e85b46&label=Patreon&query=data.attributes.patron_count&suffix=%20patrons&url=https%3A%2F%2Fwww.patreon.com%2Fapi%2Fcampaigns%2F4507807&style=flat-square)
<!--# Video2X Lossless Video Enlarger--> <!--# Video2X Lossless Video Enlarger-->
@ -232,56 +232,6 @@ Are you interested in how the idea of Video2X was born? Do you want to know the
--- ---
# Full Usage
## Video2X Options
### -h, --help
show this help message and exit
### -i INPUT, --input INPUT
source video file/directory
### -o OUTPUT, --output OUTPUT
output video file/directory
### -c CONFIG, --config CONFIG
video2x config file path
### --log LOG
log file path (default: program_directory\video2x_%Y-%m-%d_%H-%M-%S.log)
### --disable_logging
disable logging (default: False)
### -v, --version
display version, lawful information and exit
## Upscaling Options
### -d DRIVER, --driver DRIVER
upscaling driver (default: waifu2x_caffe)
Available options are:
- waifu2x_caffe
- waifu2x_converter_cpp
- waifu2x_ncnn_vulkan
- srmd_ncnn_vulkan
- realsr_ncnn_vulkan
- anime4kcpp
### -r RATIO, --ratio RATIO
scaling ratio
### -p PROCESSES, --processes PROCESSES
number of processes to use for upscaling (default: 1)
### --preserve_frames
preserve extracted and upscaled frames (default: False)
---
## License ## License
Licensed under the GNU General Public License Version 3 (GNU GPL v3) Licensed under the GNU General Public License Version 3 (GNU GPL v3)

View File

@ -41,6 +41,5 @@ class BiLogger(io.TextIOWrapper):
self.log_file.flush() self.log_file.flush()
def flush(self): def flush(self):
""" flush logger (for compability only) """flush logger (for compability only)"""
"""
pass pass

View File

@ -40,8 +40,7 @@ class ImageCleaner(threading.Thread):
self.running = False self.running = False
def run(self): def run(self):
""" Run image cleaner """Run image cleaner"""
"""
self.running = True self.running = True
while self.running: while self.running:
@ -49,8 +48,7 @@ class ImageCleaner(threading.Thread):
time.sleep(1) time.sleep(1)
def stop(self): def stop(self):
""" Stop the image cleaner """Stop the image cleaner"""
"""
self.running = False self.running = False
self.join() self.join()

View File

@ -34,7 +34,15 @@ class ProgressMonitor(threading.Thread):
def run(self): def run(self):
self.running = True self.running = True
with tqdm(total=self.upscaler.total_frames, ascii=True, desc=_('Processing: {} (pass {}/{})').format(self.upscaler.current_input_file.name, self.upscaler.current_pass, len(self.upscaler.scaling_jobs))) as progress_bar: with tqdm(
total=self.upscaler.total_frames,
ascii=True,
desc=_("Processing: {} (pass {}/{})").format(
self.upscaler.current_input_file.name,
self.upscaler.current_pass,
len(self.upscaler.scaling_jobs),
),
) as progress_bar:
# tqdm update method adds the value to the progress # tqdm update method adds the value to the progress
# bar instead of setting the value. Therefore, a delta # bar instead of setting the value. Therefore, a delta
# needs to be calculated. # needs to be calculated.
@ -42,7 +50,13 @@ class ProgressMonitor(threading.Thread):
while self.running: while self.running:
with contextlib.suppress(FileNotFoundError): with contextlib.suppress(FileNotFoundError):
upscaled_frames = [f for f in self.upscaler.upscaled_frames.iterdir() if str(f).lower().endswith(self.upscaler.extracted_frame_format.lower())] upscaled_frames = [
f
for f in self.upscaler.upscaled_frames.iterdir()
if str(f)
.lower()
.endswith(self.upscaler.extracted_frame_format.lower())
]
if len(upscaled_frames) >= 1: if len(upscaled_frames) >= 1:
self.upscaler.last_frame_upscaled = sorted(upscaled_frames)[-1] self.upscaler.last_frame_upscaled = sorted(upscaled_frames)[-1]
self.upscaler.total_frames_upscaled = len(upscaled_frames) self.upscaler.total_frames_upscaled = len(upscaled_frames)

View File

@ -43,35 +43,39 @@ from tqdm import tqdm
import magic import magic
# internationalization constants # internationalization constants
DOMAIN = 'video2x' DOMAIN = "video2x"
LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / 'locale' LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / "locale"
# getting default locale settings # getting default locale settings
default_locale, encoding = locale.getdefaultlocale() default_locale, encoding = locale.getdefaultlocale()
language = gettext.translation(DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True) language = gettext.translation(
DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True
)
language.install() language.install()
_ = language.gettext _ = language.gettext
# version information # version information
UPSCALER_VERSION = '4.4.1' UPSCALER_VERSION = "4.4.1"
# these names are consistent for # these names are consistent for
# - driver selection in command line # - driver selection in command line
# - driver wrapper file names # - driver wrapper file names
# - config file keys # - config file keys
AVAILABLE_DRIVERS = ['waifu2x_caffe', AVAILABLE_DRIVERS = [
'waifu2x_converter_cpp', "waifu2x_caffe",
'waifu2x_ncnn_vulkan', "waifu2x_converter_cpp",
'srmd_ncnn_vulkan', "waifu2x_ncnn_vulkan",
'realsr_ncnn_vulkan', "srmd_ncnn_vulkan",
'anime4kcpp'] "realsr_ncnn_vulkan",
"anime4kcpp",
]
# fixed scaling ratios supported by the drivers # fixed scaling ratios supported by the drivers
# that only support certain fixed scale ratios # that only support certain fixed scale ratios
DRIVER_FIXED_SCALING_RATIOS = { DRIVER_FIXED_SCALING_RATIOS = {
'waifu2x_ncnn_vulkan': [1, 2], "waifu2x_ncnn_vulkan": [1, 2],
'srmd_ncnn_vulkan': [2, 3, 4], "srmd_ncnn_vulkan": [2, 3, 4],
'realsr_ncnn_vulkan': [4], "realsr_ncnn_vulkan": [4],
} }
@ -91,17 +95,18 @@ class Upscaler:
driver_settings: dict, driver_settings: dict,
ffmpeg_settings: dict, ffmpeg_settings: dict,
gifski_settings: dict, gifski_settings: dict,
driver: str = 'waifu2x_caffe', driver: str = "waifu2x_caffe",
scale_ratio: float = None, scale_ratio: float = None,
scale_width: int = None, scale_width: int = None,
scale_height: int = None, scale_height: int = None,
processes: int = 1, processes: int = 1,
video2x_cache_directory: pathlib.Path = pathlib.Path(tempfile.gettempdir()) / 'video2x', video2x_cache_directory: pathlib.Path = pathlib.Path(tempfile.gettempdir())
extracted_frame_format: str = 'png', / "video2x",
output_file_name_format_string: str = '{original_file_name}_output{extension}', extracted_frame_format: str = "png",
image_output_extension: str = '.png', output_file_name_format_string: str = "{original_file_name}_output{extension}",
video_output_extension: str = '.mp4', image_output_extension: str = ".png",
preserve_frames: bool = False video_output_extension: str = ".mp4",
preserve_frames: bool = False,
): ):
# required parameters # required parameters
@ -137,109 +142,155 @@ class Upscaler:
self.last_frame_upscaled = pathlib.Path() self.last_frame_upscaled = pathlib.Path()
def create_temp_directories(self): def create_temp_directories(self):
"""create temporary directories """create temporary directories"""
"""
# if cache directory unspecified, use %TEMP%\video2x # if cache directory unspecified, use %TEMP%\video2x
if self.video2x_cache_directory is None: if self.video2x_cache_directory is None:
self.video2x_cache_directory = pathlib.Path(tempfile.gettempdir()) / 'video2x' self.video2x_cache_directory = (
pathlib.Path(tempfile.gettempdir()) / "video2x"
)
# if specified cache path exists and isn't a directory # if specified cache path exists and isn't a directory
if self.video2x_cache_directory.exists() and not self.video2x_cache_directory.is_dir(): if (
Avalon.error(_('Specified or default cache directory is a file/link')) self.video2x_cache_directory.exists()
raise FileExistsError('Specified or default cache directory is a file/link') and not self.video2x_cache_directory.is_dir()
):
Avalon.error(_("Specified or default cache directory is a file/link"))
raise FileExistsError("Specified or default cache directory is a file/link")
# if cache directory doesn't exist, try creating it # if cache directory doesn't exist, try creating it
if not self.video2x_cache_directory.exists(): if not self.video2x_cache_directory.exists():
try: try:
Avalon.debug_info(_('Creating cache directory {}').format(self.video2x_cache_directory)) Avalon.debug_info(
_("Creating cache directory {}").format(
self.video2x_cache_directory
)
)
self.video2x_cache_directory.mkdir(parents=True, exist_ok=True) self.video2x_cache_directory.mkdir(parents=True, exist_ok=True)
except Exception as exception: except Exception as exception:
Avalon.error(_('Unable to create {}').format(self.video2x_cache_directory)) Avalon.error(
_("Unable to create {}").format(self.video2x_cache_directory)
)
raise exception raise exception
# create temp directories for extracted frames and upscaled frames # create temp directories for extracted frames and upscaled frames
self.extracted_frames = pathlib.Path(tempfile.mkdtemp(dir=self.video2x_cache_directory)) self.extracted_frames = pathlib.Path(
Avalon.debug_info(_('Extracted frames are being saved to: {}').format(self.extracted_frames)) tempfile.mkdtemp(dir=self.video2x_cache_directory)
self.upscaled_frames = pathlib.Path(tempfile.mkdtemp(dir=self.video2x_cache_directory)) )
Avalon.debug_info(_('Upscaled frames are being saved to: {}').format(self.upscaled_frames)) Avalon.debug_info(
_("Extracted frames are being saved to: {}").format(self.extracted_frames)
)
self.upscaled_frames = pathlib.Path(
tempfile.mkdtemp(dir=self.video2x_cache_directory)
)
Avalon.debug_info(
_("Upscaled frames are being saved to: {}").format(self.upscaled_frames)
)
def cleanup_temp_directories(self): def cleanup_temp_directories(self):
"""delete temp directories when done """delete temp directories when done"""
"""
if not self.preserve_frames: if not self.preserve_frames:
for directory in [self.extracted_frames, self.upscaled_frames, self.video2x_cache_directory]: for directory in [
self.extracted_frames,
self.upscaled_frames,
self.video2x_cache_directory,
]:
try: try:
# avalon framework cannot be used if python is shutting down # avalon framework cannot be used if python is shutting down
# therefore, plain print is used # therefore, plain print is used
print(_('Cleaning up cache directory: {}').format(directory)) print(_("Cleaning up cache directory: {}").format(directory))
shutil.rmtree(directory) shutil.rmtree(directory)
except FileNotFoundError: except FileNotFoundError:
pass pass
except OSError: except OSError:
print(_('Unable to delete: {}').format(directory)) print(_("Unable to delete: {}").format(directory))
traceback.print_exc() traceback.print_exc()
def _check_arguments(self): def _check_arguments(self):
if isinstance(self.input, list): if isinstance(self.input, list):
if self.output.exists() and not self.output.is_dir(): if self.output.exists() and not self.output.is_dir():
Avalon.error(_('Input and output path type mismatch')) Avalon.error(_("Input and output path type mismatch"))
Avalon.error(_('Input is multiple files but output is not directory')) Avalon.error(_("Input is multiple files but output is not directory"))
raise ArgumentError('input output path type mismatch') raise ArgumentError("input output path type mismatch")
for input_path in self.input: for input_path in self.input:
if not input_path.is_file() and not input_path.is_dir(): if not input_path.is_file() and not input_path.is_dir():
Avalon.error(_('Input path {} is neither a file nor a directory').format(input_path)) Avalon.error(
raise FileNotFoundError(f'{input_path} is neither file nor directory') _("Input path {} is neither a file nor a directory").format(
input_path
)
)
raise FileNotFoundError(
f"{input_path} is neither file nor directory"
)
with contextlib.suppress(FileNotFoundError): with contextlib.suppress(FileNotFoundError):
if input_path.samefile(self.output): if input_path.samefile(self.output):
Avalon.error(_('Input directory and output directory cannot be the same')) Avalon.error(
raise FileExistsError('input directory and output directory are the same') _("Input directory and output directory cannot be the same")
)
raise FileExistsError(
"input directory and output directory are the same"
)
# if input is a file # if input is a file
elif self.input.is_file(): elif self.input.is_file():
if self.output.is_dir(): if self.output.is_dir():
Avalon.error(_('Input and output path type mismatch')) Avalon.error(_("Input and output path type mismatch"))
Avalon.error(_('Input is single file but output is directory')) Avalon.error(_("Input is single file but output is directory"))
raise ArgumentError('input output path type mismatch') raise ArgumentError("input output path type mismatch")
if self.output.suffix == '': if self.output.suffix == "":
Avalon.error(_('No suffix found in output file path')) Avalon.error(_("No suffix found in output file path"))
Avalon.error(_('Suffix must be specified')) Avalon.error(_("Suffix must be specified"))
raise ArgumentError('no output file suffix specified') raise ArgumentError("no output file suffix specified")
# if input is a directory # if input is a directory
elif self.input.is_dir(): elif self.input.is_dir():
if self.output.is_file(): if self.output.is_file():
Avalon.error(_('Input and output path type mismatch')) Avalon.error(_("Input and output path type mismatch"))
Avalon.error(_('Input is directory but output is existing single file')) Avalon.error(_("Input is directory but output is existing single file"))
raise ArgumentError('input output path type mismatch') raise ArgumentError("input output path type mismatch")
with contextlib.suppress(FileNotFoundError): with contextlib.suppress(FileNotFoundError):
if self.input.samefile(self.output): if self.input.samefile(self.output):
Avalon.error(_('Input directory and output directory cannot be the same')) Avalon.error(
raise FileExistsError('input directory and output directory are the same') _("Input directory and output directory cannot be the same")
)
raise FileExistsError(
"input directory and output directory are the same"
)
# if input is neither # if input is neither
else: else:
Avalon.error(_('Input path is neither a file nor a directory')) Avalon.error(_("Input path is neither a file nor a directory"))
raise FileNotFoundError(f'{self.input} is neither file nor directory') raise FileNotFoundError(f"{self.input} is neither file nor directory")
# check FFmpeg settings # check FFmpeg settings
ffmpeg_path = pathlib.Path(self.ffmpeg_settings['ffmpeg_path']) ffmpeg_path = pathlib.Path(self.ffmpeg_settings["ffmpeg_path"])
if not ((pathlib.Path(ffmpeg_path / 'ffmpeg.exe').is_file() and if not (
pathlib.Path(ffmpeg_path / 'ffprobe.exe').is_file()) or (
(pathlib.Path(ffmpeg_path / 'ffmpeg').is_file() and pathlib.Path(ffmpeg_path / "ffmpeg.exe").is_file()
pathlib.Path(ffmpeg_path / 'ffprobe').is_file())): and pathlib.Path(ffmpeg_path / "ffprobe.exe").is_file()
Avalon.error(_('FFmpeg or FFprobe cannot be found under the specified path')) )
Avalon.error(_('Please check the configuration file settings')) or (
raise FileNotFoundError(self.ffmpeg_settings['ffmpeg_path']) pathlib.Path(ffmpeg_path / "ffmpeg").is_file()
and pathlib.Path(ffmpeg_path / "ffprobe").is_file()
)
):
Avalon.error(
_("FFmpeg or FFprobe cannot be found under the specified path")
)
Avalon.error(_("Please check the configuration file settings"))
raise FileNotFoundError(self.ffmpeg_settings["ffmpeg_path"])
# check if driver settings # check if driver settings
driver_settings = copy.deepcopy(self.driver_settings) driver_settings = copy.deepcopy(self.driver_settings)
driver_path = driver_settings.pop('path') driver_path = driver_settings.pop("path")
# check if driver path exists # check if driver path exists
if not (pathlib.Path(driver_path).is_file() or pathlib.Path(f'{driver_path}.exe').is_file()): if not (
Avalon.error(_('Specified driver executable directory doesn\'t exist')) pathlib.Path(driver_path).is_file()
Avalon.error(_('Please check the configuration file settings')) or pathlib.Path(f"{driver_path}.exe").is_file()
):
Avalon.error(_("Specified driver executable directory doesn't exist"))
Avalon.error(_("Please check the configuration file settings"))
raise FileNotFoundError(driver_path) raise FileNotFoundError(driver_path)
# parse driver arguments using driver's parser # parse driver arguments using driver's parser
@ -255,20 +306,24 @@ class Upscaler:
else: else:
if len(key) == 1: if len(key) == 1:
driver_arguments.append(f'-{key}') driver_arguments.append(f"-{key}")
else: else:
driver_arguments.append(f'--{key}') driver_arguments.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
driver_arguments.append(str(value)) driver_arguments.append(str(value))
DriverWrapperMain = getattr(importlib.import_module(f'wrappers.{self.driver}'), 'WrapperMain') DriverWrapperMain = getattr(
importlib.import_module(f"wrappers.{self.driver}"), "WrapperMain"
)
DriverWrapperMain.parse_arguments(driver_arguments) DriverWrapperMain.parse_arguments(driver_arguments)
except AttributeError as e: except AttributeError as e:
Avalon.error(_('Failed to parse driver argument: {}').format(e.args[0])) Avalon.error(_("Failed to parse driver argument: {}").format(e.args[0]))
raise e raise e
def _upscale_frames(self, input_directory: pathlib.Path, output_directory: pathlib.Path): def _upscale_frames(
self, input_directory: pathlib.Path, output_directory: pathlib.Path
):
"""Upscale video frames with waifu2x-caffe """Upscale video frames with waifu2x-caffe
This function upscales all the frames extracted This function upscales all the frames extracted
@ -285,7 +340,9 @@ class Upscaler:
# initialize waifu2x driver # initialize waifu2x driver
if self.driver not in AVAILABLE_DRIVERS: if self.driver not in AVAILABLE_DRIVERS:
raise UnrecognizedDriverError(_('Unrecognized driver: {}').format(self.driver)) raise UnrecognizedDriverError(
_("Unrecognized driver: {}").format(self.driver)
)
# list all images in the extracted frames # list all images in the extracted frames
frames = [(input_directory / f) for f in input_directory.iterdir() if f.is_file] frames = [(input_directory / f) for f in input_directory.iterdir() if f.is_file]
@ -308,7 +365,13 @@ class Upscaler:
process_directory.mkdir(parents=True, exist_ok=True) process_directory.mkdir(parents=True, exist_ok=True)
# waifu2x-converter-cpp will perform multi-threading within its own process # waifu2x-converter-cpp will perform multi-threading within its own process
if self.driver in ['waifu2x_converter_cpp', 'waifu2x_ncnn_vulkan', 'srmd_ncnn_vulkan', 'realsr_ncnn_vulkan', 'anime4kcpp']: if self.driver in [
"waifu2x_converter_cpp",
"waifu2x_ncnn_vulkan",
"srmd_ncnn_vulkan",
"realsr_ncnn_vulkan",
"anime4kcpp",
]:
process_directories = [input_directory] process_directories = [input_directory]
else: else:
@ -318,20 +381,26 @@ class Upscaler:
# move image # move image
image.rename(process_directories[0] / image.name) image.rename(process_directories[0] / image.name)
# rotate list # rotate list
process_directories = process_directories[-1:] + process_directories[:-1] process_directories = (
process_directories[-1:] + process_directories[:-1]
)
# create driver processes and start them # create driver processes and start them
for process_directory in process_directories: for process_directory in process_directories:
self.process_pool.append(self.driver_object.upscale(process_directory, output_directory)) self.process_pool.append(
self.driver_object.upscale(process_directory, output_directory)
)
# start progress bar in a different thread # start progress bar in a different thread
Avalon.debug_info(_('Starting progress monitor')) Avalon.debug_info(_("Starting progress monitor"))
self.progress_monitor = ProgressMonitor(self, process_directories) self.progress_monitor = ProgressMonitor(self, process_directories)
self.progress_monitor.start() self.progress_monitor.start()
# create the clearer and start it # create the clearer and start it
Avalon.debug_info(_('Starting upscaled image cleaner')) Avalon.debug_info(_("Starting upscaled image cleaner"))
self.image_cleaner = ImageCleaner(input_directory, output_directory, len(self.process_pool)) self.image_cleaner = ImageCleaner(
input_directory, output_directory, len(self.process_pool)
)
self.image_cleaner.start() self.image_cleaner.start()
# wait for all process to exit # wait for all process to exit
@ -339,38 +408,39 @@ class Upscaler:
self._wait() self._wait()
except (Exception, KeyboardInterrupt, SystemExit) as e: except (Exception, KeyboardInterrupt, SystemExit) as e:
# cleanup # cleanup
Avalon.debug_info(_('Killing progress monitor')) Avalon.debug_info(_("Killing progress monitor"))
self.progress_monitor.stop() self.progress_monitor.stop()
Avalon.debug_info(_('Killing upscaled image cleaner')) Avalon.debug_info(_("Killing upscaled image cleaner"))
self.image_cleaner.stop() self.image_cleaner.stop()
raise e raise e
# if the driver is waifu2x-converter-cpp # if the driver is waifu2x-converter-cpp
# images need to be renamed to be recognizable for FFmpeg # images need to be renamed to be recognizable for FFmpeg
if self.driver == 'waifu2x_converter_cpp': if self.driver == "waifu2x_converter_cpp":
for image in [f for f in output_directory.iterdir() if f.is_file()]: for image in [f for f in output_directory.iterdir() if f.is_file()]:
renamed = re.sub(f'_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.extracted_frame_format}', renamed = re.sub(
f'.{self.extracted_frame_format}', f"_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.extracted_frame_format}",
str(image.name)) f".{self.extracted_frame_format}",
str(image.name),
)
(output_directory / image).rename(output_directory / renamed) (output_directory / image).rename(output_directory / renamed)
# upscaling done, kill helper threads # upscaling done, kill helper threads
Avalon.debug_info(_('Killing progress monitor')) Avalon.debug_info(_("Killing progress monitor"))
self.progress_monitor.stop() self.progress_monitor.stop()
Avalon.debug_info(_('Killing upscaled image cleaner')) Avalon.debug_info(_("Killing upscaled image cleaner"))
self.image_cleaner.stop() self.image_cleaner.stop()
def _terminate_subprocesses(self): def _terminate_subprocesses(self):
Avalon.warning(_('Terminating all processes')) Avalon.warning(_("Terminating all processes"))
for process in self.process_pool: for process in self.process_pool:
process.terminate() process.terminate()
def _wait(self): def _wait(self):
""" wait for subprocesses in process pool to complete """wait for subprocesses in process pool to complete"""
""" Avalon.debug_info(_("Main process waiting for subprocesses to exit"))
Avalon.debug_info(_('Main process waiting for subprocesses to exit'))
try: try:
# while process pool not empty # while process pool not empty
@ -389,22 +459,32 @@ class Upscaler:
# if return code is not 0 # if return code is not 0
elif process_status != 0: elif process_status != 0:
Avalon.error(_('Subprocess {} exited with code {}').format(process.pid, process_status)) Avalon.error(
raise subprocess.CalledProcessError(process_status, process.args) _("Subprocess {} exited with code {}").format(
process.pid, process_status
)
)
raise subprocess.CalledProcessError(
process_status, process.args
)
else: else:
Avalon.debug_info(_('Subprocess {} exited with code {}').format(process.pid, process_status)) Avalon.debug_info(
_("Subprocess {} exited with code {}").format(
process.pid, process_status
)
)
self.process_pool.remove(process) self.process_pool.remove(process)
time.sleep(0.1) time.sleep(0.1)
except (KeyboardInterrupt, SystemExit) as e: except (KeyboardInterrupt, SystemExit) as e:
Avalon.warning(_('Stop signal received')) Avalon.warning(_("Stop signal received"))
self._terminate_subprocesses() self._terminate_subprocesses()
raise e raise e
except (Exception, subprocess.CalledProcessError) as e: except (Exception, subprocess.CalledProcessError) as e:
Avalon.error(_('Subprocess execution ran into an error')) Avalon.error(_("Subprocess execution ran into an error"))
self._terminate_subprocesses() self._terminate_subprocesses()
raise e raise e
@ -422,20 +502,24 @@ class Upscaler:
self.process_pool = [] self.process_pool = []
# load driver modules # load driver modules
DriverWrapperMain = getattr(importlib.import_module(f'wrappers.{self.driver}'), 'WrapperMain') DriverWrapperMain = getattr(
importlib.import_module(f"wrappers.{self.driver}"), "WrapperMain"
)
self.driver_object = DriverWrapperMain(self.driver_settings) self.driver_object = DriverWrapperMain(self.driver_settings)
# load options from upscaler class into driver settings # load options from upscaler class into driver settings
self.driver_object.load_configurations(self) self.driver_object.load_configurations(self)
# initialize FFmpeg object # initialize FFmpeg object
self.ffmpeg_object = Ffmpeg(self.ffmpeg_settings, extracted_frame_format=self.extracted_frame_format) self.ffmpeg_object = Ffmpeg(
self.ffmpeg_settings, extracted_frame_format=self.extracted_frame_format
)
# define processing queue # define processing queue
self.processing_queue = queue.Queue() self.processing_queue = queue.Queue()
Avalon.info(_('Loading files into processing queue')) Avalon.info(_("Loading files into processing queue"))
Avalon.debug_info(_('Input path(s): {}').format(self.input)) Avalon.debug_info(_("Input path(s): {}").format(self.input))
# make output directory if the input is a list or a directory # make output directory if the input is a list or a directory
if isinstance(self.input, list) or self.input.is_dir(): if isinstance(self.input, list) or self.input.is_dir():
@ -470,39 +554,53 @@ class Upscaler:
# get file type # get file type
# try python-magic if it's available # try python-magic if it's available
try: try:
input_file_mime_type = magic.from_file(str(input_path.absolute()), mime=True) input_file_mime_type = magic.from_file(
input_file_type = input_file_mime_type.split('/')[0] str(input_path.absolute()), mime=True
input_file_subtype = input_file_mime_type.split('/')[1] )
input_file_type = input_file_mime_type.split("/")[0]
input_file_subtype = input_file_mime_type.split("/")[1]
except Exception: except Exception:
input_file_mime_type = input_file_type = input_file_subtype = '' input_file_mime_type = input_file_type = input_file_subtype = ""
# if python-magic doesn't determine the file to be an image/video file # if python-magic doesn't determine the file to be an image/video file
# fall back to mimetypes to guess the file type based on the extension # fall back to mimetypes to guess the file type based on the extension
if input_file_type not in ['image', 'video']: if input_file_type not in ["image", "video"]:
# in case python-magic fails to detect file type # in case python-magic fails to detect file type
# try guessing file mime type with mimetypes # try guessing file mime type with mimetypes
input_file_mime_type = mimetypes.guess_type(input_path.name)[0] input_file_mime_type = mimetypes.guess_type(input_path.name)[0]
input_file_type = input_file_mime_type.split('/')[0] input_file_type = input_file_mime_type.split("/")[0]
input_file_subtype = input_file_mime_type.split('/')[1] input_file_subtype = input_file_mime_type.split("/")[1]
Avalon.debug_info(_('File MIME type: {}').format(input_file_mime_type)) Avalon.debug_info(_("File MIME type: {}").format(input_file_mime_type))
# set default output file suffixes # set default output file suffixes
# if image type is GIF, default output suffix is also .gif # if image type is GIF, default output suffix is also .gif
if input_file_mime_type == 'image/gif': if input_file_mime_type == "image/gif":
output_path = self.output / self.output_file_name_format_string.format(original_file_name=input_path.stem, extension='.gif') output_path = self.output / self.output_file_name_format_string.format(
original_file_name=input_path.stem, extension=".gif"
)
elif input_file_type == 'image': elif input_file_type == "image":
output_path = self.output / self.output_file_name_format_string.format(original_file_name=input_path.stem, extension=self.image_output_extension) output_path = self.output / self.output_file_name_format_string.format(
original_file_name=input_path.stem,
extension=self.image_output_extension,
)
elif input_file_type == 'video': elif input_file_type == "video":
output_path = self.output / self.output_file_name_format_string.format(original_file_name=input_path.stem, extension=self.video_output_extension) output_path = self.output / self.output_file_name_format_string.format(
original_file_name=input_path.stem,
extension=self.video_output_extension,
)
# if file is none of: image, image/gif, video # if file is none of: image, image/gif, video
# skip to the next task # skip to the next task
else: else:
Avalon.error(_('File {} ({}) neither an image nor a video').format(input_path, input_file_mime_type)) Avalon.error(
Avalon.warning(_('Skipping this file')) _("File {} ({}) neither an image nor a video").format(
input_path, input_file_mime_type
)
)
Avalon.warning(_("Skipping this file"))
continue continue
# if there is only one input file # if there is only one input file
@ -512,14 +610,24 @@ class Upscaler:
output_path_id = 0 output_path_id = 0
while str(output_path) in output_paths: while str(output_path) in output_paths:
output_path = output_path.parent / pathlib.Path(f'{output_path.stem}_{output_path_id}{output_path.suffix}') output_path = output_path.parent / pathlib.Path(
f"{output_path.stem}_{output_path_id}{output_path.suffix}"
)
output_path_id += 1 output_path_id += 1
# record output path # record output path
output_paths.append(str(output_path)) output_paths.append(str(output_path))
# push file information into processing queue # push file information into processing queue
self.processing_queue.put((input_path.absolute(), output_path.absolute(), input_file_mime_type, input_file_type, input_file_subtype)) self.processing_queue.put(
(
input_path.absolute(),
output_path.absolute(),
input_file_mime_type,
input_file_type,
input_file_subtype,
)
)
# check argument sanity before running # check argument sanity before running
self._check_arguments() self._check_arguments()
@ -527,22 +635,28 @@ class Upscaler:
# record file count for external calls # record file count for external calls
self.total_files = self.processing_queue.qsize() self.total_files = self.processing_queue.qsize()
Avalon.info(_('Loaded files into processing queue')) Avalon.info(_("Loaded files into processing queue"))
# print all files in queue for debugging # print all files in queue for debugging
for job in self.processing_queue.queue: for job in self.processing_queue.queue:
Avalon.debug_info(_('Input file: {}').format(job[0].absolute())) Avalon.debug_info(_("Input file: {}").format(job[0].absolute()))
try: try:
while not self.processing_queue.empty(): while not self.processing_queue.empty():
# get new job from queue # get new job from queue
self.current_input_file, output_path, input_file_mime_type, input_file_type, input_file_subtype = self.processing_queue.get() (
self.current_input_file,
output_path,
input_file_mime_type,
input_file_type,
input_file_subtype,
) = self.processing_queue.get()
# get current job starting time for GUI calculations # get current job starting time for GUI calculations
self.current_processing_starting_time = time.time() self.current_processing_starting_time = time.time()
# get video information JSON using FFprobe # get video information JSON using FFprobe
Avalon.info(_('Reading file information')) Avalon.info(_("Reading file information"))
file_info = self.ffmpeg_object.probe_file_info(self.current_input_file) file_info = self.ffmpeg_object.probe_file_info(self.current_input_file)
# create temporary directories for storing frames # create temporary directories for storing frames
@ -550,50 +664,61 @@ class Upscaler:
# start handling input # start handling input
# if input file is a static image # if input file is a static image
if input_file_type == 'image' and input_file_subtype != 'gif': if input_file_type == "image" and input_file_subtype != "gif":
Avalon.info(_('Starting upscaling image')) Avalon.info(_("Starting upscaling image"))
# copy original file into the pre-processing directory # copy original file into the pre-processing directory
shutil.copy(self.current_input_file, self.extracted_frames / self.current_input_file.name) shutil.copy(
self.current_input_file,
self.extracted_frames / self.current_input_file.name,
)
width = int(file_info['streams'][0]['width']) width = int(file_info["streams"][0]["width"])
height = int(file_info['streams'][0]['height']) height = int(file_info["streams"][0]["height"])
framerate = self.total_frames = 1 framerate = self.total_frames = 1
# elif input_file_mime_type == 'image/gif' or input_file_type == 'video': # elif input_file_mime_type == 'image/gif' or input_file_type == 'video':
else: else:
Avalon.info(_('Starting upscaling video/GIF')) Avalon.info(_("Starting upscaling video/GIF"))
# find index of video stream # find index of video stream
video_stream_index = None video_stream_index = None
for stream in file_info['streams']: for stream in file_info["streams"]:
if stream['codec_type'] == 'video': if stream["codec_type"] == "video":
video_stream_index = stream['index'] video_stream_index = stream["index"]
break break
# exit if no video stream found # exit if no video stream found
if video_stream_index is None: if video_stream_index is None:
Avalon.error(_('Aborting: No video stream found')) Avalon.error(_("Aborting: No video stream found"))
raise StreamNotFoundError('no video stream found') raise StreamNotFoundError("no video stream found")
# get average frame rate of video stream # get average frame rate of video stream
framerate = float(Fraction(file_info['streams'][video_stream_index]['r_frame_rate'])) framerate = float(
width = int(file_info['streams'][video_stream_index]['width']) Fraction(
height = int(file_info['streams'][video_stream_index]['height']) file_info["streams"][video_stream_index]["r_frame_rate"]
)
)
width = int(file_info["streams"][video_stream_index]["width"])
height = int(file_info["streams"][video_stream_index]["height"])
# get total number of frames # get total number of frames
Avalon.info(_('Getting total number of frames in the file')) Avalon.info(_("Getting total number of frames in the file"))
# if container stores total number of frames in nb_frames, fetch it directly # if container stores total number of frames in nb_frames, fetch it directly
if 'nb_frames' in file_info['streams'][video_stream_index]: if "nb_frames" in file_info["streams"][video_stream_index]:
self.total_frames = int(file_info['streams'][video_stream_index]['nb_frames']) self.total_frames = int(
file_info["streams"][video_stream_index]["nb_frames"]
)
# otherwise call FFprobe to count the total number of frames # otherwise call FFprobe to count the total number of frames
else: else:
self.total_frames = self.ffmpeg_object.get_number_of_frames(self.current_input_file, video_stream_index) self.total_frames = self.ffmpeg_object.get_number_of_frames(
self.current_input_file, video_stream_index
)
# calculate scale width/height/ratio and scaling jobs if required # calculate scale width/height/ratio and scaling jobs if required
Avalon.info(_('Calculating scaling parameters')) Avalon.info(_("Calculating scaling parameters"))
# create a local copy of the global output settings # create a local copy of the global output settings
output_scale = self.scale_ratio output_scale = self.scale_ratio
@ -624,7 +749,9 @@ class Upscaler:
if self.driver in DRIVER_FIXED_SCALING_RATIOS: if self.driver in DRIVER_FIXED_SCALING_RATIOS:
# select the optimal driver scaling ratio to use # select the optimal driver scaling ratio to use
supported_scaling_ratios = sorted(DRIVER_FIXED_SCALING_RATIOS[self.driver]) supported_scaling_ratios = sorted(
DRIVER_FIXED_SCALING_RATIOS[self.driver]
)
remaining_scaling_ratio = math.ceil(output_scale) remaining_scaling_ratio = math.ceil(output_scale)
self.scaling_jobs = [] self.scaling_jobs = []
@ -654,46 +781,68 @@ class Upscaler:
break break
if found is False: if found is False:
self.scaling_jobs.append(supported_scaling_ratios[-1]) self.scaling_jobs.append(
remaining_scaling_ratio /= supported_scaling_ratios[-1] supported_scaling_ratios[-1]
)
remaining_scaling_ratio /= supported_scaling_ratios[
-1
]
else: else:
self.scaling_jobs = [output_scale] self.scaling_jobs = [output_scale]
# print file information # print file information
Avalon.debug_info(_('Framerate: {}').format(framerate)) Avalon.debug_info(_("Framerate: {}").format(framerate))
Avalon.debug_info(_('Width: {}').format(width)) Avalon.debug_info(_("Width: {}").format(width))
Avalon.debug_info(_('Height: {}').format(height)) Avalon.debug_info(_("Height: {}").format(height))
Avalon.debug_info(_('Total number of frames: {}').format(self.total_frames)) Avalon.debug_info(
Avalon.debug_info(_('Output width: {}').format(output_width)) _("Total number of frames: {}").format(self.total_frames)
Avalon.debug_info(_('Output height: {}').format(output_height)) )
Avalon.debug_info(_('Required scale ratio: {}').format(output_scale)) Avalon.debug_info(_("Output width: {}").format(output_width))
Avalon.debug_info(_('Upscaling jobs queue: {}').format(self.scaling_jobs)) Avalon.debug_info(_("Output height: {}").format(output_height))
Avalon.debug_info(_("Required scale ratio: {}").format(output_scale))
Avalon.debug_info(
_("Upscaling jobs queue: {}").format(self.scaling_jobs)
)
# extract frames from video # extract frames from video
if input_file_mime_type == 'image/gif' or input_file_type == 'video': if input_file_mime_type == "image/gif" or input_file_type == "video":
self.process_pool.append((self.ffmpeg_object.extract_frames(self.current_input_file, self.extracted_frames))) self.process_pool.append(
(
self.ffmpeg_object.extract_frames(
self.current_input_file, self.extracted_frames
)
)
)
self._wait() self._wait()
# if driver is waifu2x-caffe # if driver is waifu2x-caffe
# pass pixel format output depth information # pass pixel format output depth information
if self.driver == 'waifu2x_caffe': if self.driver == "waifu2x_caffe":
# get a dict of all pixel formats and corresponding bit depth # get a dict of all pixel formats and corresponding bit depth
pixel_formats = self.ffmpeg_object.get_pixel_formats() pixel_formats = self.ffmpeg_object.get_pixel_formats()
# try getting pixel format's corresponding bti depth # try getting pixel format's corresponding bti depth
try: try:
self.driver_settings['output_depth'] = pixel_formats[self.ffmpeg_object.pixel_format] self.driver_settings["output_depth"] = pixel_formats[
self.ffmpeg_object.pixel_format
]
except KeyError: except KeyError:
Avalon.error(_('Unsupported pixel format: {}').format(self.ffmpeg_object.pixel_format)) Avalon.error(
raise UnsupportedPixelError(f'unsupported pixel format {self.ffmpeg_object.pixel_format}') _("Unsupported pixel format: {}").format(
self.ffmpeg_object.pixel_format
)
)
raise UnsupportedPixelError(
f"unsupported pixel format {self.ffmpeg_object.pixel_format}"
)
# upscale images one by one using waifu2x # upscale images one by one using waifu2x
Avalon.info(_('Starting to upscale extracted frames')) Avalon.info(_("Starting to upscale extracted frames"))
upscale_begin_time = time.time() upscale_begin_time = time.time()
self.current_pass = 1 self.current_pass = 1
if self.driver == 'waifu2x_caffe': if self.driver == "waifu2x_caffe":
self.driver_object.set_scale_resolution(output_width, output_height) self.driver_object.set_scale_resolution(output_width, output_height)
else: else:
self.driver_object.set_scale_ratio(self.scaling_jobs[0]) self.driver_object.set_scale_ratio(self.scaling_jobs[0])
@ -706,22 +855,39 @@ class Upscaler:
self.upscaled_frames.mkdir(parents=True, exist_ok=True) self.upscaled_frames.mkdir(parents=True, exist_ok=True)
self._upscale_frames(self.extracted_frames, self.upscaled_frames) self._upscale_frames(self.extracted_frames, self.upscaled_frames)
Avalon.info(_('Upscaling completed')) Avalon.info(_("Upscaling completed"))
Avalon.info(_('Average processing speed: {} seconds per frame').format(self.total_frames / (time.time() - upscale_begin_time))) Avalon.info(
_("Average processing speed: {} seconds per frame").format(
self.total_frames / (time.time() - upscale_begin_time)
)
)
# downscale frames with Lanczos # downscale frames with Lanczos
Avalon.info(_('Lanczos downscaling frames')) Avalon.info(_("Lanczos downscaling frames"))
shutil.rmtree(self.extracted_frames) shutil.rmtree(self.extracted_frames)
shutil.move(self.upscaled_frames, self.extracted_frames) shutil.move(self.upscaled_frames, self.extracted_frames)
self.upscaled_frames.mkdir(parents=True, exist_ok=True) self.upscaled_frames.mkdir(parents=True, exist_ok=True)
for image in tqdm([i for i in self.extracted_frames.iterdir() if i.is_file() and i.name.endswith(self.extracted_frame_format)], ascii=True, desc=_('Downscaling')): for image in tqdm(
[
i
for i in self.extracted_frames.iterdir()
if i.is_file() and i.name.endswith(self.extracted_frame_format)
],
ascii=True,
desc=_("Downscaling"),
):
image_object = Image.open(image) image_object = Image.open(image)
# if the image dimensions are not equal to the output size # if the image dimensions are not equal to the output size
# resize the image using Lanczos # resize the image using Lanczos
if (image_object.width, image_object.height) != (output_width, output_height): if (image_object.width, image_object.height) != (
image_object.resize((output_width, output_height), Image.LANCZOS).save(self.upscaled_frames / image.name) output_width,
output_height,
):
image_object.resize(
(output_width, output_height), Image.LANCZOS
).save(self.upscaled_frames / image.name)
image_object.close() image_object.close()
# if the image's dimensions are already equal to the output size # if the image's dimensions are already equal to the output size
@ -732,71 +898,117 @@ class Upscaler:
# start handling output # start handling output
# output can be either GIF or video # output can be either GIF or video
if input_file_type == 'image' and input_file_subtype != 'gif': if input_file_type == "image" and input_file_subtype != "gif":
Avalon.info(_('Exporting image')) Avalon.info(_("Exporting image"))
# there should be only one image in the directory # there should be only one image in the directory
shutil.move([f for f in self.upscaled_frames.iterdir() if f.is_file()][0], output_path) shutil.move(
[f for f in self.upscaled_frames.iterdir() if f.is_file()][0],
output_path,
)
# elif input_file_mime_type == 'image/gif' or input_file_type == 'video': # elif input_file_mime_type == 'image/gif' or input_file_type == 'video':
else: else:
# if the desired output is gif file # if the desired output is gif file
if output_path.suffix.lower() == '.gif': if output_path.suffix.lower() == ".gif":
Avalon.info(_('Converting extracted frames into GIF image')) Avalon.info(_("Converting extracted frames into GIF image"))
gifski_object = Gifski(self.gifski_settings) gifski_object = Gifski(self.gifski_settings)
self.process_pool.append(gifski_object.make_gif(self.upscaled_frames, output_path, framerate, self.extracted_frame_format, output_width, output_height)) self.process_pool.append(
gifski_object.make_gif(
self.upscaled_frames,
output_path,
framerate,
self.extracted_frame_format,
output_width,
output_height,
)
)
self._wait() self._wait()
Avalon.info(_('Conversion completed')) Avalon.info(_("Conversion completed"))
# if the desired output is video # if the desired output is video
else: else:
# frames to video # frames to video
Avalon.info(_('Converting extracted frames into video')) Avalon.info(_("Converting extracted frames into video"))
self.process_pool.append(self.ffmpeg_object.assemble_video(framerate, self.upscaled_frames)) self.process_pool.append(
self.ffmpeg_object.assemble_video(
framerate, self.upscaled_frames
)
)
# f'{scale_width}x{scale_height}' # f'{scale_width}x{scale_height}'
self._wait() self._wait()
Avalon.info(_('Conversion completed')) Avalon.info(_("Conversion completed"))
try: try:
# migrate audio tracks and subtitles # migrate audio tracks and subtitles
Avalon.info(_('Migrating audio, subtitles and other streams to upscaled video')) Avalon.info(
self.process_pool.append(self.ffmpeg_object.migrate_streams(self.current_input_file, _(
"Migrating audio, subtitles and other streams to upscaled video"
)
)
self.process_pool.append(
self.ffmpeg_object.migrate_streams(
self.current_input_file,
output_path, output_path,
self.upscaled_frames)) self.upscaled_frames,
)
)
self._wait() self._wait()
# if failed to copy streams # if failed to copy streams
# use file with only video stream # use file with only video stream
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
traceback.print_exc() traceback.print_exc()
Avalon.error(_('Failed to migrate streams')) Avalon.error(_("Failed to migrate streams"))
Avalon.warning(_('Trying to output video without additional streams')) Avalon.warning(
_("Trying to output video without additional streams")
)
if input_file_mime_type == 'image/gif': if input_file_mime_type == "image/gif":
# copy will overwrite destination content if exists # copy will overwrite destination content if exists
shutil.copy(self.upscaled_frames / self.ffmpeg_object.intermediate_file_name, output_path) shutil.copy(
self.upscaled_frames
/ self.ffmpeg_object.intermediate_file_name,
output_path,
)
else: else:
# construct output file path # construct output file path
output_file_name = f'{output_path.stem}{self.ffmpeg_object.intermediate_file_name.suffix}' output_file_name = f"{output_path.stem}{self.ffmpeg_object.intermediate_file_name.suffix}"
output_video_path = output_path.parent / output_file_name output_video_path = (
output_path.parent / output_file_name
)
# if output file already exists # if output file already exists
# create temporary directory in output folder # create temporary directory in output folder
# temporary directories generated by tempfile are guaranteed to be unique # temporary directories generated by tempfile are guaranteed to be unique
# and won't conflict with other files # and won't conflict with other files
if output_video_path.exists(): if output_video_path.exists():
Avalon.error(_('Output video file exists')) Avalon.error(_("Output video file exists"))
temporary_directory = pathlib.Path(tempfile.mkdtemp(dir=output_path.parent)) temporary_directory = pathlib.Path(
output_video_path = temporary_directory / output_file_name tempfile.mkdtemp(dir=output_path.parent)
Avalon.info(_('Created temporary directory to contain file')) )
output_video_path = (
temporary_directory / output_file_name
)
Avalon.info(
_("Created temporary directory to contain file")
)
# move file to new destination # move file to new destination
Avalon.info(_('Writing intermediate file to: {}').format(output_video_path.absolute())) Avalon.info(
shutil.move(self.upscaled_frames / self.ffmpeg_object.intermediate_file_name, output_video_path) _("Writing intermediate file to: {}").format(
output_video_path.absolute()
)
)
shutil.move(
self.upscaled_frames
/ self.ffmpeg_object.intermediate_file_name,
output_video_path,
)
# increment total number of files processed # increment total number of files processed
self.cleanup_temp_directories() self.cleanup_temp_directories()

View File

@ -71,75 +71,143 @@ import yaml
from avalon_framework import Avalon from avalon_framework import Avalon
# internationalization constants # internationalization constants
DOMAIN = 'video2x' DOMAIN = "video2x"
LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / 'locale' LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / "locale"
# getting default locale settings # getting default locale settings
default_locale, encoding = locale.getdefaultlocale() default_locale, encoding = locale.getdefaultlocale()
language = gettext.translation(DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True) language = gettext.translation(
DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True
)
language.install() language.install()
_ = language.gettext _ = language.gettext
CLI_VERSION = '4.3.1' CLI_VERSION = "4.3.1"
LEGAL_INFO = _('''Video2X CLI Version: {} LEGAL_INFO = _(
"""Video2X CLI Version: {}
Upscaler Version: {} Upscaler Version: {}
Author: K4YT3X Author: K4YT3X
License: GNU GPL v3 License: GNU GPL v3
Github Page: https://github.com/k4yt3x/video2x Github Page: https://github.com/k4yt3x/video2x
Contact: k4yt3x@k4yt3x.com''').format(CLI_VERSION, UPSCALER_VERSION) Contact: k4yt3x@k4yt3x.com"""
).format(CLI_VERSION, UPSCALER_VERSION)
LOGO = r''' LOGO = r"""
__ __ _ _ ___ __ __ __ __ _ _ ___ __ __
\ \ / / (_) | | |__ \ \ \ / / \ \ / / (_) | | |__ \ \ \ / /
\ \ / / _ __| | ___ ___ ) | \ V / \ \ / / _ __| | ___ ___ ) | \ V /
\ \/ / | | / _` | / _ \ / _ \ / / > < \ \/ / | | / _` | / _ \ / _ \ / / > <
\ / | | | (_| | | __/ | (_) | / /_ / . \ \ / | | | (_| | | __/ | (_) | / /_ / . \
\/ |_| \__,_| \___| \___/ |____| /_/ \_\ \/ |_| \__,_| \___| \___/ |____| /_/ \_\
''' """
def parse_arguments(): def parse_arguments():
""" parse CLI arguments """parse CLI arguments"""
""" parser = argparse.ArgumentParser(
parser = argparse.ArgumentParser(prog='video2x', formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) prog="video2x",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
add_help=False,
)
# video options # video options
video2x_options = parser.add_argument_group(_('Video2X Options')) video2x_options = parser.add_argument_group(_("Video2X Options"))
video2x_options.add_argument('--help', action='help', help=_('show this help message and exit'))
video2x_options.add_argument(
"--help", action="help", help=_("show this help message and exit")
)
# if help is in arguments list # if help is in arguments list
# do not require input and output path to be specified # do not require input and output path to be specified
require_input_output = True require_input_output = True
if '-h' in sys.argv or '--help' in sys.argv: if "-h" in sys.argv or "--help" in sys.argv:
require_input_output = False require_input_output = False
video2x_options.add_argument('-i', '--input', type=pathlib.Path, help=_('source video file/directory'), required=require_input_output)
video2x_options.add_argument('-o', '--output', type=pathlib.Path, help=_('output video file/directory'), required=require_input_output)
video2x_options.add_argument('-c', '--config', type=pathlib.Path, help=_('Video2X config file path'), action='store', video2x_options.add_argument(
default=pathlib.Path(__file__).parent.absolute() / 'video2x.yaml') "-i",
video2x_options.add_argument('--log', type=pathlib.Path, help=_('log file path')) "--input",
video2x_options.add_argument('-v', '--version', help=_('display version, lawful information and exit'), action='store_true') type=pathlib.Path,
help=_("source video file/directory"),
required=require_input_output,
)
video2x_options.add_argument(
"-o",
"--output",
type=pathlib.Path,
help=_("output video file/directory"),
required=require_input_output,
)
video2x_options.add_argument(
"-c",
"--config",
type=pathlib.Path,
help=_("Video2X config file path"),
action="store",
default=pathlib.Path(__file__).parent.absolute() / "video2x.yaml",
)
video2x_options.add_argument("--log", type=pathlib.Path, help=_("log file path"))
video2x_options.add_argument(
"-v",
"--version",
help=_("display version, lawful information and exit"),
action="store_true",
)
# scaling options # scaling options
upscaling_options = parser.add_argument_group(_('Upscaling Options')) upscaling_options = parser.add_argument_group(_("Upscaling Options"))
upscaling_options.add_argument('-r', '--ratio', help=_('scaling ratio'), action='store', type=float)
upscaling_options.add_argument('-w', '--width', help=_('output width'), action='store', type=float) upscaling_options.add_argument(
upscaling_options.add_argument('-h', '--height', help=_('output height'), action='store', type=float) "-r", "--ratio", help=_("scaling ratio"), action="store", type=float
upscaling_options.add_argument('-d', '--driver', help=_('upscaling driver'), choices=AVAILABLE_DRIVERS, default='waifu2x_ncnn_vulkan') )
upscaling_options.add_argument('-p', '--processes', help=_('number of processes to use for upscaling'), action='store', type=int, default=1)
upscaling_options.add_argument('--preserve_frames', help=_('preserve extracted and upscaled frames'), action='store_true') upscaling_options.add_argument(
"-w", "--width", help=_("output width"), action="store", type=float
)
upscaling_options.add_argument(
"-h", "--height", help=_("output height"), action="store", type=float
)
upscaling_options.add_argument(
"-d",
"--driver",
help=_("upscaling driver"),
choices=AVAILABLE_DRIVERS,
default="waifu2x_ncnn_vulkan",
)
upscaling_options.add_argument(
"-p",
"--processes",
help=_("number of processes to use for upscaling"),
action="store",
type=int,
default=1,
)
upscaling_options.add_argument(
"--preserve_frames",
help=_("preserve extracted and upscaled frames"),
action="store_true",
)
# if no driver arguments are specified # if no driver arguments are specified
if '--' not in sys.argv: if "--" not in sys.argv:
video2x_args = parser.parse_args() video2x_args = parser.parse_args()
return video2x_args, None return video2x_args, None
# if driver arguments are specified # if driver arguments are specified
else: else:
video2x_args = parser.parse_args(sys.argv[1:sys.argv.index('--')]) video2x_args = parser.parse_args(sys.argv[1 : sys.argv.index("--")])
wrapper = getattr(importlib.import_module(f'wrappers.{video2x_args.driver}'), 'WrapperMain') wrapper = getattr(
driver_args = wrapper.parse_arguments(sys.argv[sys.argv.index('--') + 1:]) importlib.import_module(f"wrappers.{video2x_args.driver}"), "WrapperMain"
)
driver_args = wrapper.parse_arguments(sys.argv[sys.argv.index("--") + 1 :])
return video2x_args, driver_args return video2x_args, driver_args
@ -160,16 +228,16 @@ def read_config(config_file: pathlib.Path) -> dict:
dict -- dictionary of video2x configuration dict -- dictionary of video2x configuration
""" """
with open(config_file, 'r') as config: with open(config_file, "r") as config:
return yaml.load(config, Loader=yaml.FullLoader) return yaml.load(config, Loader=yaml.FullLoader)
# /////////////////// Execution /////////////////// # # /////////////////// Execution /////////////////// #
# this is not a library # this is not a library
if __name__ != '__main__': if __name__ != "__main__":
Avalon.error(_('This file cannot be imported')) Avalon.error(_("This file cannot be imported"))
raise ImportError(f'{__file__} cannot be imported') raise ImportError(f"{__file__} cannot be imported")
# print video2x logo # print video2x logo
print_logo() print_logo()
@ -183,15 +251,19 @@ if video2x_args.version:
sys.exit(0) sys.exit(0)
# additional checks on upscaling arguments # additional checks on upscaling arguments
if video2x_args.ratio is not None and (video2x_args.width is not None or video2x_args.height is not None): if video2x_args.ratio is not None and (
Avalon.error(_('Specify either scaling ratio or scaling resolution, not both')) video2x_args.width is not None or video2x_args.height is not None
):
Avalon.error(_("Specify either scaling ratio or scaling resolution, not both"))
sys.exit(1) sys.exit(1)
# redirect output to both terminal and log file # redirect output to both terminal and log file
if video2x_args.log is not None: if video2x_args.log is not None:
log_file = video2x_args.log.open(mode='a+', encoding='utf-8') log_file = video2x_args.log.open(mode="a+", encoding="utf-8")
else: else:
log_file = tempfile.TemporaryFile(mode='a+', suffix='.log', prefix='video2x_', encoding='utf-8') log_file = tempfile.TemporaryFile(
mode="a+", suffix=".log", prefix="video2x_", encoding="utf-8"
)
original_stdout = sys.stdout original_stdout = sys.stdout
original_stderr = sys.stderr original_stderr = sys.stderr
@ -203,22 +275,22 @@ config = read_config(video2x_args.config)
# load waifu2x configuration # load waifu2x configuration
driver_settings = config[video2x_args.driver] driver_settings = config[video2x_args.driver]
driver_settings['path'] = os.path.expandvars(driver_settings['path']) driver_settings["path"] = os.path.expandvars(driver_settings["path"])
# read FFmpeg configuration # read FFmpeg configuration
ffmpeg_settings = config['ffmpeg'] ffmpeg_settings = config["ffmpeg"]
ffmpeg_settings['ffmpeg_path'] = os.path.expandvars(ffmpeg_settings['ffmpeg_path']) ffmpeg_settings["ffmpeg_path"] = os.path.expandvars(ffmpeg_settings["ffmpeg_path"])
# read Gifski configuration # read Gifski configuration
gifski_settings = config['gifski'] gifski_settings = config["gifski"]
gifski_settings['gifski_path'] = os.path.expandvars(gifski_settings['gifski_path']) gifski_settings["gifski_path"] = os.path.expandvars(gifski_settings["gifski_path"])
# load video2x settings # load video2x settings
extracted_frame_format = config['video2x']['extracted_frame_format'].lower() extracted_frame_format = config["video2x"]["extracted_frame_format"].lower()
output_file_name_format_string = config['video2x']['output_file_name_format_string'] output_file_name_format_string = config["video2x"]["output_file_name_format_string"]
image_output_extension = config['video2x']['image_output_extension'] image_output_extension = config["video2x"]["image_output_extension"]
video_output_extension = config['video2x']['video_output_extension'] video_output_extension = config["video2x"]["video_output_extension"]
preserve_frames = config['video2x']['preserve_frames'] preserve_frames = config["video2x"]["preserve_frames"]
# if preserve frames specified in command line # if preserve frames specified in command line
# overwrite config file options # overwrite config file options
@ -227,10 +299,10 @@ if video2x_args.preserve_frames is True:
# if cache directory not specified # if cache directory not specified
# use default path: %TEMP%\video2x # use default path: %TEMP%\video2x
if config['video2x']['video2x_cache_directory'] is None: if config["video2x"]["video2x_cache_directory"] is None:
video2x_cache_directory = (pathlib.Path(tempfile.gettempdir()) / 'video2x') video2x_cache_directory = pathlib.Path(tempfile.gettempdir()) / "video2x"
else: else:
video2x_cache_directory = pathlib.Path(config['video2x']['video2x_cache_directory']) video2x_cache_directory = pathlib.Path(config["video2x"]["video2x_cache_directory"])
# overwrite driver_settings with driver_args # overwrite driver_settings with driver_args
if driver_args is not None: if driver_args is not None:
@ -252,7 +324,6 @@ try:
driver_settings=driver_settings, driver_settings=driver_settings,
ffmpeg_settings=ffmpeg_settings, ffmpeg_settings=ffmpeg_settings,
gifski_settings=gifski_settings, gifski_settings=gifski_settings,
# optional parameters # optional parameters
driver=video2x_args.driver, driver=video2x_args.driver,
scale_ratio=video2x_args.ratio, scale_ratio=video2x_args.ratio,
@ -264,17 +335,21 @@ try:
output_file_name_format_string=output_file_name_format_string, output_file_name_format_string=output_file_name_format_string,
image_output_extension=image_output_extension, image_output_extension=image_output_extension,
video_output_extension=video_output_extension, video_output_extension=video_output_extension,
preserve_frames=preserve_frames preserve_frames=preserve_frames,
) )
# run upscaler # run upscaler
upscaler.run() upscaler.run()
Avalon.info(_('Program completed, taking {} seconds').format(round((time.time() - begin_time), 5))) Avalon.info(
_("Program completed, taking {} seconds").format(
round((time.time() - begin_time), 5)
)
)
except Exception: except Exception:
Avalon.error(_('An exception has occurred')) Avalon.error(_("An exception has occurred"))
traceback.print_exc() traceback.print_exc()
if video2x_args.log is not None: if video2x_args.log is not None:
@ -284,12 +359,12 @@ except Exception:
# tempfile.TempFile does not have a name attribute and is not guaranteed to have # tempfile.TempFile does not have a name attribute and is not guaranteed to have
# a visible name on the file system # a visible name on the file system
else: else:
log_file_path = tempfile.mkstemp(suffix='.log', prefix='video2x_')[1] log_file_path = tempfile.mkstemp(suffix=".log", prefix="video2x_")[1]
with open(log_file_path, 'w', encoding='utf-8') as permanent_log_file: with open(log_file_path, "w", encoding="utf-8") as permanent_log_file:
log_file.seek(0) log_file.seek(0)
permanent_log_file.write(log_file.read()) permanent_log_file.write(log_file.read())
Avalon.error(_('The error log file can be found at: {}').format(log_file_path)) Avalon.error(_("The error log file can be found at: {}").format(log_file_path))
finally: finally:
sys.stdout = original_stdout sys.stdout = original_stdout

File diff suppressed because it is too large Load Diff

View File

@ -27,8 +27,7 @@ from avalon_framework import Avalon
class WrapperMain: class WrapperMain:
""" Anime4K CPP wrapper """Anime4K CPP wrapper"""
"""
def __init__(self, driver_settings): def __init__(self, driver_settings):
self.driver_settings = driver_settings self.driver_settings = driver_settings
@ -38,53 +37,55 @@ class WrapperMain:
def zero_to_one_float(value): def zero_to_one_float(value):
value = float(value) value = float(value)
if value < 0.0 or value > 1.0: if value < 0.0 or value > 1.0:
raise argparse.ArgumentTypeError(f'{value} is not between 0.0 and 1.0') raise argparse.ArgumentTypeError(f"{value} is not between 0.0 and 1.0")
return value return value
@staticmethod @staticmethod
def parse_arguments(arguments): def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message)) parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit') parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument('-i', '--input', type=str, help=argparse.SUPPRESS) # help='File for loading') parser.add_argument("-i", "--input", type=str, help=argparse.SUPPRESS) # help="File for loading")
parser.add_argument('-o', '--output', type=str, help=argparse.SUPPRESS) # help='File for outputting') parser.add_argument("-o", "--output", type=str, help=argparse.SUPPRESS) # help="File for outputting")
parser.add_argument('-p', '--passes', type=int, help='Passes for processing') parser.add_argument("-p", "--passes", type=int, help="Passes for processing")
parser.add_argument('-n', '--pushColorCount', type=int, help='Limit the number of color pushes') parser.add_argument("-n", "--pushColorCount", type=int, help="Limit the number of color pushes")
parser.add_argument('-c', '--strengthColor', type=WrapperMain.zero_to_one_float, help='Strength for pushing color,range 0 to 1,higher for thinner') parser.add_argument("-c", "--strengthColor", type=WrapperMain.zero_to_one_float, help="Strength for pushing color,range 0 to 1,higher for thinner")
parser.add_argument('-g', '--strengthGradient', type=WrapperMain.zero_to_one_float, help='Strength for pushing gradient,range 0 to 1,higher for sharper') parser.add_argument("-g", "--strengthGradient", type=WrapperMain.zero_to_one_float, help="Strength for pushing gradient,range 0 to 1,higher for sharper")
parser.add_argument('-z', '--zoomFactor', type=float, help='zoom factor for resizing') parser.add_argument("-z", "--zoomFactor", type=float, help="zoom factor for resizing")
parser.add_argument('-t', '--threads', type=int, help='Threads count for video processing') parser.add_argument("-t", "--threads", type=int, help="Threads count for video processing")
parser.add_argument('-f', '--fastMode', action='store_true', help='Faster but maybe low quality') parser.add_argument("-f", "--fastMode", action="store_true", help="Faster but maybe low quality")
parser.add_argument('-v', '--videoMode', action='store_true', help='Video process') parser.add_argument("-v", "--videoMode", action="store_true", help="Video process")
parser.add_argument('-s', '--preview', action='store_true', help='Preview image') parser.add_argument("-s", "--preview", action="store_true", help="Preview image")
parser.add_argument('-b', '--preprocessing', action='store_true', help='Enable pre processing') parser.add_argument("-b", "--preprocessing", action="store_true", help="Enable pre processing")
parser.add_argument('-a', '--postprocessing', action='store_true', help='Enable post processing') parser.add_argument("-a", "--postprocessing", action="store_true", help="Enable post processing")
parser.add_argument('-r', '--preFilters', type=int, help='Enhancement filter, only working when preProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D)') parser.add_argument("-r", "--preFilters", type=int, help="Enhancement filter, only working when preProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D)")
parser.add_argument('-e', '--postFilters', type=int, help='Enhancement filter, only working when postProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D), so you can put 40 to enable Gaussian blur weak and Bilateral filter, which also is what I recommend for image that < 1080P, 48 for image that >= 1080P, and for performance I recommend to use 72 for video that < 1080P, 80 for video that >=1080P') parser.add_argument("-e", "--postFilters", type=int, help="Enhancement filter, only working when postProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D), so you can put 40 to enable Gaussian blur weak and Bilateral filter, which also is what I recommend for image that < 1080P, 48 for image that >= 1080P, and for performance I recommend to use 72 for video that < 1080P, 80 for video that >=1080P")
parser.add_argument('-q', '--GPUMode', action='store_true', help='Enable GPU acceleration') parser.add_argument("-q", "--GPUMode", action="store_true", help="Enable GPU acceleration")
parser.add_argument('-w', '--CNNMode', action='store_true', help='Enable ACNet') parser.add_argument("-w", "--CNNMode", action="store_true", help="Enable ACNet")
parser.add_argument('-H', '--HDN', action='store_true', help='Enable HDN mode for ACNet') parser.add_argument("-H", "--HDN", action="store_true", help="Enable HDN mode for ACNet")
parser.add_argument('-L', '--HDNLevel', type=int, help='Set HDN level') parser.add_argument("-L", "--HDNLevel", type=int, help="Set HDN level")
parser.add_argument('-l', '--listGPUs', action='store_true', help='list GPUs') parser.add_argument("-l", "--listGPUs", action="store_true", help="list GPUs")
parser.add_argument('-h', '--platformID', type=int, help='Specify the platform ID') parser.add_argument("-h", "--platformID", type=int, help="Specify the platform ID")
parser.add_argument('-d', '--deviceID', type=int, help='Specify the device ID') parser.add_argument("-d", "--deviceID", type=int, help="Specify the device ID")
parser.add_argument('-C', '--codec', type=str, help='Specify the codec for encoding from mp4v(recommended in Windows), dxva(for Windows), avc1(H264, recommended in Linux), vp09(very slow), hevc(not support in Windowds), av01(not support in Windowds) (string [=mp4v])') parser.add_argument("-C", "--codec", type=str, help="Specify the codec for encoding from mp4v(recommended in Windows), dxva(for Windows), avc1(H264, recommended in Linux), vp09(very slow), hevc(not support in Windowds), av01(not support in Windowds) (string [=mp4v])")
parser.add_argument('-F', '--forceFps', type=float, help='Set output video fps to the specifying number, 0 to disable') parser.add_argument("-F", "--forceFps", type=float, help="Set output video fps to the specifying number, 0 to disable")
parser.add_argument('-D', '--disableProgress', action='store_true', help='disable progress display') parser.add_argument("-D", "--disableProgress", action="store_true", help="disable progress display")
parser.add_argument('-W', '--webVideo', type=str, help='process the video from URL') parser.add_argument("-W", "--webVideo", type=str, help="process the video from URL")
parser.add_argument('-A', '--alpha', action='store_true', help='preserve the Alpha channel for transparent image') parser.add_argument("-A", "--alpha", action="store_true", help="preserve the Alpha channel for transparent image")
return parser.parse_args(arguments) return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler): def load_configurations(self, upscaler):
# self.driver_settings['zoomFactor'] = upscaler.scale_ratio # self.driver_settings['zoomFactor'] = upscaler.scale_ratio
self.driver_settings['threads'] = upscaler.processes self.driver_settings["threads"] = upscaler.processes
# append FFmpeg path to the end of PATH # append FFmpeg path to the end of PATH
# Anime4KCPP will then use FFmpeg to migrate audio tracks # Anime4KCPP will then use FFmpeg to migrate audio tracks
os.environ['PATH'] += f';{upscaler.ffmpeg_settings["ffmpeg_path"]}' os.environ["PATH"] += f';{upscaler.ffmpeg_settings["ffmpeg_path"]}'
def set_scale_ratio(self, scale_ratio: float): def set_scale_ratio(self, scale_ratio: float):
self.driver_settings['zoomFactor'] = scale_ratio self.driver_settings["zoomFactor"] = scale_ratio
def upscale(self, input_file, output_file): def upscale(self, input_file, output_file):
"""This is the core function for WAIFU2X class """This is the core function for WAIFU2X class
@ -98,33 +99,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory # change the working directory to the binary's parent directory
# so the binary can find shared object files and other files # so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings # overwrite config file settings
self.driver_settings['input'] = input_file self.driver_settings["input"] = input_file
self.driver_settings['output'] = output_file self.driver_settings["output"] = output_file
# Anime4KCPP will look for Anime4KCPPKernel.cl under the current working directory # Anime4KCPP will look for Anime4KCPPKernel.cl under the current working directory
# change the CWD to its containing directory so it will find it # change the CWD to its containing directory so it will find it
if platform.system() == 'Windows': if platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed # list to be executed
# initialize the list with waifu2x binary path as the first element # initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']] execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys(): for key in self.driver_settings.keys():
value = self.driver_settings[key] value = self.driver_settings[key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False: if key == "path" or value is None or value is False:
continue continue
else: else:
if len(key) == 1: if len(key) == 1:
execute.append(f'-{key}') execute.append(f"-{key}")
else: else:
execute.append(f'--{key}') execute.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
@ -132,6 +133,8 @@ class WrapperMain:
# return the Popen object of the new process created # return the Popen object of the new process created
self.print_lock.acquire() self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}') Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release() self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr) return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -27,17 +27,21 @@ class Ffmpeg:
and inserting audio tracks to videos. and inserting audio tracks to videos.
""" """
def __init__(self, ffmpeg_settings, extracted_frame_format='png'): def __init__(self, ffmpeg_settings, extracted_frame_format="png"):
self.ffmpeg_settings = ffmpeg_settings self.ffmpeg_settings = ffmpeg_settings
self.ffmpeg_path = pathlib.Path(self.ffmpeg_settings['ffmpeg_path']) self.ffmpeg_path = pathlib.Path(self.ffmpeg_settings["ffmpeg_path"])
self.ffmpeg_binary = self.ffmpeg_path / 'ffmpeg' self.ffmpeg_binary = self.ffmpeg_path / "ffmpeg"
self.ffmpeg_probe_binary = self.ffmpeg_path / 'ffprobe' self.ffmpeg_probe_binary = self.ffmpeg_path / "ffprobe"
# video metadata # video metadata
self.extracted_frame_format = extracted_frame_format self.extracted_frame_format = extracted_frame_format
self.intermediate_file_name = pathlib.Path(self.ffmpeg_settings['intermediate_file_name']) self.intermediate_file_name = pathlib.Path(
self.pixel_format = self.ffmpeg_settings['extract_frames']['output_options']['-pix_fmt'] self.ffmpeg_settings["intermediate_file_name"]
)
self.pixel_format = self.ffmpeg_settings["extract_frames"]["output_options"][
"-pix_fmt"
]
def get_pixel_formats(self): def get_pixel_formats(self):
"""Get a dictionary of supported pixel formats """Get a dictionary of supported pixel formats
@ -48,12 +52,7 @@ class Ffmpeg:
Returns: Returns:
dictionary -- JSON dict of all pixel formats to bit depth dictionary -- JSON dict of all pixel formats to bit depth
""" """
execute = [ execute = [self.ffmpeg_probe_binary, "-v", "quiet", "-pix_fmts"]
self.ffmpeg_probe_binary,
'-v',
'quiet',
'-pix_fmts'
]
# turn elements into str # turn elements into str
execute = [str(e) for e in execute] execute = [str(e) for e in execute]
@ -64,9 +63,15 @@ class Ffmpeg:
pixel_formats = {} pixel_formats = {}
# record all pixel formats into dictionary # record all pixel formats into dictionary
for line in subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout.decode().split('\n'): for line in (
subprocess.run(execute, check=True, stdout=subprocess.PIPE)
.stdout.decode()
.split("\n")
):
try: try:
pixel_formats[" ".join(line.split()).split()[1]] = int(" ".join(line.split()).split()[3]) pixel_formats[" ".join(line.split()).split()[1]] = int(
" ".join(line.split()).split()[3]
)
except (IndexError, ValueError): except (IndexError, ValueError):
pass pass
@ -88,23 +93,27 @@ class Ffmpeg:
execute = [ execute = [
self.ffmpeg_probe_binary, self.ffmpeg_probe_binary,
'-v', "-v",
'quiet', "quiet",
'-count_frames', "-count_frames",
'-select_streams', "-select_streams",
f'v:{video_stream_index}', f"v:{video_stream_index}",
'-show_entries', "-show_entries",
'stream=nb_read_frames', "stream=nb_read_frames",
'-of', "-of",
'default=nokey=1:noprint_wrappers=1', "default=nokey=1:noprint_wrappers=1",
input_file input_file,
] ]
# turn elements into str # turn elements into str
execute = [str(e) for e in execute] execute = [str(e) for e in execute]
Avalon.debug_info(f'Executing: {" ".join(execute)}') Avalon.debug_info(f'Executing: {" ".join(execute)}')
return int(subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout.decode().strip()) return int(
subprocess.run(execute, check=True, stdout=subprocess.PIPE)
.stdout.decode()
.strip()
)
def probe_file_info(self, input_video): def probe_file_info(self, input_video):
"""Gets input video information """Gets input video information
@ -123,14 +132,14 @@ class Ffmpeg:
# since video2x only strictly recignizes this one format # since video2x only strictly recignizes this one format
execute = [ execute = [
self.ffmpeg_probe_binary, self.ffmpeg_probe_binary,
'-v', "-v",
'quiet', "quiet",
'-print_format', "-print_format",
'json', "json",
'-show_format', "-show_format",
'-show_streams', "-show_streams",
'-i', "-i",
input_video input_video,
] ]
# turn elements into str # turn elements into str
@ -138,37 +147,38 @@ class Ffmpeg:
Avalon.debug_info(f'Executing: {" ".join(execute)}') Avalon.debug_info(f'Executing: {" ".join(execute)}')
json_str = subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout json_str = subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout
return json.loads(json_str.decode('utf-8')) return json.loads(json_str.decode("utf-8"))
def extract_frames(self, input_file, extracted_frames): def extract_frames(self, input_file, extracted_frames):
""" extract frames from video or GIF file """extract frames from video or GIF file"""
""" execute = [self.ffmpeg_binary]
execute = [
self.ffmpeg_binary
]
# load general options # load general options
execute.extend(self._read_configuration(phase='extract_frames')) execute.extend(self._read_configuration(phase="extract_frames"))
# load input_options # load input_options
execute.extend(self._read_configuration(phase='extract_frames', section='input_options')) execute.extend(
self._read_configuration(phase="extract_frames", section="input_options")
)
# specify input file # specify input file
execute.extend([ execute.extend(["-i", input_file])
'-i',
input_file
])
# load output options # load output options
execute.extend(self._read_configuration(phase='extract_frames', section='output_options')) execute.extend(
self._read_configuration(phase="extract_frames", section="output_options")
)
# specify output file # specify output file
execute.extend([ execute.extend(
extracted_frames / f'extracted_%0d.{self.extracted_frame_format}' [
extracted_frames
/ f"extracted_%0d.{self.extracted_frame_format}"
# extracted_frames / f'frame_%06d.{self.extracted_frame_format}' # extracted_frames / f'frame_%06d.{self.extracted_frame_format}'
]) ]
)
return(self._execute(execute)) return self._execute(execute)
def assemble_video(self, framerate, upscaled_frames): def assemble_video(self, framerate, upscaled_frames):
"""Converts images into videos """Converts images into videos
@ -182,43 +192,50 @@ class Ffmpeg:
""" """
execute = [ execute = [
self.ffmpeg_binary, self.ffmpeg_binary,
'-r', "-r",
str(framerate) str(framerate)
# '-s', # '-s',
# resolution # resolution
] ]
# read other options # read other options
execute.extend(self._read_configuration(phase='assemble_video')) execute.extend(self._read_configuration(phase="assemble_video"))
# read input options # read input options
execute.extend(self._read_configuration(phase='assemble_video', section='input_options')) execute.extend(
self._read_configuration(phase="assemble_video", section="input_options")
)
# WORKAROUND FOR WAIFU2X-NCNN-VULKAN # WORKAROUND FOR WAIFU2X-NCNN-VULKAN
# Dev: SAT3LL # Dev: SAT3LL
# rename all .png.png suffixes to .png # rename all .png.png suffixes to .png
import re import re
regex = re.compile(r'\.png\.png$', re.IGNORECASE)
regex = re.compile(r"\.png\.png$", re.IGNORECASE)
for frame_name in upscaled_frames.iterdir(): for frame_name in upscaled_frames.iterdir():
(upscaled_frames / frame_name).rename(upscaled_frames / regex.sub('.png', str(frame_name))) (upscaled_frames / frame_name).rename(
upscaled_frames / regex.sub(".png", str(frame_name))
)
# END WORKAROUND # END WORKAROUND
# append input frames path into command # append input frames path into command
execute.extend([ execute.extend(
'-i', [
upscaled_frames / f'extracted_%d.{self.extracted_frame_format}' "-i",
upscaled_frames / f"extracted_%d.{self.extracted_frame_format}"
# upscaled_frames / f'%06d.{self.extracted_frame_format}' # upscaled_frames / f'%06d.{self.extracted_frame_format}'
]) ]
)
# read FFmpeg output options # read FFmpeg output options
execute.extend(self._read_configuration(phase='assemble_video', section='output_options')) execute.extend(
self._read_configuration(phase="assemble_video", section="output_options")
)
# specify output file location # specify output file location
execute.extend([ execute.extend([upscaled_frames / self.intermediate_file_name])
upscaled_frames / self.intermediate_file_name
])
return(self._execute(execute)) return self._execute(execute)
def migrate_streams(self, input_video, output_video, upscaled_frames): def migrate_streams(self, input_video, output_video, upscaled_frames):
"""Migrates audio tracks and subtitles from input video to output video """Migrates audio tracks and subtitles from input video to output video
@ -228,37 +245,37 @@ class Ffmpeg:
output_video {string} -- output video file path output_video {string} -- output video file path
upscaled_frames {string} -- directory containing upscaled frames upscaled_frames {string} -- directory containing upscaled frames
""" """
execute = [ execute = [self.ffmpeg_binary]
self.ffmpeg_binary
]
# load general options # load general options
execute.extend(self._read_configuration(phase='migrate_streams')) execute.extend(self._read_configuration(phase="migrate_streams"))
# load input options # load input options
execute.extend(self._read_configuration(phase='migrate_streams', section='input_options')) execute.extend(
self._read_configuration(phase="migrate_streams", section="input_options")
)
# load input file names # load input file names
execute.extend([ execute.extend(
[
# input 1: upscaled intermediate file without sound # input 1: upscaled intermediate file without sound
'-i', "-i",
upscaled_frames / self.intermediate_file_name, upscaled_frames / self.intermediate_file_name,
# input 2: original video with streams to copy over # input 2: original video with streams to copy over
'-i', "-i",
input_video input_video,
]) ]
)
# load output options # load output options
execute.extend(self._read_configuration(phase='migrate_streams', section='output_options')) execute.extend(
self._read_configuration(phase="migrate_streams", section="output_options")
)
# load output video path # load output video path
execute.extend([ execute.extend([output_video])
output_video
])
return(self._execute(execute)) return self._execute(execute)
def _read_configuration(self, phase, section=None): def _read_configuration(self, phase, section=None):
"""read configuration from JSON """read configuration from JSON
@ -290,7 +307,12 @@ class Ffmpeg:
value = self.ffmpeg_settings[phase][key] value = self.ffmpeg_settings[phase][key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if value is None or value is False or isinstance(value, dict) or value == '': if (
value is None
or value is False
or isinstance(value, dict)
or value == ""
):
continue continue
# if the value is a list, append the same argument and all values # if the value is a list, append the same argument and all values

View File

@ -19,30 +19,37 @@ from avalon_framework import Avalon
class Gifski: class Gifski:
def __init__(self, gifski_settings): def __init__(self, gifski_settings):
self.gifski_settings = gifski_settings self.gifski_settings = gifski_settings
def make_gif(self, upscaled_frames: pathlib.Path, output_path: pathlib.Path, framerate: float, extracted_frame_format: str, output_width: int, output_height: int) -> subprocess.Popen: def make_gif(
self,
upscaled_frames: pathlib.Path,
output_path: pathlib.Path,
framerate: float,
extracted_frame_format: str,
output_width: int,
output_height: int,
) -> subprocess.Popen:
execute = [ execute = [
self.gifski_settings['gifski_path'], self.gifski_settings["gifski_path"],
'-o', "-o",
output_path, output_path,
'--fps', "--fps",
int(round(framerate, 0)), int(round(framerate, 0)),
'--width', "--width",
output_width, output_width,
'--height', "--height",
output_height output_height,
] ]
# load configurations from config file # load configurations from config file
execute.extend(self._load_configuration()) execute.extend(self._load_configuration())
# append frames location # append frames location
execute.extend([upscaled_frames / f'extracted_*.{extracted_frame_format}']) execute.extend([upscaled_frames / f"extracted_*.{extracted_frame_format}"])
return(self._execute(execute)) return self._execute(execute)
def _load_configuration(self): def _load_configuration(self):
@ -53,13 +60,13 @@ class Gifski:
value = self.gifski_settings[key] value = self.gifski_settings[key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if key == 'gifski_path' or value is None or value is False: if key == "gifski_path" or value is None or value is False:
continue continue
else: else:
if len(key) == 1: if len(key) == 1:
configuration.append(f'-{key}') configuration.append(f"-{key}")
else: else:
configuration.append(f'--{key}') configuration.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
@ -70,6 +77,6 @@ class Gifski:
# turn all list elements into string to avoid errors # turn all list elements into string to avoid errors
execute = [str(e) for e in execute] execute = [str(e) for e in execute]
Avalon.debug_info(f'Executing: {execute}') Avalon.debug_info(f"Executing: {execute}")
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr) return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -38,28 +38,32 @@ class WrapperMain:
@staticmethod @staticmethod
def parse_arguments(arguments): def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message)) parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit') parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument('-v', action='store_true', help='verbose output') parser.add_argument("-v", action="store_true", help="verbose output")
parser.add_argument('-i', type=str, help=argparse.SUPPRESS) # help='input image path (jpg/png) or directory') parser.add_argument("-i", type=str, help=argparse.SUPPRESS) # help="input image path (jpg/png) or directory")
parser.add_argument('-o', type=str, help=argparse.SUPPRESS) # help='output image path (png) or directory') parser.add_argument("-o", type=str, help=argparse.SUPPRESS) # help="output image path (png) or directory")
parser.add_argument('-s', type=int, help='upscale ratio') parser.add_argument("-s", type=int, help="upscale ratio")
parser.add_argument('-t', type=int, help='tile size (>=32/0=auto)') parser.add_argument("-t", type=int, help="tile size (>=32/0=auto)")
parser.add_argument('-m', type=str, help='realsr model path') parser.add_argument("-m", type=str, help="realsr model path")
parser.add_argument('-g', type=int, help='gpu device to use') parser.add_argument("-g", type=int, help="gpu device to use")
parser.add_argument('-j', type=str, help='thread count for load/proc/save') parser.add_argument("-j", type=str, help="thread count for load/proc/save")
parser.add_argument('-x', action='store_true', help='enable tta mode') parser.add_argument("-x", action="store_true", help="enable tta mode")
parser.add_argument('-f', type=str, help=argparse.SUPPRESS) # help='output image format (jpg/png/webp, default=ext/png)') parser.add_argument("-f", type=str, help=argparse.SUPPRESS) # help="output image format (jpg/png/webp, default=ext/png)")
return parser.parse_args(arguments) return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler): def load_configurations(self, upscaler):
# self.driver_settings['s'] = int(upscaler.scale_ratio) # self.driver_settings['s'] = int(upscaler.scale_ratio)
self.driver_settings['j'] = '{}:{}:{}'.format(upscaler.processes, upscaler.processes, upscaler.processes) self.driver_settings["j"] = "{}:{}:{}".format(
self.driver_settings['f'] = upscaler.extracted_frame_format.lower() upscaler.processes, upscaler.processes, upscaler.processes
)
self.driver_settings["f"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: int): def set_scale_ratio(self, scale_ratio: int):
self.driver_settings['s'] = int(scale_ratio) self.driver_settings["s"] = int(scale_ratio)
def upscale(self, input_directory, output_directory): def upscale(self, input_directory, output_directory):
"""This is the core function for RealSR NCNN Vulkan class """This is the core function for RealSR NCNN Vulkan class
@ -72,33 +76,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory # change the working directory to the binary's parent directory
# so the binary can find shared object files and other files # so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings # overwrite config file settings
self.driver_settings['i'] = input_directory self.driver_settings["i"] = input_directory
self.driver_settings['o'] = output_directory self.driver_settings["o"] = output_directory
# by default, realsr-ncnn-vulkan will look for the models under the current working directory # by default, realsr-ncnn-vulkan will look for the models under the current working directory
# change the working directory to its containing folder if model directory not specified # change the working directory to its containing folder if model directory not specified
if self.driver_settings['m'] is None and platform.system() == 'Windows': if self.driver_settings["m"] is None and platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed # list to be executed
# initialize the list with the binary path as the first element # initialize the list with the binary path as the first element
execute = [self.driver_settings['path']] execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys(): for key in self.driver_settings.keys():
value = self.driver_settings[key] value = self.driver_settings[key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False: if key == "path" or value is None or value is False:
continue continue
else: else:
if len(key) == 1: if len(key) == 1:
execute.append(f'-{key}') execute.append(f"-{key}")
else: else:
execute.append(f'--{key}') execute.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
@ -106,6 +110,8 @@ class WrapperMain:
# return the Popen object of the new process created # return the Popen object of the new process created
self.print_lock.acquire() self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}') Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release() self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr) return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -38,29 +38,33 @@ class WrapperMain:
@staticmethod @staticmethod
def parse_arguments(arguments): def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message)) parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit') parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument('-v', action='store_true', help='verbose output') parser.add_argument("-v", action="store_true", help="verbose output")
parser.add_argument('-i', type=str, help=argparse.SUPPRESS) # help='input image path (jpg/png) or directory') parser.add_argument("-i", type=str, help=argparse.SUPPRESS) # help="input image path (jpg/png) or directory")
parser.add_argument('-o', type=str, help=argparse.SUPPRESS) # help='output image path (png) or directory') parser.add_argument("-o", type=str, help=argparse.SUPPRESS) # help="output image path (png) or directory")
parser.add_argument('-n', type=int, choices=range(-1, 11), help='denoise level') parser.add_argument("-n", type=int, choices=range(-1, 11), help="denoise level")
parser.add_argument('-s', type=int, help='upscale ratio') parser.add_argument("-s", type=int, help="upscale ratio")
parser.add_argument('-t', type=int, help='tile size (>=32)') parser.add_argument("-t", type=int, help="tile size (>=32)")
parser.add_argument('-m', type=str, help='srmd model path') parser.add_argument("-m", type=str, help="srmd model path")
parser.add_argument('-g', type=int, help='gpu device to use') parser.add_argument("-g", type=int, help="gpu device to use")
parser.add_argument('-j', type=str, help='thread count for load/proc/save') parser.add_argument("-j", type=str, help="thread count for load/proc/save")
parser.add_argument('-x', action='store_true', help='enable tta mode') parser.add_argument("-x", action="store_true", help="enable tta mode")
parser.add_argument('-f', type=str, help=argparse.SUPPRESS) # help='output image format (jpg/png/webp, default=ext/png)') parser.add_argument("-f", type=str, help=argparse.SUPPRESS) # help="output image format (jpg/png/webp, default=ext/png)")
return parser.parse_args(arguments) return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler): def load_configurations(self, upscaler):
# self.driver_settings['s'] = int(upscaler.scale_ratio) # self.driver_settings['s'] = int(upscaler.scale_ratio)
self.driver_settings['j'] = '{}:{}:{}'.format(upscaler.processes, upscaler.processes, upscaler.processes) self.driver_settings["j"] = "{}:{}:{}".format(
self.driver_settings['f'] = upscaler.extracted_frame_format.lower() upscaler.processes, upscaler.processes, upscaler.processes
)
self.driver_settings["f"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: int): def set_scale_ratio(self, scale_ratio: int):
self.driver_settings['s'] = int(scale_ratio) self.driver_settings["s"] = int(scale_ratio)
def upscale(self, input_directory, output_directory): def upscale(self, input_directory, output_directory):
"""This is the core function for SRMD ncnn Vulkan class """This is the core function for SRMD ncnn Vulkan class
@ -73,33 +77,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory # change the working directory to the binary's parent directory
# so the binary can find shared object files and other files # so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings # overwrite config file settings
self.driver_settings['i'] = input_directory self.driver_settings["i"] = input_directory
self.driver_settings['o'] = output_directory self.driver_settings["o"] = output_directory
# by default, srmd-ncnn-vulkan will look for the models under the current working directory # by default, srmd-ncnn-vulkan will look for the models under the current working directory
# change the working directory to its containing folder if model directory not specified # change the working directory to its containing folder if model directory not specified
if self.driver_settings['m'] is None and platform.system() == 'Windows': if self.driver_settings["m"] is None and platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed # list to be executed
# initialize the list with the binary path as the first element # initialize the list with the binary path as the first element
execute = [self.driver_settings['path']] execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys(): for key in self.driver_settings.keys():
value = self.driver_settings[key] value = self.driver_settings[key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False: if key == "path" or value is None or value is False:
continue continue
else: else:
if len(key) == 1: if len(key) == 1:
execute.append(f'-{key}') execute.append(f"-{key}")
else: else:
execute.append(f'--{key}') execute.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
@ -107,6 +111,8 @@ class WrapperMain:
# return the Popen object of the new process created # return the Popen object of the new process created
self.print_lock.acquire() self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}') Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release() self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr) return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -37,77 +37,78 @@ class WrapperMain:
@staticmethod @staticmethod
def parse_arguments(arguments): def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message)) parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit') parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument('-t', '--tta', type=int, choices=range(2), help='8x slower and slightly high quality') parser.add_argument("-t", "--tta", type=int, choices=range(2), help="8x slower and slightly high quality")
parser.add_argument('--gpu', type=int, help='gpu device no') parser.add_argument("--gpu", type=int, help="gpu device no")
parser.add_argument('-b', '--batch_size', type=int, help='input batch size') parser.add_argument("-b", "--batch_size", type=int, help="input batch size")
parser.add_argument('--crop_h', type=int, help='input image split size(height)') parser.add_argument("--crop_h", type=int, help="input image split size(height)")
parser.add_argument('--crop_w', type=int, help='input image split size(width)') parser.add_argument("--crop_w", type=int, help="input image split size(width)")
parser.add_argument('-c', '--crop_size', type=int, help='input image split size') parser.add_argument("-c", "--crop_size", type=int, help="input image split size")
parser.add_argument('-d', '--output_depth', type=int, help='output image chaneel depth bit') parser.add_argument("-d", "--output_depth", type=int, help="output image chaneel depth bit")
parser.add_argument('-q', '--output_quality', type=int, help='output image quality') parser.add_argument("-q", "--output_quality", type=int, help="output image quality")
parser.add_argument('-p', '--process', choices=['cpu', 'gpu', 'cudnn'], help='process mode') parser.add_argument("-p", "--process", choices=["cpu", "gpu", "cudnn"], help="process mode")
parser.add_argument('--model_dir', type=str, help='path to custom model directory (don\'t append last / )') parser.add_argument("--model_dir", type=str, help="path to custom model directory (don\"t append last / )")
parser.add_argument('-h', '--scale_height', type=int, help='custom scale height') parser.add_argument("-h", "--scale_height", type=int, help="custom scale height")
parser.add_argument('-w', '--scale_width', type=int, help='custom scale width') parser.add_argument("-w", "--scale_width", type=int, help="custom scale width")
parser.add_argument('-s', '--scale_ratio', type=float, help='custom scale ratio') parser.add_argument("-s", "--scale_ratio", type=float, help="custom scale ratio")
parser.add_argument('-n', '--noise_level', type=int, choices=range(4), help='noise reduction level') parser.add_argument("-n", "--noise_level", type=int, choices=range(4), help="noise reduction level")
parser.add_argument('-m', '--mode', choices=['noise', 'scale', 'noise_scale', 'auto_scale'], help='image processing mode') parser.add_argument("-m", "--mode", choices=["noise", "scale", "noise_scale", "auto_scale"], help="image processing mode")
parser.add_argument('-e', '--output_extention', type=str, help='extention to output image file when output_path is (auto) or input_path is folder') parser.add_argument("-e", "--output_extention", type=str, help="extention to output image file when output_path is (auto) or input_path is folder")
parser.add_argument('-l', '--input_extention_list', type=str, help='extention to input image file when input_path is folder') parser.add_argument("-l", "--input_extention_list", type=str, help="extention to input image file when input_path is folder")
parser.add_argument('-o', '--output_path', type=str, help=argparse.SUPPRESS) # help='path to output image file (when input_path is folder, output_path must be folder)') parser.add_argument("-o", "--output_path", type=str, help=argparse.SUPPRESS) # help="path to output image file (when input_path is folder, output_path must be folder)")
parser.add_argument('-i', '--input_path', type=str, help=argparse.SUPPRESS) # help='(required) path to input image file') parser.add_argument("-i", "--input_path", type=str, help=argparse.SUPPRESS) # help="(required) path to input image file")
return parser.parse_args(arguments) return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler): def load_configurations(self, upscaler):
# use scale width and scale height if specified # use scale width and scale height if specified
# self.driver_settings['scale_ratio'] = upscaler.scale_ratio # self.driver_settings['scale_ratio'] = upscaler.scale_ratio
self.driver_settings['output_extention'] = upscaler.extracted_frame_format self.driver_settings["output_extention"] = upscaler.extracted_frame_format
# bit_depth will be 12 at this point # bit_depth will be 12 at this point
# it will up updated later # it will up updated later
self.driver_settings['output_depth'] = 12 self.driver_settings["output_depth"] = 12
def set_scale_resolution(self, width: int, height: int): def set_scale_resolution(self, width: int, height: int):
self.driver_settings['scale_width'] = width self.driver_settings["scale_width"] = width
self.driver_settings['scale_height'] = height self.driver_settings["scale_height"] = height
self.driver_settings['scale_ratio'] = None self.driver_settings["scale_ratio"] = None
def set_scale_ratio(self, scale_ratio: float): def set_scale_ratio(self, scale_ratio: float):
self.driver_settings['scale_width'] = None self.driver_settings["scale_width"] = None
self.driver_settings['scale_height'] = None self.driver_settings["scale_height"] = None
self.driver_settings['scale_ratio'] = scale_ratio self.driver_settings["scale_ratio"] = scale_ratio
def upscale(self, input_directory, output_directory): def upscale(self, input_directory, output_directory):
""" start upscaling process """start upscaling process"""
"""
# change the working directory to the binary's parent directory # change the working directory to the binary's parent directory
# so the binary can find shared object files and other files # so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings # overwrite config file settings
self.driver_settings['input_path'] = input_directory self.driver_settings["input_path"] = input_directory
self.driver_settings['output_path'] = output_directory self.driver_settings["output_path"] = output_directory
# list to be executed # list to be executed
# initialize the list with waifu2x binary path as the first element # initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']] execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys(): for key in self.driver_settings.keys():
value = self.driver_settings[key] value = self.driver_settings[key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False: if key == "path" or value is None or value is False:
continue continue
else: else:
if len(key) == 1: if len(key) == 1:
execute.append(f'-{key}') execute.append(f"-{key}")
else: else:
execute.append(f'--{key}') execute.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
@ -115,6 +116,8 @@ class WrapperMain:
# return the Popen object of the new process created # return the Popen object of the new process created
self.print_lock.acquire() self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}') Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release() self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr) return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -37,42 +37,44 @@ class WrapperMain:
@staticmethod @staticmethod
def parse_arguments(arguments): def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message)) parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit') parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument('--list-supported-formats', action='store_true', help='dump currently supported format list') parser.add_argument("--list-supported-formats", action="store_true", help="dump currently supported format list")
parser.add_argument('--list-opencv-formats', action='store_true', help='(deprecated. Use --list-supported-formats) dump opencv supported format list') parser.add_argument("--list-opencv-formats", action="store_true", help="(deprecated. Use --list-supported-formats) dump opencv supported format list")
parser.add_argument('-l', '--list-processor', action='store_true', help='dump processor list') parser.add_argument("-l", "--list-processor", action="store_true", help="dump processor list")
parser.add_argument('-f', '--output-format', choices=['png', 'jpg'], help='The format used when running in recursive/folder mode\nSee --list-supported-formats for a list of supported formats/extensions.') parser.add_argument("-f", "--output-format", choices=["png", "jpg"], help="The format used when running in recursive/folder mode\nSee --list-supported-formats for a list of supported formats/extensions.")
parser.add_argument('-c', '--png-compression', type=int, choices=range(10), help='Set PNG compression level (0-9), 9 = Max compression (slowest & smallest)') parser.add_argument("-c", "--png-compression", type=int, choices=range(10), help="Set PNG compression level (0-9), 9 = Max compression (slowest & smallest)")
parser.add_argument('-q', '--image-quality', type=int, choices=range(-1, 102), help='JPEG & WebP Compression quality (0-101, 0 being smallest size and lowest quality), use 101 for lossless WebP') parser.add_argument("-q", "--image-quality", type=int, choices=range(-1, 102), help="JPEG & WebP Compression quality (0-101, 0 being smallest size and lowest quality), use 101 for lossless WebP")
parser.add_argument('--block-size', type=int, help='block size') parser.add_argument("--block-size", type=int, help="block size")
parser.add_argument('--disable-gpu', action='store_true', help='disable GPU') parser.add_argument("--disable-gpu", action="store_true", help="disable GPU")
parser.add_argument('--force-OpenCL', action='store_true', help='force to use OpenCL on Intel Platform') parser.add_argument("--force-OpenCL", action="store_true", help="force to use OpenCL on Intel Platform")
parser.add_argument('-p', '--processor', type=int, help='set target processor') parser.add_argument("-p", "--processor", type=int, help="set target processor")
parser.add_argument('-j', '--jobs', type=int, help='number of threads launching at the same time') parser.add_argument("-j", "--jobs", type=int, help="number of threads launching at the same time")
parser.add_argument('--model-dir', type=str, help='path to custom model directory (don\'t append last / )') parser.add_argument("--model-dir", type=str, help="path to custom model directory (don\"t append last / )")
parser.add_argument('--scale-ratio', type=float, help='custom scale ratio') parser.add_argument("--scale-ratio", type=float, help="custom scale ratio")
parser.add_argument('--noise-level', type=int, choices=range(4), help='noise reduction level') parser.add_argument("--noise-level", type=int, choices=range(4), help="noise reduction level")
parser.add_argument('-m', '--mode', choices=['noise', 'scale', 'noise-scale'], help='image processing mode') parser.add_argument("-m", "--mode", choices=["noise", "scale", "noise-scale"], help="image processing mode")
parser.add_argument('-v', '--log-level', type=int, choices=range(5), help='Set log level') parser.add_argument("-v", "--log-level", type=int, choices=range(5), help="Set log level")
parser.add_argument('-s', '--silent', action='store_true', help='Enable silent mode. (same as --log-level 1)') parser.add_argument("-s", "--silent", action="store_true", help="Enable silent mode. (same as --log-level 1)")
parser.add_argument('-t', '--tta', type=int, choices=range(2), help='Enable Test-Time Augmentation mode.') parser.add_argument("-t", "--tta", type=int, choices=range(2), help="Enable Test-Time Augmentation mode.")
parser.add_argument('-g', '--generate-subdir', type=int, choices=range(2), help='Generate sub folder when recursive directory is enabled.') parser.add_argument("-g", "--generate-subdir", type=int, choices=range(2), help="Generate sub folder when recursive directory is enabled.")
parser.add_argument('-a', '--auto-naming', type=int, choices=range(2), help='Add postfix to output name when output path is not specified.\nSet 0 to disable this.') parser.add_argument("-a", "--auto-naming", type=int, choices=range(2), help="Add postfix to output name when output path is not specified.\nSet 0 to disable this.")
parser.add_argument('-r', '--recursive-directory', type=int, choices=range(2), help='Search recursively through directories to find more images to process.') parser.add_argument("-r", "--recursive-directory", type=int, choices=range(2), help="Search recursively through directories to find more images to process.")
parser.add_argument('-o', '--output', type=str, help=argparse.SUPPRESS) # help='path to output image file or directory (you should use the full path)') parser.add_argument("-o", "--output", type=str, help=argparse.SUPPRESS) # help="path to output image file or directory (you should use the full path)")
parser.add_argument('-i', '--input', type=str, help=argparse.SUPPRESS) # help='(required) path to input image file or directory (you should use the full path)') parser.add_argument("-i", "--input", type=str, help=argparse.SUPPRESS) # help="(required) path to input image file or directory (you should use the full path)")
parser.add_argument('--version', action='store_true', help='Displays version information and exits.') parser.add_argument("--version", action="store_true", help="Displays version information and exits.")
return parser.parse_args(arguments) return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler): def load_configurations(self, upscaler):
# self.driver_settings['scale-ratio'] = upscaler.scale_ratio # self.driver_settings['scale-ratio'] = upscaler.scale_ratio
self.driver_settings['jobs'] = upscaler.processes self.driver_settings["jobs"] = upscaler.processes
self.driver_settings['output-format'] = upscaler.extracted_frame_format.lower() self.driver_settings["output-format"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: float): def set_scale_ratio(self, scale_ratio: float):
self.driver_settings['scale-ratio'] = scale_ratio self.driver_settings["scale-ratio"] = scale_ratio
def upscale(self, input_directory, output_directory): def upscale(self, input_directory, output_directory):
"""Waifu2x Converter Driver Upscaler """Waifu2x Converter Driver Upscaler
@ -87,33 +89,35 @@ class WrapperMain:
# change the working directory to the binary's parent directory # change the working directory to the binary's parent directory
# so the binary can find shared object files and other files # so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings # overwrite config file settings
self.driver_settings['input'] = input_directory self.driver_settings["input"] = input_directory
self.driver_settings['output'] = output_directory self.driver_settings["output"] = output_directory
# models_rgb must be specified manually for waifu2x-converter-cpp # models_rgb must be specified manually for waifu2x-converter-cpp
# if it's not specified in the arguments, create automatically # if it's not specified in the arguments, create automatically
if self.driver_settings['model-dir'] is None: if self.driver_settings["model-dir"] is None:
self.driver_settings['model-dir'] = pathlib.Path(self.driver_settings['path']).parent / 'models_rgb' self.driver_settings["model-dir"] = (
pathlib.Path(self.driver_settings["path"]).parent / "models_rgb"
)
# list to be executed # list to be executed
# initialize the list with waifu2x binary path as the first element # initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']] execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys(): for key in self.driver_settings.keys():
value = self.driver_settings[key] value = self.driver_settings[key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False: if key == "path" or value is None or value is False:
continue continue
else: else:
if len(key) == 1: if len(key) == 1:
execute.append(f'-{key}') execute.append(f"-{key}")
else: else:
execute.append(f'--{key}') execute.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
@ -121,6 +125,8 @@ class WrapperMain:
# return the Popen object of the new process created # return the Popen object of the new process created
self.print_lock.acquire() self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}') Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release() self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr) return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -41,29 +41,33 @@ class WrapperMain:
@staticmethod @staticmethod
def parse_arguments(arguments): def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False) parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message)) parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit') parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument('-v', action='store_true', help='verbose output') parser.add_argument("-v", action="store_true", help="verbose output")
parser.add_argument('-i', type=str, help=argparse.SUPPRESS) # help='input image path (jpg/png/webp) or directory') parser.add_argument("-i", type=str, help=argparse.SUPPRESS) # help="input image path (jpg/png/webp) or directory")
parser.add_argument('-o', type=str, help=argparse.SUPPRESS) # help='output image path (jpg/png/webp) or directory') parser.add_argument("-o", type=str, help=argparse.SUPPRESS) # help="output image path (jpg/png/webp) or directory")
parser.add_argument('-n', type=int, choices=range(-1, 4), help='denoise level') parser.add_argument("-n", type=int, choices=range(-1, 4), help="denoise level")
parser.add_argument('-s', type=int, help='upscale ratio') parser.add_argument("-s", type=int, help="upscale ratio")
parser.add_argument('-t', type=int, help='tile size (>=32)') parser.add_argument("-t", type=int, help="tile size (>=32)")
parser.add_argument('-m', type=str, help='waifu2x model path') parser.add_argument("-m", type=str, help="waifu2x model path")
parser.add_argument('-g', type=int, help='gpu device to use') parser.add_argument("-g", type=int, help="gpu device to use")
parser.add_argument('-j', type=str, help='thread count for load/proc/save') parser.add_argument("-j", type=str, help="thread count for load/proc/save")
parser.add_argument('-x', action='store_true', help='enable tta mode') parser.add_argument("-x", action="store_true", help="enable tta mode")
parser.add_argument('-f', type=str, help=argparse.SUPPRESS) # help='output image format (jpg/png/webp, default=ext/png)') parser.add_argument("-f", type=str, help=argparse.SUPPRESS) # help="output image format (jpg/png/webp, default=ext/png)")
return parser.parse_args(arguments) return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler): def load_configurations(self, upscaler):
# self.driver_settings['s'] = int(upscaler.scale_ratio) # self.driver_settings['s'] = int(upscaler.scale_ratio)
self.driver_settings['j'] = '{}:{}:{}'.format(upscaler.processes, upscaler.processes, upscaler.processes) self.driver_settings["j"] = "{}:{}:{}".format(
self.driver_settings['f'] = upscaler.extracted_frame_format.lower() upscaler.processes, upscaler.processes, upscaler.processes
)
self.driver_settings["f"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: int): def set_scale_ratio(self, scale_ratio: int):
self.driver_settings['s'] = int(scale_ratio) self.driver_settings["s"] = int(scale_ratio)
def upscale(self, input_directory, output_directory): def upscale(self, input_directory, output_directory):
"""This is the core function for waifu2x class """This is the core function for waifu2x class
@ -76,33 +80,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory # change the working directory to the binary's parent directory
# so the binary can find shared object files and other files # so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings # overwrite config file settings
self.driver_settings['i'] = input_directory self.driver_settings["i"] = input_directory
self.driver_settings['o'] = output_directory self.driver_settings["o"] = output_directory
# by default, waifu2x-ncnn-vulkan will look for the models under the current working directory # by default, waifu2x-ncnn-vulkan will look for the models under the current working directory
# change the working directory to its containing folder if model directory not specified # change the working directory to its containing folder if model directory not specified
if self.driver_settings['m'] is None and platform.system() == 'Windows': if self.driver_settings["m"] is None and platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings['path']).parent) os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed # list to be executed
# initialize the list with waifu2x binary path as the first element # initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']] execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys(): for key in self.driver_settings.keys():
value = self.driver_settings[key] value = self.driver_settings[key]
# null or None means that leave this option out (keep default) # null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False: if key == "path" or value is None or value is False:
continue continue
else: else:
if len(key) == 1: if len(key) == 1:
execute.append(f'-{key}') execute.append(f"-{key}")
else: else:
execute.append(f'--{key}') execute.append(f"--{key}")
# true means key is an option # true means key is an option
if value is not True: if value is not True:
@ -110,6 +114,8 @@ class WrapperMain:
# return the Popen object of the new process created # return the Popen object of the new process created
self.print_lock.acquire() self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}') Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release() self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr) return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)