formatted code with black

This commit is contained in:
K4YT3X 2020-12-19 18:11:11 -05:00
parent 09345703e6
commit bfda833bcf
14 changed files with 2117 additions and 1090 deletions

View File

@ -12,14 +12,14 @@ import io
class BiLogger(io.TextIOWrapper):
""" A bidirectional logger that both prints the output
"""A bidirectional logger that both prints the output
and log all output to file.
Original code from: https://stackoverflow.com/a/14906787
"""
def __init__(self, terminal: io.TextIOWrapper, log_file: io.BufferedRandom):
""" initialize BiLogger
"""initialize BiLogger
Args:
terminal (_io.TextIOWrapper): original terminal IO wrapper
@ -30,7 +30,7 @@ class BiLogger(io.TextIOWrapper):
self.fileno = self.log_file.fileno
def write(self, message: str):
""" write message to original terminal output and log file
"""write message to original terminal output and log file
Args:
message (str): message to write
@ -41,6 +41,5 @@ class BiLogger(io.TextIOWrapper):
self.log_file.flush()
def flush(self):
""" flush logger (for compability only)
"""
"""flush logger (for compability only)"""
pass

View File

@ -22,7 +22,7 @@ import time
class ImageCleaner(threading.Thread):
""" Video2X Image Cleaner
"""Video2X Image Cleaner
This class creates an object that keeps track of extracted
frames that has already been upscaled and are not needed
@ -40,8 +40,7 @@ class ImageCleaner(threading.Thread):
self.running = False
def run(self):
""" Run image cleaner
"""
"""Run image cleaner"""
self.running = True
while self.running:
@ -49,13 +48,12 @@ class ImageCleaner(threading.Thread):
time.sleep(1)
def stop(self):
""" Stop the image cleaner
"""
"""Stop the image cleaner"""
self.running = False
self.join()
def remove_upscaled_frames(self):
""" remove frames that have already been upscaled
"""remove frames that have already been upscaled
This method compares the files in the extracted frames
directory with the upscaled frames directory, and removes

View File

@ -17,7 +17,7 @@ from tqdm import tqdm
class ProgressMonitor(threading.Thread):
""" progress monitor
"""progress monitor
This class provides progress monitoring functionalities
by keeping track of the amount of frames in the input
@ -34,7 +34,15 @@ class ProgressMonitor(threading.Thread):
def run(self):
self.running = True
with tqdm(total=self.upscaler.total_frames, ascii=True, desc=_('Processing: {} (pass {}/{})').format(self.upscaler.current_input_file.name, self.upscaler.current_pass, len(self.upscaler.scaling_jobs))) as progress_bar:
with tqdm(
total=self.upscaler.total_frames,
ascii=True,
desc=_("Processing: {} (pass {}/{})").format(
self.upscaler.current_input_file.name,
self.upscaler.current_pass,
len(self.upscaler.scaling_jobs),
),
) as progress_bar:
# tqdm update method adds the value to the progress
# bar instead of setting the value. Therefore, a delta
# needs to be calculated.
@ -42,7 +50,13 @@ class ProgressMonitor(threading.Thread):
while self.running:
with contextlib.suppress(FileNotFoundError):
upscaled_frames = [f for f in self.upscaler.upscaled_frames.iterdir() if str(f).lower().endswith(self.upscaler.extracted_frame_format.lower())]
upscaled_frames = [
f
for f in self.upscaler.upscaled_frames.iterdir()
if str(f)
.lower()
.endswith(self.upscaler.extracted_frame_format.lower())
]
if len(upscaled_frames) >= 1:
self.upscaler.last_frame_upscaled = sorted(upscaled_frames)[-1]
self.upscaler.total_frames_upscaled = len(upscaled_frames)

View File

@ -43,40 +43,44 @@ from tqdm import tqdm
import magic
# internationalization constants
DOMAIN = 'video2x'
LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / 'locale'
DOMAIN = "video2x"
LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / "locale"
# getting default locale settings
default_locale, encoding = locale.getdefaultlocale()
language = gettext.translation(DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True)
language = gettext.translation(
DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True
)
language.install()
_ = language.gettext
# version information
UPSCALER_VERSION = '4.4.1'
UPSCALER_VERSION = "4.4.1"
# these names are consistent for
# - driver selection in command line
# - driver wrapper file names
# - config file keys
AVAILABLE_DRIVERS = ['waifu2x_caffe',
'waifu2x_converter_cpp',
'waifu2x_ncnn_vulkan',
'srmd_ncnn_vulkan',
'realsr_ncnn_vulkan',
'anime4kcpp']
AVAILABLE_DRIVERS = [
"waifu2x_caffe",
"waifu2x_converter_cpp",
"waifu2x_ncnn_vulkan",
"srmd_ncnn_vulkan",
"realsr_ncnn_vulkan",
"anime4kcpp",
]
# fixed scaling ratios supported by the drivers
# that only support certain fixed scale ratios
DRIVER_FIXED_SCALING_RATIOS = {
'waifu2x_ncnn_vulkan': [1, 2],
'srmd_ncnn_vulkan': [2, 3, 4],
'realsr_ncnn_vulkan': [4],
"waifu2x_ncnn_vulkan": [1, 2],
"srmd_ncnn_vulkan": [2, 3, 4],
"realsr_ncnn_vulkan": [4],
}
class Upscaler:
""" An instance of this class is a upscaler that will
"""An instance of this class is a upscaler that will
upscale all images in the given directory.
Raises:
@ -91,17 +95,18 @@ class Upscaler:
driver_settings: dict,
ffmpeg_settings: dict,
gifski_settings: dict,
driver: str = 'waifu2x_caffe',
driver: str = "waifu2x_caffe",
scale_ratio: float = None,
scale_width: int = None,
scale_height: int = None,
processes: int = 1,
video2x_cache_directory: pathlib.Path = pathlib.Path(tempfile.gettempdir()) / 'video2x',
extracted_frame_format: str = 'png',
output_file_name_format_string: str = '{original_file_name}_output{extension}',
image_output_extension: str = '.png',
video_output_extension: str = '.mp4',
preserve_frames: bool = False
video2x_cache_directory: pathlib.Path = pathlib.Path(tempfile.gettempdir())
/ "video2x",
extracted_frame_format: str = "png",
output_file_name_format_string: str = "{original_file_name}_output{extension}",
image_output_extension: str = ".png",
video_output_extension: str = ".mp4",
preserve_frames: bool = False,
):
# required parameters
@ -137,109 +142,155 @@ class Upscaler:
self.last_frame_upscaled = pathlib.Path()
def create_temp_directories(self):
"""create temporary directories
"""
"""create temporary directories"""
# if cache directory unspecified, use %TEMP%\video2x
if self.video2x_cache_directory is None:
self.video2x_cache_directory = pathlib.Path(tempfile.gettempdir()) / 'video2x'
self.video2x_cache_directory = (
pathlib.Path(tempfile.gettempdir()) / "video2x"
)
# if specified cache path exists and isn't a directory
if self.video2x_cache_directory.exists() and not self.video2x_cache_directory.is_dir():
Avalon.error(_('Specified or default cache directory is a file/link'))
raise FileExistsError('Specified or default cache directory is a file/link')
if (
self.video2x_cache_directory.exists()
and not self.video2x_cache_directory.is_dir()
):
Avalon.error(_("Specified or default cache directory is a file/link"))
raise FileExistsError("Specified or default cache directory is a file/link")
# if cache directory doesn't exist, try creating it
if not self.video2x_cache_directory.exists():
try:
Avalon.debug_info(_('Creating cache directory {}').format(self.video2x_cache_directory))
Avalon.debug_info(
_("Creating cache directory {}").format(
self.video2x_cache_directory
)
)
self.video2x_cache_directory.mkdir(parents=True, exist_ok=True)
except Exception as exception:
Avalon.error(_('Unable to create {}').format(self.video2x_cache_directory))
Avalon.error(
_("Unable to create {}").format(self.video2x_cache_directory)
)
raise exception
# create temp directories for extracted frames and upscaled frames
self.extracted_frames = pathlib.Path(tempfile.mkdtemp(dir=self.video2x_cache_directory))
Avalon.debug_info(_('Extracted frames are being saved to: {}').format(self.extracted_frames))
self.upscaled_frames = pathlib.Path(tempfile.mkdtemp(dir=self.video2x_cache_directory))
Avalon.debug_info(_('Upscaled frames are being saved to: {}').format(self.upscaled_frames))
self.extracted_frames = pathlib.Path(
tempfile.mkdtemp(dir=self.video2x_cache_directory)
)
Avalon.debug_info(
_("Extracted frames are being saved to: {}").format(self.extracted_frames)
)
self.upscaled_frames = pathlib.Path(
tempfile.mkdtemp(dir=self.video2x_cache_directory)
)
Avalon.debug_info(
_("Upscaled frames are being saved to: {}").format(self.upscaled_frames)
)
def cleanup_temp_directories(self):
"""delete temp directories when done
"""
"""delete temp directories when done"""
if not self.preserve_frames:
for directory in [self.extracted_frames, self.upscaled_frames, self.video2x_cache_directory]:
for directory in [
self.extracted_frames,
self.upscaled_frames,
self.video2x_cache_directory,
]:
try:
# avalon framework cannot be used if python is shutting down
# therefore, plain print is used
print(_('Cleaning up cache directory: {}').format(directory))
print(_("Cleaning up cache directory: {}").format(directory))
shutil.rmtree(directory)
except FileNotFoundError:
pass
except OSError:
print(_('Unable to delete: {}').format(directory))
print(_("Unable to delete: {}").format(directory))
traceback.print_exc()
def _check_arguments(self):
if isinstance(self.input, list):
if self.output.exists() and not self.output.is_dir():
Avalon.error(_('Input and output path type mismatch'))
Avalon.error(_('Input is multiple files but output is not directory'))
raise ArgumentError('input output path type mismatch')
Avalon.error(_("Input and output path type mismatch"))
Avalon.error(_("Input is multiple files but output is not directory"))
raise ArgumentError("input output path type mismatch")
for input_path in self.input:
if not input_path.is_file() and not input_path.is_dir():
Avalon.error(_('Input path {} is neither a file nor a directory').format(input_path))
raise FileNotFoundError(f'{input_path} is neither file nor directory')
Avalon.error(
_("Input path {} is neither a file nor a directory").format(
input_path
)
)
raise FileNotFoundError(
f"{input_path} is neither file nor directory"
)
with contextlib.suppress(FileNotFoundError):
if input_path.samefile(self.output):
Avalon.error(_('Input directory and output directory cannot be the same'))
raise FileExistsError('input directory and output directory are the same')
Avalon.error(
_("Input directory and output directory cannot be the same")
)
raise FileExistsError(
"input directory and output directory are the same"
)
# if input is a file
elif self.input.is_file():
if self.output.is_dir():
Avalon.error(_('Input and output path type mismatch'))
Avalon.error(_('Input is single file but output is directory'))
raise ArgumentError('input output path type mismatch')
if self.output.suffix == '':
Avalon.error(_('No suffix found in output file path'))
Avalon.error(_('Suffix must be specified'))
raise ArgumentError('no output file suffix specified')
Avalon.error(_("Input and output path type mismatch"))
Avalon.error(_("Input is single file but output is directory"))
raise ArgumentError("input output path type mismatch")
if self.output.suffix == "":
Avalon.error(_("No suffix found in output file path"))
Avalon.error(_("Suffix must be specified"))
raise ArgumentError("no output file suffix specified")
# if input is a directory
elif self.input.is_dir():
if self.output.is_file():
Avalon.error(_('Input and output path type mismatch'))
Avalon.error(_('Input is directory but output is existing single file'))
raise ArgumentError('input output path type mismatch')
Avalon.error(_("Input and output path type mismatch"))
Avalon.error(_("Input is directory but output is existing single file"))
raise ArgumentError("input output path type mismatch")
with contextlib.suppress(FileNotFoundError):
if self.input.samefile(self.output):
Avalon.error(_('Input directory and output directory cannot be the same'))
raise FileExistsError('input directory and output directory are the same')
Avalon.error(
_("Input directory and output directory cannot be the same")
)
raise FileExistsError(
"input directory and output directory are the same"
)
# if input is neither
else:
Avalon.error(_('Input path is neither a file nor a directory'))
raise FileNotFoundError(f'{self.input} is neither file nor directory')
Avalon.error(_("Input path is neither a file nor a directory"))
raise FileNotFoundError(f"{self.input} is neither file nor directory")
# check FFmpeg settings
ffmpeg_path = pathlib.Path(self.ffmpeg_settings['ffmpeg_path'])
if not ((pathlib.Path(ffmpeg_path / 'ffmpeg.exe').is_file() and
pathlib.Path(ffmpeg_path / 'ffprobe.exe').is_file()) or
(pathlib.Path(ffmpeg_path / 'ffmpeg').is_file() and
pathlib.Path(ffmpeg_path / 'ffprobe').is_file())):
Avalon.error(_('FFmpeg or FFprobe cannot be found under the specified path'))
Avalon.error(_('Please check the configuration file settings'))
raise FileNotFoundError(self.ffmpeg_settings['ffmpeg_path'])
ffmpeg_path = pathlib.Path(self.ffmpeg_settings["ffmpeg_path"])
if not (
(
pathlib.Path(ffmpeg_path / "ffmpeg.exe").is_file()
and pathlib.Path(ffmpeg_path / "ffprobe.exe").is_file()
)
or (
pathlib.Path(ffmpeg_path / "ffmpeg").is_file()
and pathlib.Path(ffmpeg_path / "ffprobe").is_file()
)
):
Avalon.error(
_("FFmpeg or FFprobe cannot be found under the specified path")
)
Avalon.error(_("Please check the configuration file settings"))
raise FileNotFoundError(self.ffmpeg_settings["ffmpeg_path"])
# check if driver settings
driver_settings = copy.deepcopy(self.driver_settings)
driver_path = driver_settings.pop('path')
driver_path = driver_settings.pop("path")
# check if driver path exists
if not (pathlib.Path(driver_path).is_file() or pathlib.Path(f'{driver_path}.exe').is_file()):
Avalon.error(_('Specified driver executable directory doesn\'t exist'))
Avalon.error(_('Please check the configuration file settings'))
if not (
pathlib.Path(driver_path).is_file()
or pathlib.Path(f"{driver_path}.exe").is_file()
):
Avalon.error(_("Specified driver executable directory doesn't exist"))
Avalon.error(_("Please check the configuration file settings"))
raise FileNotFoundError(driver_path)
# parse driver arguments using driver's parser
@ -255,21 +306,25 @@ class Upscaler:
else:
if len(key) == 1:
driver_arguments.append(f'-{key}')
driver_arguments.append(f"-{key}")
else:
driver_arguments.append(f'--{key}')
driver_arguments.append(f"--{key}")
# true means key is an option
if value is not True:
driver_arguments.append(str(value))
DriverWrapperMain = getattr(importlib.import_module(f'wrappers.{self.driver}'), 'WrapperMain')
DriverWrapperMain = getattr(
importlib.import_module(f"wrappers.{self.driver}"), "WrapperMain"
)
DriverWrapperMain.parse_arguments(driver_arguments)
except AttributeError as e:
Avalon.error(_('Failed to parse driver argument: {}').format(e.args[0]))
Avalon.error(_("Failed to parse driver argument: {}").format(e.args[0]))
raise e
def _upscale_frames(self, input_directory: pathlib.Path, output_directory: pathlib.Path):
""" Upscale video frames with waifu2x-caffe
def _upscale_frames(
self, input_directory: pathlib.Path, output_directory: pathlib.Path
):
"""Upscale video frames with waifu2x-caffe
This function upscales all the frames extracted
by ffmpeg using the waifu2x-caffe binary.
@ -285,7 +340,9 @@ class Upscaler:
# initialize waifu2x driver
if self.driver not in AVAILABLE_DRIVERS:
raise UnrecognizedDriverError(_('Unrecognized driver: {}').format(self.driver))
raise UnrecognizedDriverError(
_("Unrecognized driver: {}").format(self.driver)
)
# list all images in the extracted frames
frames = [(input_directory / f) for f in input_directory.iterdir() if f.is_file]
@ -308,7 +365,13 @@ class Upscaler:
process_directory.mkdir(parents=True, exist_ok=True)
# waifu2x-converter-cpp will perform multi-threading within its own process
if self.driver in ['waifu2x_converter_cpp', 'waifu2x_ncnn_vulkan', 'srmd_ncnn_vulkan', 'realsr_ncnn_vulkan', 'anime4kcpp']:
if self.driver in [
"waifu2x_converter_cpp",
"waifu2x_ncnn_vulkan",
"srmd_ncnn_vulkan",
"realsr_ncnn_vulkan",
"anime4kcpp",
]:
process_directories = [input_directory]
else:
@ -318,20 +381,26 @@ class Upscaler:
# move image
image.rename(process_directories[0] / image.name)
# rotate list
process_directories = process_directories[-1:] + process_directories[:-1]
process_directories = (
process_directories[-1:] + process_directories[:-1]
)
# create driver processes and start them
for process_directory in process_directories:
self.process_pool.append(self.driver_object.upscale(process_directory, output_directory))
self.process_pool.append(
self.driver_object.upscale(process_directory, output_directory)
)
# start progress bar in a different thread
Avalon.debug_info(_('Starting progress monitor'))
Avalon.debug_info(_("Starting progress monitor"))
self.progress_monitor = ProgressMonitor(self, process_directories)
self.progress_monitor.start()
# create the clearer and start it
Avalon.debug_info(_('Starting upscaled image cleaner'))
self.image_cleaner = ImageCleaner(input_directory, output_directory, len(self.process_pool))
Avalon.debug_info(_("Starting upscaled image cleaner"))
self.image_cleaner = ImageCleaner(
input_directory, output_directory, len(self.process_pool)
)
self.image_cleaner.start()
# wait for all process to exit
@ -339,38 +408,39 @@ class Upscaler:
self._wait()
except (Exception, KeyboardInterrupt, SystemExit) as e:
# cleanup
Avalon.debug_info(_('Killing progress monitor'))
Avalon.debug_info(_("Killing progress monitor"))
self.progress_monitor.stop()
Avalon.debug_info(_('Killing upscaled image cleaner'))
Avalon.debug_info(_("Killing upscaled image cleaner"))
self.image_cleaner.stop()
raise e
# if the driver is waifu2x-converter-cpp
# images need to be renamed to be recognizable for FFmpeg
if self.driver == 'waifu2x_converter_cpp':
if self.driver == "waifu2x_converter_cpp":
for image in [f for f in output_directory.iterdir() if f.is_file()]:
renamed = re.sub(f'_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.extracted_frame_format}',
f'.{self.extracted_frame_format}',
str(image.name))
renamed = re.sub(
f"_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.extracted_frame_format}",
f".{self.extracted_frame_format}",
str(image.name),
)
(output_directory / image).rename(output_directory / renamed)
# upscaling done, kill helper threads
Avalon.debug_info(_('Killing progress monitor'))
Avalon.debug_info(_("Killing progress monitor"))
self.progress_monitor.stop()
Avalon.debug_info(_('Killing upscaled image cleaner'))
Avalon.debug_info(_("Killing upscaled image cleaner"))
self.image_cleaner.stop()
def _terminate_subprocesses(self):
Avalon.warning(_('Terminating all processes'))
Avalon.warning(_("Terminating all processes"))
for process in self.process_pool:
process.terminate()
def _wait(self):
""" wait for subprocesses in process pool to complete
"""
Avalon.debug_info(_('Main process waiting for subprocesses to exit'))
"""wait for subprocesses in process pool to complete"""
Avalon.debug_info(_("Main process waiting for subprocesses to exit"))
try:
# while process pool not empty
@ -389,27 +459,37 @@ class Upscaler:
# if return code is not 0
elif process_status != 0:
Avalon.error(_('Subprocess {} exited with code {}').format(process.pid, process_status))
raise subprocess.CalledProcessError(process_status, process.args)
Avalon.error(
_("Subprocess {} exited with code {}").format(
process.pid, process_status
)
)
raise subprocess.CalledProcessError(
process_status, process.args
)
else:
Avalon.debug_info(_('Subprocess {} exited with code {}').format(process.pid, process_status))
Avalon.debug_info(
_("Subprocess {} exited with code {}").format(
process.pid, process_status
)
)
self.process_pool.remove(process)
time.sleep(0.1)
except (KeyboardInterrupt, SystemExit) as e:
Avalon.warning(_('Stop signal received'))
Avalon.warning(_("Stop signal received"))
self._terminate_subprocesses()
raise e
except (Exception, subprocess.CalledProcessError) as e:
Avalon.error(_('Subprocess execution ran into an error'))
Avalon.error(_("Subprocess execution ran into an error"))
self._terminate_subprocesses()
raise e
def run(self):
""" Main controller for Video2X
"""Main controller for Video2X
This function controls the flow of video conversion
and handles all necessary functions.
@ -422,20 +502,24 @@ class Upscaler:
self.process_pool = []
# load driver modules
DriverWrapperMain = getattr(importlib.import_module(f'wrappers.{self.driver}'), 'WrapperMain')
DriverWrapperMain = getattr(
importlib.import_module(f"wrappers.{self.driver}"), "WrapperMain"
)
self.driver_object = DriverWrapperMain(self.driver_settings)
# load options from upscaler class into driver settings
self.driver_object.load_configurations(self)
# initialize FFmpeg object
self.ffmpeg_object = Ffmpeg(self.ffmpeg_settings, extracted_frame_format=self.extracted_frame_format)
self.ffmpeg_object = Ffmpeg(
self.ffmpeg_settings, extracted_frame_format=self.extracted_frame_format
)
# define processing queue
self.processing_queue = queue.Queue()
Avalon.info(_('Loading files into processing queue'))
Avalon.debug_info(_('Input path(s): {}').format(self.input))
Avalon.info(_("Loading files into processing queue"))
Avalon.debug_info(_("Input path(s): {}").format(self.input))
# make output directory if the input is a list or a directory
if isinstance(self.input, list) or self.input.is_dir():
@ -470,39 +554,53 @@ class Upscaler:
# get file type
# try python-magic if it's available
try:
input_file_mime_type = magic.from_file(str(input_path.absolute()), mime=True)
input_file_type = input_file_mime_type.split('/')[0]
input_file_subtype = input_file_mime_type.split('/')[1]
input_file_mime_type = magic.from_file(
str(input_path.absolute()), mime=True
)
input_file_type = input_file_mime_type.split("/")[0]
input_file_subtype = input_file_mime_type.split("/")[1]
except Exception:
input_file_mime_type = input_file_type = input_file_subtype = ''
input_file_mime_type = input_file_type = input_file_subtype = ""
# if python-magic doesn't determine the file to be an image/video file
# fall back to mimetypes to guess the file type based on the extension
if input_file_type not in ['image', 'video']:
if input_file_type not in ["image", "video"]:
# in case python-magic fails to detect file type
# try guessing file mime type with mimetypes
input_file_mime_type = mimetypes.guess_type(input_path.name)[0]
input_file_type = input_file_mime_type.split('/')[0]
input_file_subtype = input_file_mime_type.split('/')[1]
input_file_type = input_file_mime_type.split("/")[0]
input_file_subtype = input_file_mime_type.split("/")[1]
Avalon.debug_info(_('File MIME type: {}').format(input_file_mime_type))
Avalon.debug_info(_("File MIME type: {}").format(input_file_mime_type))
# set default output file suffixes
# if image type is GIF, default output suffix is also .gif
if input_file_mime_type == 'image/gif':
output_path = self.output / self.output_file_name_format_string.format(original_file_name=input_path.stem, extension='.gif')
if input_file_mime_type == "image/gif":
output_path = self.output / self.output_file_name_format_string.format(
original_file_name=input_path.stem, extension=".gif"
)
elif input_file_type == 'image':
output_path = self.output / self.output_file_name_format_string.format(original_file_name=input_path.stem, extension=self.image_output_extension)
elif input_file_type == "image":
output_path = self.output / self.output_file_name_format_string.format(
original_file_name=input_path.stem,
extension=self.image_output_extension,
)
elif input_file_type == 'video':
output_path = self.output / self.output_file_name_format_string.format(original_file_name=input_path.stem, extension=self.video_output_extension)
elif input_file_type == "video":
output_path = self.output / self.output_file_name_format_string.format(
original_file_name=input_path.stem,
extension=self.video_output_extension,
)
# if file is none of: image, image/gif, video
# skip to the next task
else:
Avalon.error(_('File {} ({}) neither an image nor a video').format(input_path, input_file_mime_type))
Avalon.warning(_('Skipping this file'))
Avalon.error(
_("File {} ({}) neither an image nor a video").format(
input_path, input_file_mime_type
)
)
Avalon.warning(_("Skipping this file"))
continue
# if there is only one input file
@ -512,14 +610,24 @@ class Upscaler:
output_path_id = 0
while str(output_path) in output_paths:
output_path = output_path.parent / pathlib.Path(f'{output_path.stem}_{output_path_id}{output_path.suffix}')
output_path = output_path.parent / pathlib.Path(
f"{output_path.stem}_{output_path_id}{output_path.suffix}"
)
output_path_id += 1
# record output path
output_paths.append(str(output_path))
# push file information into processing queue
self.processing_queue.put((input_path.absolute(), output_path.absolute(), input_file_mime_type, input_file_type, input_file_subtype))
self.processing_queue.put(
(
input_path.absolute(),
output_path.absolute(),
input_file_mime_type,
input_file_type,
input_file_subtype,
)
)
# check argument sanity before running
self._check_arguments()
@ -527,22 +635,28 @@ class Upscaler:
# record file count for external calls
self.total_files = self.processing_queue.qsize()
Avalon.info(_('Loaded files into processing queue'))
Avalon.info(_("Loaded files into processing queue"))
# print all files in queue for debugging
for job in self.processing_queue.queue:
Avalon.debug_info(_('Input file: {}').format(job[0].absolute()))
Avalon.debug_info(_("Input file: {}").format(job[0].absolute()))
try:
while not self.processing_queue.empty():
# get new job from queue
self.current_input_file, output_path, input_file_mime_type, input_file_type, input_file_subtype = self.processing_queue.get()
(
self.current_input_file,
output_path,
input_file_mime_type,
input_file_type,
input_file_subtype,
) = self.processing_queue.get()
# get current job starting time for GUI calculations
self.current_processing_starting_time = time.time()
# get video information JSON using FFprobe
Avalon.info(_('Reading file information'))
Avalon.info(_("Reading file information"))
file_info = self.ffmpeg_object.probe_file_info(self.current_input_file)
# create temporary directories for storing frames
@ -550,50 +664,61 @@ class Upscaler:
# start handling input
# if input file is a static image
if input_file_type == 'image' and input_file_subtype != 'gif':
Avalon.info(_('Starting upscaling image'))
if input_file_type == "image" and input_file_subtype != "gif":
Avalon.info(_("Starting upscaling image"))
# copy original file into the pre-processing directory
shutil.copy(self.current_input_file, self.extracted_frames / self.current_input_file.name)
shutil.copy(
self.current_input_file,
self.extracted_frames / self.current_input_file.name,
)
width = int(file_info['streams'][0]['width'])
height = int(file_info['streams'][0]['height'])
width = int(file_info["streams"][0]["width"])
height = int(file_info["streams"][0]["height"])
framerate = self.total_frames = 1
# elif input_file_mime_type == 'image/gif' or input_file_type == 'video':
else:
Avalon.info(_('Starting upscaling video/GIF'))
Avalon.info(_("Starting upscaling video/GIF"))
# find index of video stream
video_stream_index = None
for stream in file_info['streams']:
if stream['codec_type'] == 'video':
video_stream_index = stream['index']
for stream in file_info["streams"]:
if stream["codec_type"] == "video":
video_stream_index = stream["index"]
break
# exit if no video stream found
if video_stream_index is None:
Avalon.error(_('Aborting: No video stream found'))
raise StreamNotFoundError('no video stream found')
Avalon.error(_("Aborting: No video stream found"))
raise StreamNotFoundError("no video stream found")
# get average frame rate of video stream
framerate = float(Fraction(file_info['streams'][video_stream_index]['r_frame_rate']))
width = int(file_info['streams'][video_stream_index]['width'])
height = int(file_info['streams'][video_stream_index]['height'])
framerate = float(
Fraction(
file_info["streams"][video_stream_index]["r_frame_rate"]
)
)
width = int(file_info["streams"][video_stream_index]["width"])
height = int(file_info["streams"][video_stream_index]["height"])
# get total number of frames
Avalon.info(_('Getting total number of frames in the file'))
Avalon.info(_("Getting total number of frames in the file"))
# if container stores total number of frames in nb_frames, fetch it directly
if 'nb_frames' in file_info['streams'][video_stream_index]:
self.total_frames = int(file_info['streams'][video_stream_index]['nb_frames'])
if "nb_frames" in file_info["streams"][video_stream_index]:
self.total_frames = int(
file_info["streams"][video_stream_index]["nb_frames"]
)
# otherwise call FFprobe to count the total number of frames
else:
self.total_frames = self.ffmpeg_object.get_number_of_frames(self.current_input_file, video_stream_index)
self.total_frames = self.ffmpeg_object.get_number_of_frames(
self.current_input_file, video_stream_index
)
# calculate scale width/height/ratio and scaling jobs if required
Avalon.info(_('Calculating scaling parameters'))
Avalon.info(_("Calculating scaling parameters"))
# create a local copy of the global output settings
output_scale = self.scale_ratio
@ -624,7 +749,9 @@ class Upscaler:
if self.driver in DRIVER_FIXED_SCALING_RATIOS:
# select the optimal driver scaling ratio to use
supported_scaling_ratios = sorted(DRIVER_FIXED_SCALING_RATIOS[self.driver])
supported_scaling_ratios = sorted(
DRIVER_FIXED_SCALING_RATIOS[self.driver]
)
remaining_scaling_ratio = math.ceil(output_scale)
self.scaling_jobs = []
@ -654,46 +781,68 @@ class Upscaler:
break
if found is False:
self.scaling_jobs.append(supported_scaling_ratios[-1])
remaining_scaling_ratio /= supported_scaling_ratios[-1]
self.scaling_jobs.append(
supported_scaling_ratios[-1]
)
remaining_scaling_ratio /= supported_scaling_ratios[
-1
]
else:
self.scaling_jobs = [output_scale]
# print file information
Avalon.debug_info(_('Framerate: {}').format(framerate))
Avalon.debug_info(_('Width: {}').format(width))
Avalon.debug_info(_('Height: {}').format(height))
Avalon.debug_info(_('Total number of frames: {}').format(self.total_frames))
Avalon.debug_info(_('Output width: {}').format(output_width))
Avalon.debug_info(_('Output height: {}').format(output_height))
Avalon.debug_info(_('Required scale ratio: {}').format(output_scale))
Avalon.debug_info(_('Upscaling jobs queue: {}').format(self.scaling_jobs))
Avalon.debug_info(_("Framerate: {}").format(framerate))
Avalon.debug_info(_("Width: {}").format(width))
Avalon.debug_info(_("Height: {}").format(height))
Avalon.debug_info(
_("Total number of frames: {}").format(self.total_frames)
)
Avalon.debug_info(_("Output width: {}").format(output_width))
Avalon.debug_info(_("Output height: {}").format(output_height))
Avalon.debug_info(_("Required scale ratio: {}").format(output_scale))
Avalon.debug_info(
_("Upscaling jobs queue: {}").format(self.scaling_jobs)
)
# extract frames from video
if input_file_mime_type == 'image/gif' or input_file_type == 'video':
self.process_pool.append((self.ffmpeg_object.extract_frames(self.current_input_file, self.extracted_frames)))
if input_file_mime_type == "image/gif" or input_file_type == "video":
self.process_pool.append(
(
self.ffmpeg_object.extract_frames(
self.current_input_file, self.extracted_frames
)
)
)
self._wait()
# if driver is waifu2x-caffe
# pass pixel format output depth information
if self.driver == 'waifu2x_caffe':
if self.driver == "waifu2x_caffe":
# get a dict of all pixel formats and corresponding bit depth
pixel_formats = self.ffmpeg_object.get_pixel_formats()
# try getting pixel format's corresponding bti depth
try:
self.driver_settings['output_depth'] = pixel_formats[self.ffmpeg_object.pixel_format]
self.driver_settings["output_depth"] = pixel_formats[
self.ffmpeg_object.pixel_format
]
except KeyError:
Avalon.error(_('Unsupported pixel format: {}').format(self.ffmpeg_object.pixel_format))
raise UnsupportedPixelError(f'unsupported pixel format {self.ffmpeg_object.pixel_format}')
Avalon.error(
_("Unsupported pixel format: {}").format(
self.ffmpeg_object.pixel_format
)
)
raise UnsupportedPixelError(
f"unsupported pixel format {self.ffmpeg_object.pixel_format}"
)
# upscale images one by one using waifu2x
Avalon.info(_('Starting to upscale extracted frames'))
Avalon.info(_("Starting to upscale extracted frames"))
upscale_begin_time = time.time()
self.current_pass = 1
if self.driver == 'waifu2x_caffe':
if self.driver == "waifu2x_caffe":
self.driver_object.set_scale_resolution(output_width, output_height)
else:
self.driver_object.set_scale_ratio(self.scaling_jobs[0])
@ -706,22 +855,39 @@ class Upscaler:
self.upscaled_frames.mkdir(parents=True, exist_ok=True)
self._upscale_frames(self.extracted_frames, self.upscaled_frames)
Avalon.info(_('Upscaling completed'))
Avalon.info(_('Average processing speed: {} seconds per frame').format(self.total_frames / (time.time() - upscale_begin_time)))
Avalon.info(_("Upscaling completed"))
Avalon.info(
_("Average processing speed: {} seconds per frame").format(
self.total_frames / (time.time() - upscale_begin_time)
)
)
# downscale frames with Lanczos
Avalon.info(_('Lanczos downscaling frames'))
Avalon.info(_("Lanczos downscaling frames"))
shutil.rmtree(self.extracted_frames)
shutil.move(self.upscaled_frames, self.extracted_frames)
self.upscaled_frames.mkdir(parents=True, exist_ok=True)
for image in tqdm([i for i in self.extracted_frames.iterdir() if i.is_file() and i.name.endswith(self.extracted_frame_format)], ascii=True, desc=_('Downscaling')):
for image in tqdm(
[
i
for i in self.extracted_frames.iterdir()
if i.is_file() and i.name.endswith(self.extracted_frame_format)
],
ascii=True,
desc=_("Downscaling"),
):
image_object = Image.open(image)
# if the image dimensions are not equal to the output size
# resize the image using Lanczos
if (image_object.width, image_object.height) != (output_width, output_height):
image_object.resize((output_width, output_height), Image.LANCZOS).save(self.upscaled_frames / image.name)
if (image_object.width, image_object.height) != (
output_width,
output_height,
):
image_object.resize(
(output_width, output_height), Image.LANCZOS
).save(self.upscaled_frames / image.name)
image_object.close()
# if the image's dimensions are already equal to the output size
@ -732,71 +898,117 @@ class Upscaler:
# start handling output
# output can be either GIF or video
if input_file_type == 'image' and input_file_subtype != 'gif':
if input_file_type == "image" and input_file_subtype != "gif":
Avalon.info(_('Exporting image'))
Avalon.info(_("Exporting image"))
# there should be only one image in the directory
shutil.move([f for f in self.upscaled_frames.iterdir() if f.is_file()][0], output_path)
shutil.move(
[f for f in self.upscaled_frames.iterdir() if f.is_file()][0],
output_path,
)
# elif input_file_mime_type == 'image/gif' or input_file_type == 'video':
else:
# if the desired output is gif file
if output_path.suffix.lower() == '.gif':
Avalon.info(_('Converting extracted frames into GIF image'))
if output_path.suffix.lower() == ".gif":
Avalon.info(_("Converting extracted frames into GIF image"))
gifski_object = Gifski(self.gifski_settings)
self.process_pool.append(gifski_object.make_gif(self.upscaled_frames, output_path, framerate, self.extracted_frame_format, output_width, output_height))
self.process_pool.append(
gifski_object.make_gif(
self.upscaled_frames,
output_path,
framerate,
self.extracted_frame_format,
output_width,
output_height,
)
)
self._wait()
Avalon.info(_('Conversion completed'))
Avalon.info(_("Conversion completed"))
# if the desired output is video
else:
# frames to video
Avalon.info(_('Converting extracted frames into video'))
self.process_pool.append(self.ffmpeg_object.assemble_video(framerate, self.upscaled_frames))
Avalon.info(_("Converting extracted frames into video"))
self.process_pool.append(
self.ffmpeg_object.assemble_video(
framerate, self.upscaled_frames
)
)
# f'{scale_width}x{scale_height}'
self._wait()
Avalon.info(_('Conversion completed'))
Avalon.info(_("Conversion completed"))
try:
# migrate audio tracks and subtitles
Avalon.info(_('Migrating audio, subtitles and other streams to upscaled video'))
self.process_pool.append(self.ffmpeg_object.migrate_streams(self.current_input_file,
output_path,
self.upscaled_frames))
Avalon.info(
_(
"Migrating audio, subtitles and other streams to upscaled video"
)
)
self.process_pool.append(
self.ffmpeg_object.migrate_streams(
self.current_input_file,
output_path,
self.upscaled_frames,
)
)
self._wait()
# if failed to copy streams
# use file with only video stream
except subprocess.CalledProcessError:
traceback.print_exc()
Avalon.error(_('Failed to migrate streams'))
Avalon.warning(_('Trying to output video without additional streams'))
Avalon.error(_("Failed to migrate streams"))
Avalon.warning(
_("Trying to output video without additional streams")
)
if input_file_mime_type == 'image/gif':
if input_file_mime_type == "image/gif":
# copy will overwrite destination content if exists
shutil.copy(self.upscaled_frames / self.ffmpeg_object.intermediate_file_name, output_path)
shutil.copy(
self.upscaled_frames
/ self.ffmpeg_object.intermediate_file_name,
output_path,
)
else:
# construct output file path
output_file_name = f'{output_path.stem}{self.ffmpeg_object.intermediate_file_name.suffix}'
output_video_path = output_path.parent / output_file_name
output_file_name = f"{output_path.stem}{self.ffmpeg_object.intermediate_file_name.suffix}"
output_video_path = (
output_path.parent / output_file_name
)
# if output file already exists
# create temporary directory in output folder
# temporary directories generated by tempfile are guaranteed to be unique
# and won't conflict with other files
if output_video_path.exists():
Avalon.error(_('Output video file exists'))
Avalon.error(_("Output video file exists"))
temporary_directory = pathlib.Path(tempfile.mkdtemp(dir=output_path.parent))
output_video_path = temporary_directory / output_file_name
Avalon.info(_('Created temporary directory to contain file'))
temporary_directory = pathlib.Path(
tempfile.mkdtemp(dir=output_path.parent)
)
output_video_path = (
temporary_directory / output_file_name
)
Avalon.info(
_("Created temporary directory to contain file")
)
# move file to new destination
Avalon.info(_('Writing intermediate file to: {}').format(output_video_path.absolute()))
shutil.move(self.upscaled_frames / self.ffmpeg_object.intermediate_file_name, output_video_path)
Avalon.info(
_("Writing intermediate file to: {}").format(
output_video_path.absolute()
)
)
shutil.move(
self.upscaled_frames
/ self.ffmpeg_object.intermediate_file_name,
output_video_path,
)
# increment total number of files processed
self.cleanup_temp_directories()

View File

@ -71,75 +71,143 @@ import yaml
from avalon_framework import Avalon
# internationalization constants
DOMAIN = 'video2x'
LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / 'locale'
DOMAIN = "video2x"
LOCALE_DIRECTORY = pathlib.Path(__file__).parent.absolute() / "locale"
# getting default locale settings
default_locale, encoding = locale.getdefaultlocale()
language = gettext.translation(DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True)
language = gettext.translation(
DOMAIN, LOCALE_DIRECTORY, [default_locale], fallback=True
)
language.install()
_ = language.gettext
CLI_VERSION = '4.3.1'
CLI_VERSION = "4.3.1"
LEGAL_INFO = _('''Video2X CLI Version: {}
LEGAL_INFO = _(
"""Video2X CLI Version: {}
Upscaler Version: {}
Author: K4YT3X
License: GNU GPL v3
Github Page: https://github.com/k4yt3x/video2x
Contact: k4yt3x@k4yt3x.com''').format(CLI_VERSION, UPSCALER_VERSION)
Contact: k4yt3x@k4yt3x.com"""
).format(CLI_VERSION, UPSCALER_VERSION)
LOGO = r'''
LOGO = r"""
__ __ _ _ ___ __ __
\ \ / / (_) | | |__ \ \ \ / /
\ \ / / _ __| | ___ ___ ) | \ V /
\ \/ / | | / _` | / _ \ / _ \ / / > <
\ / | | | (_| | | __/ | (_) | / /_ / . \
\/ |_| \__,_| \___| \___/ |____| /_/ \_\
'''
"""
def parse_arguments():
""" parse CLI arguments
"""
parser = argparse.ArgumentParser(prog='video2x', formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
"""parse CLI arguments"""
parser = argparse.ArgumentParser(
prog="video2x",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
add_help=False,
)
# video options
video2x_options = parser.add_argument_group(_('Video2X Options'))
video2x_options.add_argument('--help', action='help', help=_('show this help message and exit'))
video2x_options = parser.add_argument_group(_("Video2X Options"))
video2x_options.add_argument(
"--help", action="help", help=_("show this help message and exit")
)
# if help is in arguments list
# do not require input and output path to be specified
require_input_output = True
if '-h' in sys.argv or '--help' in sys.argv:
if "-h" in sys.argv or "--help" in sys.argv:
require_input_output = False
video2x_options.add_argument('-i', '--input', type=pathlib.Path, help=_('source video file/directory'), required=require_input_output)
video2x_options.add_argument('-o', '--output', type=pathlib.Path, help=_('output video file/directory'), required=require_input_output)
video2x_options.add_argument('-c', '--config', type=pathlib.Path, help=_('Video2X config file path'), action='store',
default=pathlib.Path(__file__).parent.absolute() / 'video2x.yaml')
video2x_options.add_argument('--log', type=pathlib.Path, help=_('log file path'))
video2x_options.add_argument('-v', '--version', help=_('display version, lawful information and exit'), action='store_true')
video2x_options.add_argument(
"-i",
"--input",
type=pathlib.Path,
help=_("source video file/directory"),
required=require_input_output,
)
video2x_options.add_argument(
"-o",
"--output",
type=pathlib.Path,
help=_("output video file/directory"),
required=require_input_output,
)
video2x_options.add_argument(
"-c",
"--config",
type=pathlib.Path,
help=_("Video2X config file path"),
action="store",
default=pathlib.Path(__file__).parent.absolute() / "video2x.yaml",
)
video2x_options.add_argument("--log", type=pathlib.Path, help=_("log file path"))
video2x_options.add_argument(
"-v",
"--version",
help=_("display version, lawful information and exit"),
action="store_true",
)
# scaling options
upscaling_options = parser.add_argument_group(_('Upscaling Options'))
upscaling_options.add_argument('-r', '--ratio', help=_('scaling ratio'), action='store', type=float)
upscaling_options.add_argument('-w', '--width', help=_('output width'), action='store', type=float)
upscaling_options.add_argument('-h', '--height', help=_('output height'), action='store', type=float)
upscaling_options.add_argument('-d', '--driver', help=_('upscaling driver'), choices=AVAILABLE_DRIVERS, default='waifu2x_ncnn_vulkan')
upscaling_options.add_argument('-p', '--processes', help=_('number of processes to use for upscaling'), action='store', type=int, default=1)
upscaling_options.add_argument('--preserve_frames', help=_('preserve extracted and upscaled frames'), action='store_true')
upscaling_options = parser.add_argument_group(_("Upscaling Options"))
upscaling_options.add_argument(
"-r", "--ratio", help=_("scaling ratio"), action="store", type=float
)
upscaling_options.add_argument(
"-w", "--width", help=_("output width"), action="store", type=float
)
upscaling_options.add_argument(
"-h", "--height", help=_("output height"), action="store", type=float
)
upscaling_options.add_argument(
"-d",
"--driver",
help=_("upscaling driver"),
choices=AVAILABLE_DRIVERS,
default="waifu2x_ncnn_vulkan",
)
upscaling_options.add_argument(
"-p",
"--processes",
help=_("number of processes to use for upscaling"),
action="store",
type=int,
default=1,
)
upscaling_options.add_argument(
"--preserve_frames",
help=_("preserve extracted and upscaled frames"),
action="store_true",
)
# if no driver arguments are specified
if '--' not in sys.argv:
if "--" not in sys.argv:
video2x_args = parser.parse_args()
return video2x_args, None
# if driver arguments are specified
else:
video2x_args = parser.parse_args(sys.argv[1:sys.argv.index('--')])
wrapper = getattr(importlib.import_module(f'wrappers.{video2x_args.driver}'), 'WrapperMain')
driver_args = wrapper.parse_arguments(sys.argv[sys.argv.index('--') + 1:])
video2x_args = parser.parse_args(sys.argv[1 : sys.argv.index("--")])
wrapper = getattr(
importlib.import_module(f"wrappers.{video2x_args.driver}"), "WrapperMain"
)
driver_args = wrapper.parse_arguments(sys.argv[sys.argv.index("--") + 1 :])
return video2x_args, driver_args
@ -151,7 +219,7 @@ def print_logo():
def read_config(config_file: pathlib.Path) -> dict:
""" read video2x configurations from config file
"""read video2x configurations from config file
Arguments:
config_file {pathlib.Path} -- video2x configuration file pathlib.Path
@ -160,16 +228,16 @@ def read_config(config_file: pathlib.Path) -> dict:
dict -- dictionary of video2x configuration
"""
with open(config_file, 'r') as config:
with open(config_file, "r") as config:
return yaml.load(config, Loader=yaml.FullLoader)
# /////////////////// Execution /////////////////// #
# this is not a library
if __name__ != '__main__':
Avalon.error(_('This file cannot be imported'))
raise ImportError(f'{__file__} cannot be imported')
if __name__ != "__main__":
Avalon.error(_("This file cannot be imported"))
raise ImportError(f"{__file__} cannot be imported")
# print video2x logo
print_logo()
@ -183,15 +251,19 @@ if video2x_args.version:
sys.exit(0)
# additional checks on upscaling arguments
if video2x_args.ratio is not None and (video2x_args.width is not None or video2x_args.height is not None):
Avalon.error(_('Specify either scaling ratio or scaling resolution, not both'))
if video2x_args.ratio is not None and (
video2x_args.width is not None or video2x_args.height is not None
):
Avalon.error(_("Specify either scaling ratio or scaling resolution, not both"))
sys.exit(1)
# redirect output to both terminal and log file
if video2x_args.log is not None:
log_file = video2x_args.log.open(mode='a+', encoding='utf-8')
log_file = video2x_args.log.open(mode="a+", encoding="utf-8")
else:
log_file = tempfile.TemporaryFile(mode='a+', suffix='.log', prefix='video2x_', encoding='utf-8')
log_file = tempfile.TemporaryFile(
mode="a+", suffix=".log", prefix="video2x_", encoding="utf-8"
)
original_stdout = sys.stdout
original_stderr = sys.stderr
@ -203,22 +275,22 @@ config = read_config(video2x_args.config)
# load waifu2x configuration
driver_settings = config[video2x_args.driver]
driver_settings['path'] = os.path.expandvars(driver_settings['path'])
driver_settings["path"] = os.path.expandvars(driver_settings["path"])
# read FFmpeg configuration
ffmpeg_settings = config['ffmpeg']
ffmpeg_settings['ffmpeg_path'] = os.path.expandvars(ffmpeg_settings['ffmpeg_path'])
ffmpeg_settings = config["ffmpeg"]
ffmpeg_settings["ffmpeg_path"] = os.path.expandvars(ffmpeg_settings["ffmpeg_path"])
# read Gifski configuration
gifski_settings = config['gifski']
gifski_settings['gifski_path'] = os.path.expandvars(gifski_settings['gifski_path'])
gifski_settings = config["gifski"]
gifski_settings["gifski_path"] = os.path.expandvars(gifski_settings["gifski_path"])
# load video2x settings
extracted_frame_format = config['video2x']['extracted_frame_format'].lower()
output_file_name_format_string = config['video2x']['output_file_name_format_string']
image_output_extension = config['video2x']['image_output_extension']
video_output_extension = config['video2x']['video_output_extension']
preserve_frames = config['video2x']['preserve_frames']
extracted_frame_format = config["video2x"]["extracted_frame_format"].lower()
output_file_name_format_string = config["video2x"]["output_file_name_format_string"]
image_output_extension = config["video2x"]["image_output_extension"]
video_output_extension = config["video2x"]["video_output_extension"]
preserve_frames = config["video2x"]["preserve_frames"]
# if preserve frames specified in command line
# overwrite config file options
@ -227,10 +299,10 @@ if video2x_args.preserve_frames is True:
# if cache directory not specified
# use default path: %TEMP%\video2x
if config['video2x']['video2x_cache_directory'] is None:
video2x_cache_directory = (pathlib.Path(tempfile.gettempdir()) / 'video2x')
if config["video2x"]["video2x_cache_directory"] is None:
video2x_cache_directory = pathlib.Path(tempfile.gettempdir()) / "video2x"
else:
video2x_cache_directory = pathlib.Path(config['video2x']['video2x_cache_directory'])
video2x_cache_directory = pathlib.Path(config["video2x"]["video2x_cache_directory"])
# overwrite driver_settings with driver_args
if driver_args is not None:
@ -252,7 +324,6 @@ try:
driver_settings=driver_settings,
ffmpeg_settings=ffmpeg_settings,
gifski_settings=gifski_settings,
# optional parameters
driver=video2x_args.driver,
scale_ratio=video2x_args.ratio,
@ -264,17 +335,21 @@ try:
output_file_name_format_string=output_file_name_format_string,
image_output_extension=image_output_extension,
video_output_extension=video_output_extension,
preserve_frames=preserve_frames
preserve_frames=preserve_frames,
)
# run upscaler
upscaler.run()
Avalon.info(_('Program completed, taking {} seconds').format(round((time.time() - begin_time), 5)))
Avalon.info(
_("Program completed, taking {} seconds").format(
round((time.time() - begin_time), 5)
)
)
except Exception:
Avalon.error(_('An exception has occurred'))
Avalon.error(_("An exception has occurred"))
traceback.print_exc()
if video2x_args.log is not None:
@ -284,12 +359,12 @@ except Exception:
# tempfile.TempFile does not have a name attribute and is not guaranteed to have
# a visible name on the file system
else:
log_file_path = tempfile.mkstemp(suffix='.log', prefix='video2x_')[1]
with open(log_file_path, 'w', encoding='utf-8') as permanent_log_file:
log_file_path = tempfile.mkstemp(suffix=".log", prefix="video2x_")[1]
with open(log_file_path, "w", encoding="utf-8") as permanent_log_file:
log_file.seek(0)
permanent_log_file.write(log_file.read())
Avalon.error(_('The error log file can be found at: {}').format(log_file_path))
Avalon.error(_("The error log file can be found at: {}").format(log_file_path))
finally:
sys.stdout = original_stdout

File diff suppressed because it is too large Load Diff

View File

@ -27,8 +27,7 @@ from avalon_framework import Avalon
class WrapperMain:
""" Anime4K CPP wrapper
"""
"""Anime4K CPP wrapper"""
def __init__(self, driver_settings):
self.driver_settings = driver_settings
@ -38,53 +37,55 @@ class WrapperMain:
def zero_to_one_float(value):
value = float(value)
if value < 0.0 or value > 1.0:
raise argparse.ArgumentTypeError(f'{value} is not between 0.0 and 1.0')
raise argparse.ArgumentTypeError(f"{value} is not between 0.0 and 1.0")
return value
@staticmethod
def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument('-i', '--input', type=str, help=argparse.SUPPRESS) # help='File for loading')
parser.add_argument('-o', '--output', type=str, help=argparse.SUPPRESS) # help='File for outputting')
parser.add_argument('-p', '--passes', type=int, help='Passes for processing')
parser.add_argument('-n', '--pushColorCount', type=int, help='Limit the number of color pushes')
parser.add_argument('-c', '--strengthColor', type=WrapperMain.zero_to_one_float, help='Strength for pushing color,range 0 to 1,higher for thinner')
parser.add_argument('-g', '--strengthGradient', type=WrapperMain.zero_to_one_float, help='Strength for pushing gradient,range 0 to 1,higher for sharper')
parser.add_argument('-z', '--zoomFactor', type=float, help='zoom factor for resizing')
parser.add_argument('-t', '--threads', type=int, help='Threads count for video processing')
parser.add_argument('-f', '--fastMode', action='store_true', help='Faster but maybe low quality')
parser.add_argument('-v', '--videoMode', action='store_true', help='Video process')
parser.add_argument('-s', '--preview', action='store_true', help='Preview image')
parser.add_argument('-b', '--preprocessing', action='store_true', help='Enable pre processing')
parser.add_argument('-a', '--postprocessing', action='store_true', help='Enable post processing')
parser.add_argument('-r', '--preFilters', type=int, help='Enhancement filter, only working when preProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D)')
parser.add_argument('-e', '--postFilters', type=int, help='Enhancement filter, only working when postProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D), so you can put 40 to enable Gaussian blur weak and Bilateral filter, which also is what I recommend for image that < 1080P, 48 for image that >= 1080P, and for performance I recommend to use 72 for video that < 1080P, 80 for video that >=1080P')
parser.add_argument('-q', '--GPUMode', action='store_true', help='Enable GPU acceleration')
parser.add_argument('-w', '--CNNMode', action='store_true', help='Enable ACNet')
parser.add_argument('-H', '--HDN', action='store_true', help='Enable HDN mode for ACNet')
parser.add_argument('-L', '--HDNLevel', type=int, help='Set HDN level')
parser.add_argument('-l', '--listGPUs', action='store_true', help='list GPUs')
parser.add_argument('-h', '--platformID', type=int, help='Specify the platform ID')
parser.add_argument('-d', '--deviceID', type=int, help='Specify the device ID')
parser.add_argument('-C', '--codec', type=str, help='Specify the codec for encoding from mp4v(recommended in Windows), dxva(for Windows), avc1(H264, recommended in Linux), vp09(very slow), hevc(not support in Windowds), av01(not support in Windowds) (string [=mp4v])')
parser.add_argument('-F', '--forceFps', type=float, help='Set output video fps to the specifying number, 0 to disable')
parser.add_argument('-D', '--disableProgress', action='store_true', help='disable progress display')
parser.add_argument('-W', '--webVideo', type=str, help='process the video from URL')
parser.add_argument('-A', '--alpha', action='store_true', help='preserve the Alpha channel for transparent image')
parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument("-i", "--input", type=str, help=argparse.SUPPRESS) # help="File for loading")
parser.add_argument("-o", "--output", type=str, help=argparse.SUPPRESS) # help="File for outputting")
parser.add_argument("-p", "--passes", type=int, help="Passes for processing")
parser.add_argument("-n", "--pushColorCount", type=int, help="Limit the number of color pushes")
parser.add_argument("-c", "--strengthColor", type=WrapperMain.zero_to_one_float, help="Strength for pushing color,range 0 to 1,higher for thinner")
parser.add_argument("-g", "--strengthGradient", type=WrapperMain.zero_to_one_float, help="Strength for pushing gradient,range 0 to 1,higher for sharper")
parser.add_argument("-z", "--zoomFactor", type=float, help="zoom factor for resizing")
parser.add_argument("-t", "--threads", type=int, help="Threads count for video processing")
parser.add_argument("-f", "--fastMode", action="store_true", help="Faster but maybe low quality")
parser.add_argument("-v", "--videoMode", action="store_true", help="Video process")
parser.add_argument("-s", "--preview", action="store_true", help="Preview image")
parser.add_argument("-b", "--preprocessing", action="store_true", help="Enable pre processing")
parser.add_argument("-a", "--postprocessing", action="store_true", help="Enable post processing")
parser.add_argument("-r", "--preFilters", type=int, help="Enhancement filter, only working when preProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D)")
parser.add_argument("-e", "--postFilters", type=int, help="Enhancement filter, only working when postProcessing is true,there are 5 options by binary:Median blur=0000001, Mean blur=0000010, CAS Sharpening=0000100, Gaussian blur weak=0001000, Gaussian blur=0010000, Bilateral filter=0100000, Bilateral filter faster=1000000, you can freely combine them, eg: Gaussian blur weak + Bilateral filter = 0001000 | 0100000 = 0101000 = 40(D), so you can put 40 to enable Gaussian blur weak and Bilateral filter, which also is what I recommend for image that < 1080P, 48 for image that >= 1080P, and for performance I recommend to use 72 for video that < 1080P, 80 for video that >=1080P")
parser.add_argument("-q", "--GPUMode", action="store_true", help="Enable GPU acceleration")
parser.add_argument("-w", "--CNNMode", action="store_true", help="Enable ACNet")
parser.add_argument("-H", "--HDN", action="store_true", help="Enable HDN mode for ACNet")
parser.add_argument("-L", "--HDNLevel", type=int, help="Set HDN level")
parser.add_argument("-l", "--listGPUs", action="store_true", help="list GPUs")
parser.add_argument("-h", "--platformID", type=int, help="Specify the platform ID")
parser.add_argument("-d", "--deviceID", type=int, help="Specify the device ID")
parser.add_argument("-C", "--codec", type=str, help="Specify the codec for encoding from mp4v(recommended in Windows), dxva(for Windows), avc1(H264, recommended in Linux), vp09(very slow), hevc(not support in Windowds), av01(not support in Windowds) (string [=mp4v])")
parser.add_argument("-F", "--forceFps", type=float, help="Set output video fps to the specifying number, 0 to disable")
parser.add_argument("-D", "--disableProgress", action="store_true", help="disable progress display")
parser.add_argument("-W", "--webVideo", type=str, help="process the video from URL")
parser.add_argument("-A", "--alpha", action="store_true", help="preserve the Alpha channel for transparent image")
return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler):
# self.driver_settings['zoomFactor'] = upscaler.scale_ratio
self.driver_settings['threads'] = upscaler.processes
self.driver_settings["threads"] = upscaler.processes
# append FFmpeg path to the end of PATH
# Anime4KCPP will then use FFmpeg to migrate audio tracks
os.environ['PATH'] += f';{upscaler.ffmpeg_settings["ffmpeg_path"]}'
os.environ["PATH"] += f';{upscaler.ffmpeg_settings["ffmpeg_path"]}'
def set_scale_ratio(self, scale_ratio: float):
self.driver_settings['zoomFactor'] = scale_ratio
self.driver_settings["zoomFactor"] = scale_ratio
def upscale(self, input_file, output_file):
"""This is the core function for WAIFU2X class
@ -98,33 +99,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory
# so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings
self.driver_settings['input'] = input_file
self.driver_settings['output'] = output_file
self.driver_settings["input"] = input_file
self.driver_settings["output"] = output_file
# Anime4KCPP will look for Anime4KCPPKernel.cl under the current working directory
# change the CWD to its containing directory so it will find it
if platform.system() == 'Windows':
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
if platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']]
execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys():
value = self.driver_settings[key]
# null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
if key == "path" or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
execute.append(f"-{key}")
else:
execute.append(f'--{key}')
execute.append(f"--{key}")
# true means key is an option
if value is not True:
@ -132,6 +133,8 @@ class WrapperMain:
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}')
Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -27,20 +27,24 @@ class Ffmpeg:
and inserting audio tracks to videos.
"""
def __init__(self, ffmpeg_settings, extracted_frame_format='png'):
def __init__(self, ffmpeg_settings, extracted_frame_format="png"):
self.ffmpeg_settings = ffmpeg_settings
self.ffmpeg_path = pathlib.Path(self.ffmpeg_settings['ffmpeg_path'])
self.ffmpeg_binary = self.ffmpeg_path / 'ffmpeg'
self.ffmpeg_probe_binary = self.ffmpeg_path / 'ffprobe'
self.ffmpeg_path = pathlib.Path(self.ffmpeg_settings["ffmpeg_path"])
self.ffmpeg_binary = self.ffmpeg_path / "ffmpeg"
self.ffmpeg_probe_binary = self.ffmpeg_path / "ffprobe"
# video metadata
self.extracted_frame_format = extracted_frame_format
self.intermediate_file_name = pathlib.Path(self.ffmpeg_settings['intermediate_file_name'])
self.pixel_format = self.ffmpeg_settings['extract_frames']['output_options']['-pix_fmt']
self.intermediate_file_name = pathlib.Path(
self.ffmpeg_settings["intermediate_file_name"]
)
self.pixel_format = self.ffmpeg_settings["extract_frames"]["output_options"][
"-pix_fmt"
]
def get_pixel_formats(self):
""" Get a dictionary of supported pixel formats
"""Get a dictionary of supported pixel formats
List all supported pixel formats and their
corresponding bit depth.
@ -48,12 +52,7 @@ class Ffmpeg:
Returns:
dictionary -- JSON dict of all pixel formats to bit depth
"""
execute = [
self.ffmpeg_probe_binary,
'-v',
'quiet',
'-pix_fmts'
]
execute = [self.ffmpeg_probe_binary, "-v", "quiet", "-pix_fmts"]
# turn elements into str
execute = [str(e) for e in execute]
@ -64,9 +63,15 @@ class Ffmpeg:
pixel_formats = {}
# record all pixel formats into dictionary
for line in subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout.decode().split('\n'):
for line in (
subprocess.run(execute, check=True, stdout=subprocess.PIPE)
.stdout.decode()
.split("\n")
):
try:
pixel_formats[" ".join(line.split()).split()[1]] = int(" ".join(line.split()).split()[3])
pixel_formats[" ".join(line.split()).split()[1]] = int(
" ".join(line.split()).split()[3]
)
except (IndexError, ValueError):
pass
@ -76,7 +81,7 @@ class Ffmpeg:
return pixel_formats
def get_number_of_frames(self, input_file: str, video_stream_index: int) -> int:
""" Count the number of frames in a video
"""Count the number of frames in a video
Args:
input_file (str): input file path
@ -88,26 +93,30 @@ class Ffmpeg:
execute = [
self.ffmpeg_probe_binary,
'-v',
'quiet',
'-count_frames',
'-select_streams',
f'v:{video_stream_index}',
'-show_entries',
'stream=nb_read_frames',
'-of',
'default=nokey=1:noprint_wrappers=1',
input_file
"-v",
"quiet",
"-count_frames",
"-select_streams",
f"v:{video_stream_index}",
"-show_entries",
"stream=nb_read_frames",
"-of",
"default=nokey=1:noprint_wrappers=1",
input_file,
]
# turn elements into str
execute = [str(e) for e in execute]
Avalon.debug_info(f'Executing: {" ".join(execute)}')
return int(subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout.decode().strip())
return int(
subprocess.run(execute, check=True, stdout=subprocess.PIPE)
.stdout.decode()
.strip()
)
def probe_file_info(self, input_video):
""" Gets input video information
"""Gets input video information
This method reads input video information
using ffprobe in dictionary
@ -123,14 +132,14 @@ class Ffmpeg:
# since video2x only strictly recignizes this one format
execute = [
self.ffmpeg_probe_binary,
'-v',
'quiet',
'-print_format',
'json',
'-show_format',
'-show_streams',
'-i',
input_video
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
"-i",
input_video,
]
# turn elements into str
@ -138,37 +147,38 @@ class Ffmpeg:
Avalon.debug_info(f'Executing: {" ".join(execute)}')
json_str = subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout
return json.loads(json_str.decode('utf-8'))
return json.loads(json_str.decode("utf-8"))
def extract_frames(self, input_file, extracted_frames):
""" extract frames from video or GIF file
"""
execute = [
self.ffmpeg_binary
]
"""extract frames from video or GIF file"""
execute = [self.ffmpeg_binary]
# load general options
execute.extend(self._read_configuration(phase='extract_frames'))
execute.extend(self._read_configuration(phase="extract_frames"))
# load input_options
execute.extend(self._read_configuration(phase='extract_frames', section='input_options'))
execute.extend(
self._read_configuration(phase="extract_frames", section="input_options")
)
# specify input file
execute.extend([
'-i',
input_file
])
execute.extend(["-i", input_file])
# load output options
execute.extend(self._read_configuration(phase='extract_frames', section='output_options'))
execute.extend(
self._read_configuration(phase="extract_frames", section="output_options")
)
# specify output file
execute.extend([
extracted_frames / f'extracted_%0d.{self.extracted_frame_format}'
# extracted_frames / f'frame_%06d.{self.extracted_frame_format}'
])
execute.extend(
[
extracted_frames
/ f"extracted_%0d.{self.extracted_frame_format}"
# extracted_frames / f'frame_%06d.{self.extracted_frame_format}'
]
)
return(self._execute(execute))
return self._execute(execute)
def assemble_video(self, framerate, upscaled_frames):
"""Converts images into videos
@ -182,86 +192,93 @@ class Ffmpeg:
"""
execute = [
self.ffmpeg_binary,
'-r',
"-r",
str(framerate)
# '-s',
# resolution
]
# read other options
execute.extend(self._read_configuration(phase='assemble_video'))
execute.extend(self._read_configuration(phase="assemble_video"))
# read input options
execute.extend(self._read_configuration(phase='assemble_video', section='input_options'))
execute.extend(
self._read_configuration(phase="assemble_video", section="input_options")
)
# WORKAROUND FOR WAIFU2X-NCNN-VULKAN
# Dev: SAT3LL
# rename all .png.png suffixes to .png
import re
regex = re.compile(r'\.png\.png$', re.IGNORECASE)
regex = re.compile(r"\.png\.png$", re.IGNORECASE)
for frame_name in upscaled_frames.iterdir():
(upscaled_frames / frame_name).rename(upscaled_frames / regex.sub('.png', str(frame_name)))
(upscaled_frames / frame_name).rename(
upscaled_frames / regex.sub(".png", str(frame_name))
)
# END WORKAROUND
# append input frames path into command
execute.extend([
'-i',
upscaled_frames / f'extracted_%d.{self.extracted_frame_format}'
# upscaled_frames / f'%06d.{self.extracted_frame_format}'
])
execute.extend(
[
"-i",
upscaled_frames / f"extracted_%d.{self.extracted_frame_format}"
# upscaled_frames / f'%06d.{self.extracted_frame_format}'
]
)
# read FFmpeg output options
execute.extend(self._read_configuration(phase='assemble_video', section='output_options'))
execute.extend(
self._read_configuration(phase="assemble_video", section="output_options")
)
# specify output file location
execute.extend([
upscaled_frames / self.intermediate_file_name
])
execute.extend([upscaled_frames / self.intermediate_file_name])
return(self._execute(execute))
return self._execute(execute)
def migrate_streams(self, input_video, output_video, upscaled_frames):
""" Migrates audio tracks and subtitles from input video to output video
"""Migrates audio tracks and subtitles from input video to output video
Arguments:
input_video {string} -- input video file path
output_video {string} -- output video file path
upscaled_frames {string} -- directory containing upscaled frames
"""
execute = [
self.ffmpeg_binary
]
execute = [self.ffmpeg_binary]
# load general options
execute.extend(self._read_configuration(phase='migrate_streams'))
execute.extend(self._read_configuration(phase="migrate_streams"))
# load input options
execute.extend(self._read_configuration(phase='migrate_streams', section='input_options'))
execute.extend(
self._read_configuration(phase="migrate_streams", section="input_options")
)
# load input file names
execute.extend([
# input 1: upscaled intermediate file without sound
'-i',
upscaled_frames / self.intermediate_file_name,
# input 2: original video with streams to copy over
'-i',
input_video
])
execute.extend(
[
# input 1: upscaled intermediate file without sound
"-i",
upscaled_frames / self.intermediate_file_name,
# input 2: original video with streams to copy over
"-i",
input_video,
]
)
# load output options
execute.extend(self._read_configuration(phase='migrate_streams', section='output_options'))
execute.extend(
self._read_configuration(phase="migrate_streams", section="output_options")
)
# load output video path
execute.extend([
output_video
])
execute.extend([output_video])
return(self._execute(execute))
return self._execute(execute)
def _read_configuration(self, phase, section=None):
""" read configuration from JSON
"""read configuration from JSON
Read the configurations (arguments) from the JSON
configuration file and append them to the end of the
@ -290,7 +307,12 @@ class Ffmpeg:
value = self.ffmpeg_settings[phase][key]
# null or None means that leave this option out (keep default)
if value is None or value is False or isinstance(value, dict) or value == '':
if (
value is None
or value is False
or isinstance(value, dict)
or value == ""
):
continue
# if the value is a list, append the same argument and all values

View File

@ -19,30 +19,37 @@ from avalon_framework import Avalon
class Gifski:
def __init__(self, gifski_settings):
self.gifski_settings = gifski_settings
def make_gif(self, upscaled_frames: pathlib.Path, output_path: pathlib.Path, framerate: float, extracted_frame_format: str, output_width: int, output_height: int) -> subprocess.Popen:
def make_gif(
self,
upscaled_frames: pathlib.Path,
output_path: pathlib.Path,
framerate: float,
extracted_frame_format: str,
output_width: int,
output_height: int,
) -> subprocess.Popen:
execute = [
self.gifski_settings['gifski_path'],
'-o',
self.gifski_settings["gifski_path"],
"-o",
output_path,
'--fps',
"--fps",
int(round(framerate, 0)),
'--width',
"--width",
output_width,
'--height',
output_height
"--height",
output_height,
]
# load configurations from config file
execute.extend(self._load_configuration())
# append frames location
execute.extend([upscaled_frames / f'extracted_*.{extracted_frame_format}'])
execute.extend([upscaled_frames / f"extracted_*.{extracted_frame_format}"])
return(self._execute(execute))
return self._execute(execute)
def _load_configuration(self):
@ -53,13 +60,13 @@ class Gifski:
value = self.gifski_settings[key]
# null or None means that leave this option out (keep default)
if key == 'gifski_path' or value is None or value is False:
if key == "gifski_path" or value is None or value is False:
continue
else:
if len(key) == 1:
configuration.append(f'-{key}')
configuration.append(f"-{key}")
else:
configuration.append(f'--{key}')
configuration.append(f"--{key}")
# true means key is an option
if value is not True:
@ -70,6 +77,6 @@ class Gifski:
# turn all list elements into string to avoid errors
execute = [str(e) for e in execute]
Avalon.debug_info(f'Executing: {execute}')
Avalon.debug_info(f"Executing: {execute}")
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -38,28 +38,32 @@ class WrapperMain:
@staticmethod
def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument('-v', action='store_true', help='verbose output')
parser.add_argument('-i', type=str, help=argparse.SUPPRESS) # help='input image path (jpg/png) or directory')
parser.add_argument('-o', type=str, help=argparse.SUPPRESS) # help='output image path (png) or directory')
parser.add_argument('-s', type=int, help='upscale ratio')
parser.add_argument('-t', type=int, help='tile size (>=32/0=auto)')
parser.add_argument('-m', type=str, help='realsr model path')
parser.add_argument('-g', type=int, help='gpu device to use')
parser.add_argument('-j', type=str, help='thread count for load/proc/save')
parser.add_argument('-x', action='store_true', help='enable tta mode')
parser.add_argument('-f', type=str, help=argparse.SUPPRESS) # help='output image format (jpg/png/webp, default=ext/png)')
parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument("-v", action="store_true", help="verbose output")
parser.add_argument("-i", type=str, help=argparse.SUPPRESS) # help="input image path (jpg/png) or directory")
parser.add_argument("-o", type=str, help=argparse.SUPPRESS) # help="output image path (png) or directory")
parser.add_argument("-s", type=int, help="upscale ratio")
parser.add_argument("-t", type=int, help="tile size (>=32/0=auto)")
parser.add_argument("-m", type=str, help="realsr model path")
parser.add_argument("-g", type=int, help="gpu device to use")
parser.add_argument("-j", type=str, help="thread count for load/proc/save")
parser.add_argument("-x", action="store_true", help="enable tta mode")
parser.add_argument("-f", type=str, help=argparse.SUPPRESS) # help="output image format (jpg/png/webp, default=ext/png)")
return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler):
# self.driver_settings['s'] = int(upscaler.scale_ratio)
self.driver_settings['j'] = '{}:{}:{}'.format(upscaler.processes, upscaler.processes, upscaler.processes)
self.driver_settings['f'] = upscaler.extracted_frame_format.lower()
self.driver_settings["j"] = "{}:{}:{}".format(
upscaler.processes, upscaler.processes, upscaler.processes
)
self.driver_settings["f"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: int):
self.driver_settings['s'] = int(scale_ratio)
self.driver_settings["s"] = int(scale_ratio)
def upscale(self, input_directory, output_directory):
"""This is the core function for RealSR NCNN Vulkan class
@ -72,33 +76,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory
# so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings
self.driver_settings['i'] = input_directory
self.driver_settings['o'] = output_directory
self.driver_settings["i"] = input_directory
self.driver_settings["o"] = output_directory
# by default, realsr-ncnn-vulkan will look for the models under the current working directory
# change the working directory to its containing folder if model directory not specified
if self.driver_settings['m'] is None and platform.system() == 'Windows':
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
if self.driver_settings["m"] is None and platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed
# initialize the list with the binary path as the first element
execute = [self.driver_settings['path']]
execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys():
value = self.driver_settings[key]
# null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
if key == "path" or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
execute.append(f"-{key}")
else:
execute.append(f'--{key}')
execute.append(f"--{key}")
# true means key is an option
if value is not True:
@ -106,6 +110,8 @@ class WrapperMain:
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}')
Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -38,29 +38,33 @@ class WrapperMain:
@staticmethod
def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument('-v', action='store_true', help='verbose output')
parser.add_argument('-i', type=str, help=argparse.SUPPRESS) # help='input image path (jpg/png) or directory')
parser.add_argument('-o', type=str, help=argparse.SUPPRESS) # help='output image path (png) or directory')
parser.add_argument('-n', type=int, choices=range(-1, 11), help='denoise level')
parser.add_argument('-s', type=int, help='upscale ratio')
parser.add_argument('-t', type=int, help='tile size (>=32)')
parser.add_argument('-m', type=str, help='srmd model path')
parser.add_argument('-g', type=int, help='gpu device to use')
parser.add_argument('-j', type=str, help='thread count for load/proc/save')
parser.add_argument('-x', action='store_true', help='enable tta mode')
parser.add_argument('-f', type=str, help=argparse.SUPPRESS) # help='output image format (jpg/png/webp, default=ext/png)')
parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument("-v", action="store_true", help="verbose output")
parser.add_argument("-i", type=str, help=argparse.SUPPRESS) # help="input image path (jpg/png) or directory")
parser.add_argument("-o", type=str, help=argparse.SUPPRESS) # help="output image path (png) or directory")
parser.add_argument("-n", type=int, choices=range(-1, 11), help="denoise level")
parser.add_argument("-s", type=int, help="upscale ratio")
parser.add_argument("-t", type=int, help="tile size (>=32)")
parser.add_argument("-m", type=str, help="srmd model path")
parser.add_argument("-g", type=int, help="gpu device to use")
parser.add_argument("-j", type=str, help="thread count for load/proc/save")
parser.add_argument("-x", action="store_true", help="enable tta mode")
parser.add_argument("-f", type=str, help=argparse.SUPPRESS) # help="output image format (jpg/png/webp, default=ext/png)")
return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler):
# self.driver_settings['s'] = int(upscaler.scale_ratio)
self.driver_settings['j'] = '{}:{}:{}'.format(upscaler.processes, upscaler.processes, upscaler.processes)
self.driver_settings['f'] = upscaler.extracted_frame_format.lower()
self.driver_settings["j"] = "{}:{}:{}".format(
upscaler.processes, upscaler.processes, upscaler.processes
)
self.driver_settings["f"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: int):
self.driver_settings['s'] = int(scale_ratio)
self.driver_settings["s"] = int(scale_ratio)
def upscale(self, input_directory, output_directory):
"""This is the core function for SRMD ncnn Vulkan class
@ -73,33 +77,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory
# so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings
self.driver_settings['i'] = input_directory
self.driver_settings['o'] = output_directory
self.driver_settings["i"] = input_directory
self.driver_settings["o"] = output_directory
# by default, srmd-ncnn-vulkan will look for the models under the current working directory
# change the working directory to its containing folder if model directory not specified
if self.driver_settings['m'] is None and platform.system() == 'Windows':
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
if self.driver_settings["m"] is None and platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed
# initialize the list with the binary path as the first element
execute = [self.driver_settings['path']]
execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys():
value = self.driver_settings[key]
# null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
if key == "path" or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
execute.append(f"-{key}")
else:
execute.append(f'--{key}')
execute.append(f"--{key}")
# true means key is an option
if value is not True:
@ -107,6 +111,8 @@ class WrapperMain:
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}')
Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -37,77 +37,78 @@ class WrapperMain:
@staticmethod
def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument('-t', '--tta', type=int, choices=range(2), help='8x slower and slightly high quality')
parser.add_argument('--gpu', type=int, help='gpu device no')
parser.add_argument('-b', '--batch_size', type=int, help='input batch size')
parser.add_argument('--crop_h', type=int, help='input image split size(height)')
parser.add_argument('--crop_w', type=int, help='input image split size(width)')
parser.add_argument('-c', '--crop_size', type=int, help='input image split size')
parser.add_argument('-d', '--output_depth', type=int, help='output image chaneel depth bit')
parser.add_argument('-q', '--output_quality', type=int, help='output image quality')
parser.add_argument('-p', '--process', choices=['cpu', 'gpu', 'cudnn'], help='process mode')
parser.add_argument('--model_dir', type=str, help='path to custom model directory (don\'t append last / )')
parser.add_argument('-h', '--scale_height', type=int, help='custom scale height')
parser.add_argument('-w', '--scale_width', type=int, help='custom scale width')
parser.add_argument('-s', '--scale_ratio', type=float, help='custom scale ratio')
parser.add_argument('-n', '--noise_level', type=int, choices=range(4), help='noise reduction level')
parser.add_argument('-m', '--mode', choices=['noise', 'scale', 'noise_scale', 'auto_scale'], help='image processing mode')
parser.add_argument('-e', '--output_extention', type=str, help='extention to output image file when output_path is (auto) or input_path is folder')
parser.add_argument('-l', '--input_extention_list', type=str, help='extention to input image file when input_path is folder')
parser.add_argument('-o', '--output_path', type=str, help=argparse.SUPPRESS) # help='path to output image file (when input_path is folder, output_path must be folder)')
parser.add_argument('-i', '--input_path', type=str, help=argparse.SUPPRESS) # help='(required) path to input image file')
parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument("-t", "--tta", type=int, choices=range(2), help="8x slower and slightly high quality")
parser.add_argument("--gpu", type=int, help="gpu device no")
parser.add_argument("-b", "--batch_size", type=int, help="input batch size")
parser.add_argument("--crop_h", type=int, help="input image split size(height)")
parser.add_argument("--crop_w", type=int, help="input image split size(width)")
parser.add_argument("-c", "--crop_size", type=int, help="input image split size")
parser.add_argument("-d", "--output_depth", type=int, help="output image chaneel depth bit")
parser.add_argument("-q", "--output_quality", type=int, help="output image quality")
parser.add_argument("-p", "--process", choices=["cpu", "gpu", "cudnn"], help="process mode")
parser.add_argument("--model_dir", type=str, help="path to custom model directory (don\"t append last / )")
parser.add_argument("-h", "--scale_height", type=int, help="custom scale height")
parser.add_argument("-w", "--scale_width", type=int, help="custom scale width")
parser.add_argument("-s", "--scale_ratio", type=float, help="custom scale ratio")
parser.add_argument("-n", "--noise_level", type=int, choices=range(4), help="noise reduction level")
parser.add_argument("-m", "--mode", choices=["noise", "scale", "noise_scale", "auto_scale"], help="image processing mode")
parser.add_argument("-e", "--output_extention", type=str, help="extention to output image file when output_path is (auto) or input_path is folder")
parser.add_argument("-l", "--input_extention_list", type=str, help="extention to input image file when input_path is folder")
parser.add_argument("-o", "--output_path", type=str, help=argparse.SUPPRESS) # help="path to output image file (when input_path is folder, output_path must be folder)")
parser.add_argument("-i", "--input_path", type=str, help=argparse.SUPPRESS) # help="(required) path to input image file")
return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler):
# use scale width and scale height if specified
# self.driver_settings['scale_ratio'] = upscaler.scale_ratio
self.driver_settings['output_extention'] = upscaler.extracted_frame_format
self.driver_settings["output_extention"] = upscaler.extracted_frame_format
# bit_depth will be 12 at this point
# it will up updated later
self.driver_settings['output_depth'] = 12
self.driver_settings["output_depth"] = 12
def set_scale_resolution(self, width: int, height: int):
self.driver_settings['scale_width'] = width
self.driver_settings['scale_height'] = height
self.driver_settings['scale_ratio'] = None
self.driver_settings["scale_width"] = width
self.driver_settings["scale_height"] = height
self.driver_settings["scale_ratio"] = None
def set_scale_ratio(self, scale_ratio: float):
self.driver_settings['scale_width'] = None
self.driver_settings['scale_height'] = None
self.driver_settings['scale_ratio'] = scale_ratio
self.driver_settings["scale_width"] = None
self.driver_settings["scale_height"] = None
self.driver_settings["scale_ratio"] = scale_ratio
def upscale(self, input_directory, output_directory):
""" start upscaling process
"""
"""start upscaling process"""
# change the working directory to the binary's parent directory
# so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings
self.driver_settings['input_path'] = input_directory
self.driver_settings['output_path'] = output_directory
self.driver_settings["input_path"] = input_directory
self.driver_settings["output_path"] = output_directory
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']]
execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys():
value = self.driver_settings[key]
# null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
if key == "path" or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
execute.append(f"-{key}")
else:
execute.append(f'--{key}')
execute.append(f"--{key}")
# true means key is an option
if value is not True:
@ -115,6 +116,8 @@ class WrapperMain:
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}')
Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -37,45 +37,47 @@ class WrapperMain:
@staticmethod
def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument('--list-supported-formats', action='store_true', help='dump currently supported format list')
parser.add_argument('--list-opencv-formats', action='store_true', help='(deprecated. Use --list-supported-formats) dump opencv supported format list')
parser.add_argument('-l', '--list-processor', action='store_true', help='dump processor list')
parser.add_argument('-f', '--output-format', choices=['png', 'jpg'], help='The format used when running in recursive/folder mode\nSee --list-supported-formats for a list of supported formats/extensions.')
parser.add_argument('-c', '--png-compression', type=int, choices=range(10), help='Set PNG compression level (0-9), 9 = Max compression (slowest & smallest)')
parser.add_argument('-q', '--image-quality', type=int, choices=range(-1, 102), help='JPEG & WebP Compression quality (0-101, 0 being smallest size and lowest quality), use 101 for lossless WebP')
parser.add_argument('--block-size', type=int, help='block size')
parser.add_argument('--disable-gpu', action='store_true', help='disable GPU')
parser.add_argument('--force-OpenCL', action='store_true', help='force to use OpenCL on Intel Platform')
parser.add_argument('-p', '--processor', type=int, help='set target processor')
parser.add_argument('-j', '--jobs', type=int, help='number of threads launching at the same time')
parser.add_argument('--model-dir', type=str, help='path to custom model directory (don\'t append last / )')
parser.add_argument('--scale-ratio', type=float, help='custom scale ratio')
parser.add_argument('--noise-level', type=int, choices=range(4), help='noise reduction level')
parser.add_argument('-m', '--mode', choices=['noise', 'scale', 'noise-scale'], help='image processing mode')
parser.add_argument('-v', '--log-level', type=int, choices=range(5), help='Set log level')
parser.add_argument('-s', '--silent', action='store_true', help='Enable silent mode. (same as --log-level 1)')
parser.add_argument('-t', '--tta', type=int, choices=range(2), help='Enable Test-Time Augmentation mode.')
parser.add_argument('-g', '--generate-subdir', type=int, choices=range(2), help='Generate sub folder when recursive directory is enabled.')
parser.add_argument('-a', '--auto-naming', type=int, choices=range(2), help='Add postfix to output name when output path is not specified.\nSet 0 to disable this.')
parser.add_argument('-r', '--recursive-directory', type=int, choices=range(2), help='Search recursively through directories to find more images to process.')
parser.add_argument('-o', '--output', type=str, help=argparse.SUPPRESS) # help='path to output image file or directory (you should use the full path)')
parser.add_argument('-i', '--input', type=str, help=argparse.SUPPRESS) # help='(required) path to input image file or directory (you should use the full path)')
parser.add_argument('--version', action='store_true', help='Displays version information and exits.')
parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument("--list-supported-formats", action="store_true", help="dump currently supported format list")
parser.add_argument("--list-opencv-formats", action="store_true", help="(deprecated. Use --list-supported-formats) dump opencv supported format list")
parser.add_argument("-l", "--list-processor", action="store_true", help="dump processor list")
parser.add_argument("-f", "--output-format", choices=["png", "jpg"], help="The format used when running in recursive/folder mode\nSee --list-supported-formats for a list of supported formats/extensions.")
parser.add_argument("-c", "--png-compression", type=int, choices=range(10), help="Set PNG compression level (0-9), 9 = Max compression (slowest & smallest)")
parser.add_argument("-q", "--image-quality", type=int, choices=range(-1, 102), help="JPEG & WebP Compression quality (0-101, 0 being smallest size and lowest quality), use 101 for lossless WebP")
parser.add_argument("--block-size", type=int, help="block size")
parser.add_argument("--disable-gpu", action="store_true", help="disable GPU")
parser.add_argument("--force-OpenCL", action="store_true", help="force to use OpenCL on Intel Platform")
parser.add_argument("-p", "--processor", type=int, help="set target processor")
parser.add_argument("-j", "--jobs", type=int, help="number of threads launching at the same time")
parser.add_argument("--model-dir", type=str, help="path to custom model directory (don\"t append last / )")
parser.add_argument("--scale-ratio", type=float, help="custom scale ratio")
parser.add_argument("--noise-level", type=int, choices=range(4), help="noise reduction level")
parser.add_argument("-m", "--mode", choices=["noise", "scale", "noise-scale"], help="image processing mode")
parser.add_argument("-v", "--log-level", type=int, choices=range(5), help="Set log level")
parser.add_argument("-s", "--silent", action="store_true", help="Enable silent mode. (same as --log-level 1)")
parser.add_argument("-t", "--tta", type=int, choices=range(2), help="Enable Test-Time Augmentation mode.")
parser.add_argument("-g", "--generate-subdir", type=int, choices=range(2), help="Generate sub folder when recursive directory is enabled.")
parser.add_argument("-a", "--auto-naming", type=int, choices=range(2), help="Add postfix to output name when output path is not specified.\nSet 0 to disable this.")
parser.add_argument("-r", "--recursive-directory", type=int, choices=range(2), help="Search recursively through directories to find more images to process.")
parser.add_argument("-o", "--output", type=str, help=argparse.SUPPRESS) # help="path to output image file or directory (you should use the full path)")
parser.add_argument("-i", "--input", type=str, help=argparse.SUPPRESS) # help="(required) path to input image file or directory (you should use the full path)")
parser.add_argument("--version", action="store_true", help="Displays version information and exits.")
return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler):
# self.driver_settings['scale-ratio'] = upscaler.scale_ratio
self.driver_settings['jobs'] = upscaler.processes
self.driver_settings['output-format'] = upscaler.extracted_frame_format.lower()
self.driver_settings["jobs"] = upscaler.processes
self.driver_settings["output-format"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: float):
self.driver_settings['scale-ratio'] = scale_ratio
self.driver_settings["scale-ratio"] = scale_ratio
def upscale(self, input_directory, output_directory):
""" Waifu2x Converter Driver Upscaler
"""Waifu2x Converter Driver Upscaler
This method executes the upscaling of extracted frames.
Arguments:
@ -87,33 +89,35 @@ class WrapperMain:
# change the working directory to the binary's parent directory
# so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings
self.driver_settings['input'] = input_directory
self.driver_settings['output'] = output_directory
self.driver_settings["input"] = input_directory
self.driver_settings["output"] = output_directory
# models_rgb must be specified manually for waifu2x-converter-cpp
# if it's not specified in the arguments, create automatically
if self.driver_settings['model-dir'] is None:
self.driver_settings['model-dir'] = pathlib.Path(self.driver_settings['path']).parent / 'models_rgb'
if self.driver_settings["model-dir"] is None:
self.driver_settings["model-dir"] = (
pathlib.Path(self.driver_settings["path"]).parent / "models_rgb"
)
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']]
execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys():
value = self.driver_settings[key]
# null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
if key == "path" or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
execute.append(f"-{key}")
else:
execute.append(f'--{key}')
execute.append(f"--{key}")
# true means key is an option
if value is not True:
@ -121,6 +125,8 @@ class WrapperMain:
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}')
Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)

View File

@ -41,32 +41,36 @@ class WrapperMain:
@staticmethod
def parse_arguments(arguments):
# fmt: off
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
parser.error = lambda message: (_ for _ in ()).throw(AttributeError(message))
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument('-v', action='store_true', help='verbose output')
parser.add_argument('-i', type=str, help=argparse.SUPPRESS) # help='input image path (jpg/png/webp) or directory')
parser.add_argument('-o', type=str, help=argparse.SUPPRESS) # help='output image path (jpg/png/webp) or directory')
parser.add_argument('-n', type=int, choices=range(-1, 4), help='denoise level')
parser.add_argument('-s', type=int, help='upscale ratio')
parser.add_argument('-t', type=int, help='tile size (>=32)')
parser.add_argument('-m', type=str, help='waifu2x model path')
parser.add_argument('-g', type=int, help='gpu device to use')
parser.add_argument('-j', type=str, help='thread count for load/proc/save')
parser.add_argument('-x', action='store_true', help='enable tta mode')
parser.add_argument('-f', type=str, help=argparse.SUPPRESS) # help='output image format (jpg/png/webp, default=ext/png)')
parser.add_argument("--help", action="help", help="show this help message and exit")
parser.add_argument("-v", action="store_true", help="verbose output")
parser.add_argument("-i", type=str, help=argparse.SUPPRESS) # help="input image path (jpg/png/webp) or directory")
parser.add_argument("-o", type=str, help=argparse.SUPPRESS) # help="output image path (jpg/png/webp) or directory")
parser.add_argument("-n", type=int, choices=range(-1, 4), help="denoise level")
parser.add_argument("-s", type=int, help="upscale ratio")
parser.add_argument("-t", type=int, help="tile size (>=32)")
parser.add_argument("-m", type=str, help="waifu2x model path")
parser.add_argument("-g", type=int, help="gpu device to use")
parser.add_argument("-j", type=str, help="thread count for load/proc/save")
parser.add_argument("-x", action="store_true", help="enable tta mode")
parser.add_argument("-f", type=str, help=argparse.SUPPRESS) # help="output image format (jpg/png/webp, default=ext/png)")
return parser.parse_args(arguments)
# fmt: on
def load_configurations(self, upscaler):
# self.driver_settings['s'] = int(upscaler.scale_ratio)
self.driver_settings['j'] = '{}:{}:{}'.format(upscaler.processes, upscaler.processes, upscaler.processes)
self.driver_settings['f'] = upscaler.extracted_frame_format.lower()
self.driver_settings["j"] = "{}:{}:{}".format(
upscaler.processes, upscaler.processes, upscaler.processes
)
self.driver_settings["f"] = upscaler.extracted_frame_format.lower()
def set_scale_ratio(self, scale_ratio: int):
self.driver_settings['s'] = int(scale_ratio)
self.driver_settings["s"] = int(scale_ratio)
def upscale(self, input_directory, output_directory):
""" This is the core function for waifu2x class
"""This is the core function for waifu2x class
Arguments:
input_directory {string} -- source directory path
@ -76,33 +80,33 @@ class WrapperMain:
# change the working directory to the binary's parent directory
# so the binary can find shared object files and other files
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# overwrite config file settings
self.driver_settings['i'] = input_directory
self.driver_settings['o'] = output_directory
self.driver_settings["i"] = input_directory
self.driver_settings["o"] = output_directory
# by default, waifu2x-ncnn-vulkan will look for the models under the current working directory
# change the working directory to its containing folder if model directory not specified
if self.driver_settings['m'] is None and platform.system() == 'Windows':
os.chdir(pathlib.Path(self.driver_settings['path']).parent)
if self.driver_settings["m"] is None and platform.system() == "Windows":
os.chdir(pathlib.Path(self.driver_settings["path"]).parent)
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [self.driver_settings['path']]
execute = [self.driver_settings["path"]]
for key in self.driver_settings.keys():
value = self.driver_settings[key]
# null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
if key == "path" or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
execute.append(f"-{key}")
else:
execute.append(f'--{key}')
execute.append(f"--{key}")
# true means key is an option
if value is not True:
@ -110,6 +114,8 @@ class WrapperMain:
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}')
Avalon.debug_info(
f'[upscaler] Subprocess {os.getpid()} executing: {" ".join(execute)}'
)
self.print_lock.release()
return subprocess.Popen(execute, stdout=sys.stdout, stderr=sys.stderr)