removed multi-threading in favor of muti-processing

This commit is contained in:
k4yt3x 2020-02-26 05:27:57 -05:00
parent a2150a8dbc
commit 6a100b1526
5 changed files with 254 additions and 274 deletions

View File

@ -4,13 +4,16 @@
Name: Anime4K Driver
Author: K4YT3X
Date Created: August 15, 2019
Last Modified: November 15, 2019
Last Modified: February 26, 2020
Description: This class is a high-level wrapper
for Anime4k.
"""
# built-in imports
import os
import queue
import shlex
import subprocess
import threading
@ -31,7 +34,7 @@ class Anime4k:
self.driver_settings = driver_settings
self.print_lock = threading.Lock()
def upscale(self, input_directory, output_directory, scale_ratio, upscaler_exceptions, push_strength=None, push_grad_strength=None):
def upscale(self, input_directory, output_directory, scale_ratio, processes, push_strength=None, push_grad_strength=None):
""" Anime4K wrapper
Arguments:
@ -46,47 +49,63 @@ class Anime4k:
Returns:
subprocess.Popen.returncode -- command line return value of execution
"""
try:
# return value is the sum of all execution return codes
return_value = 0
# get a list lof all image files in input_directory
extracted_frame_files = [f for f in input_directory.iterdir() if str(f).lower().endswith('.png') or str(f).lower().endswith('.jpg')]
# a list of all commands to be executed
commands = queue.Queue()
# upscale each image in input_directory
for image in extracted_frame_files:
# get a list lof all image files in input_directory
extracted_frame_files = [f for f in input_directory.iterdir() if str(f).lower().endswith('.png') or str(f).lower().endswith('.jpg')]
execute = [
self.driver_settings['java_path'],
'-jar',
self.driver_settings['path'],
str(image.absolute()),
str(output_directory / image.name),
str(scale_ratio)
]
# upscale each image in input_directory
for image in extracted_frame_files:
# optional arguments
kwargs = [
'push_strength',
'push_grad_strength'
]
execute = [
self.driver_settings['java_path'],
'-jar',
self.driver_settings['path'],
str(image.absolute()),
str(output_directory / image.name),
str(scale_ratio)
]
# if optional argument specified, append value to execution list
for arg in kwargs:
if locals()[arg] is not None:
execute.extend([locals([arg])])
# optional arguments
kwargs = [
'push_strength',
'push_grad_strength'
]
# if optional argument specified, append value to execution list
for arg in kwargs:
if locals()[arg] is not None:
execute.extend([locals([arg])])
commands.put(execute)
# initialize two lists to hold running and finished processes
anime4k_running_processes = []
anime4k_finished_processes = []
# run all commands in queue
while not commands.empty():
# if any commands have completed
# remove the subprocess.Popen project and move it into finished processes
for process in anime4k_running_processes:
if process.poll() is not None:
Avalon.debug_info(f'Subprocess {process.pid} exited with code {process.poll()}')
anime4k_finished_processes.append(process)
anime4k_running_processes.remove(process)
# when number running processes is less than what's specified
# create new processes and add to running process pool
while len(anime4k_running_processes) < processes:
next_in_queue = commands.get()
new_process = subprocess.Popen(next_in_queue)
anime4k_running_processes.append(new_process)
self.print_lock.acquire()
Avalon.debug_info(f'Executing: {execute}', )
Avalon.debug_info(f'[upscaler] Subprocess {new_process.pid} executing: {shlex.join(next_in_queue)}')
self.print_lock.release()
return_value += subprocess.run(execute, check=True).returncode
# print thread exiting message
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} exiting')
self.print_lock.release()
# return command execution return code
return return_value
except Exception as e:
upscaler_exceptions.append(e)
# return command execution return code
return anime4k_finished_processes

View File

@ -30,6 +30,7 @@ import copy
import pathlib
import re
import shutil
import sys
import tempfile
import threading
import time
@ -65,7 +66,7 @@ class Upscaler:
self.scale_height = None
self.scale_ratio = None
self.model_dir = None
self.threads = 1
self.processes = 1
self.video2x_cache_directory = pathlib.Path(tempfile.gettempdir()) / 'video2x'
self.image_format = 'png'
self.preserve_frames = False
@ -150,150 +151,136 @@ class Upscaler:
w2 {Waifu2x Object} -- initialized waifu2x object
"""
# progress bar thread exit signal
# progress bar process exit signal
self.progress_bar_exit_signal = False
# create a container for exceptions in threads
# if this thread is not empty, then an exception has occured
self.upscaler_exceptions = []
# initialize waifu2x driver
drivers = AVAILABLE_DRIVERS
if self.waifu2x_driver not in drivers:
raise UnrecognizedDriverError(f'Unrecognized waifu2x driver: {self.waifu2x_driver}')
if self.waifu2x_driver not in AVAILABLE_DRIVERS:
raise UnrecognizedDriverError(f'Unrecognized driver: {self.waifu2x_driver}')
# it's easier to do multi-threading with waifu2x_converter
# the number of threads can be passed directly to waifu2x_converter
if self.waifu2x_driver == 'waifu2x_converter':
w2 = Waifu2xConverter(self.driver_settings, self.model_dir)
# create a container for all upscaler processes
upscaler_processes = []
progress_bar = threading.Thread(target=self._progress_bar, args=([self.extracted_frames],))
progress_bar.start()
# list all images in the extracted frames
frames = [(self.extracted_frames / f) for f in self.extracted_frames.iterdir() if f.is_file]
w2.upscale(self.extracted_frames, self.upscaled_frames, self.scale_ratio, self.threads, self.image_format, self.upscaler_exceptions)
for image in [f for f in self.upscaled_frames.iterdir() if f.is_file()]:
renamed = re.sub(f'_\[.*-.*\]\[x(\d+(\.\d+)?)\]\.{self.image_format}', f'.{self.image_format}', str(image))
(self.upscaled_frames / image).rename(self.upscaled_frames / renamed)
# if we have less images than processes,
# create only the processes necessary
if len(frames) < self.processes:
self.processes = len(frames)
self.progress_bar_exit_signal = True
progress_bar.join()
return
# create a directory for each process and append directory
# name into a list
process_directories = []
for process_id in range(self.processes):
process_directory = self.extracted_frames / str(process_id)
process_directories.append(process_directory)
# delete old directories and create new directories
if process_directory.is_dir():
shutil.rmtree(process_directory)
process_directory.mkdir(parents=True, exist_ok=True)
# waifu2x-converter-cpp will perform multi-threading within its own process
if self.waifu2x_driver in ['waifu2x_converter', 'anime4k'] :
process_directories = [self.extracted_frames]
# drivers that are to be multi-threaded by video2x
else:
# create a container for all upscaler threads
upscaler_threads = []
# list all images in the extracted frames
frames = [(self.extracted_frames / f) for f in self.extracted_frames.iterdir() if f.is_file]
# if we have less images than threads,
# create only the threads necessary
if len(frames) < self.threads:
self.threads = len(frames)
# create a directory for each thread and append directory
# name into a list
thread_pool = []
thread_directories = []
for thread_id in range(self.threads):
thread_directory = self.extracted_frames / str(thread_id)
thread_directories.append(thread_directory)
# delete old directories and create new directories
if thread_directory.is_dir():
shutil.rmtree(thread_directory)
thread_directory.mkdir(parents=True, exist_ok=True)
# append directory path into list
thread_pool.append((thread_directory, thread_id))
# evenly distribute images into each directory
# until there is none left in the directory
for image in frames:
# move image
image.rename(thread_pool[0][0] / image.name)
image.rename(process_directories[0] / image.name)
# rotate list
thread_pool = thread_pool[-1:] + thread_pool[:-1]
process_directories = process_directories[-1:] + process_directories[:-1]
# create threads and start them
for thread_info in thread_pool:
# create threads and start them
for process_directory in process_directories:
# create a separate w2 instance for each thread
if self.waifu2x_driver == 'waifu2x_caffe':
w2 = Waifu2xCaffe(copy.deepcopy(self.driver_settings), self.method, self.model_dir, self.bit_depth)
if self.scale_ratio:
thread = threading.Thread(target=w2.upscale,
args=(thread_info[0],
self.upscaled_frames,
self.scale_ratio,
False,
False,
self.image_format,
self.upscaler_exceptions))
else:
thread = threading.Thread(target=w2.upscale,
args=(thread_info[0],
self.upscaled_frames,
False,
self.scale_width,
self.scale_height,
self.image_format,
self.upscaler_exceptions))
# if the driver being used is waifu2x-caffe
if self.waifu2x_driver == 'waifu2x_caffe':
driver = Waifu2xCaffe(copy.deepcopy(self.driver_settings), self.method, self.model_dir, self.bit_depth)
if self.scale_ratio:
upscaler_processes.append(driver.upscale(process_directory,
self.upscaled_frames,
self.scale_ratio,
False,
False,
self.image_format))
else:
upscaler_processes.append(driver.upscale(process_directory,
self.upscaled_frames,
False,
self.scale_width,
self.scale_height,
self.image_format))
# if the driver being used is waifu2x_ncnn_vulkan
elif self.waifu2x_driver == 'waifu2x_ncnn_vulkan':
w2 = Waifu2xNcnnVulkan(copy.deepcopy(self.driver_settings))
thread = threading.Thread(target=w2.upscale,
args=(thread_info[0],
self.upscaled_frames,
self.scale_ratio,
self.upscaler_exceptions))
# if the driver being used is waifu2x-converter-cpp
elif self.waifu2x_driver == 'waifu2x_converter':
driver = Waifu2xConverter(self.driver_settings, self.model_dir)
upscaler_processes.append(driver.upscale(process_directory,
self.upscaled_frames,
self.scale_ratio,
self.processes,
self.image_format))
# if the driver being used is anime4k
elif self.waifu2x_driver == 'anime4k':
w2 = Anime4k(copy.deepcopy(self.driver_settings))
thread = threading.Thread(target=w2.upscale,
args=(thread_info[0],
self.upscaled_frames,
self.scale_ratio,
self.upscaler_exceptions))
# if the driver being used is waifu2x-ncnn-vulkan
elif self.waifu2x_driver == 'waifu2x_ncnn_vulkan':
driver = Waifu2xNcnnVulkan(copy.deepcopy(self.driver_settings))
upscaler_processes.append(driver.upscale(process_directory,
self.upscaled_frames,
self.scale_ratio))
# create thread
thread.name = thread_info[1]
# if the driver being used is anime4k
elif self.waifu2x_driver == 'anime4k':
driver = Anime4k(copy.deepcopy(self.driver_settings))
upscaler_processes += driver.upscale(process_directory,
self.upscaled_frames,
self.scale_ratio,
self.processes)
# add threads into the pool
upscaler_threads.append(thread)
# start progress bar in a different thread
progress_bar = threading.Thread(target=self._progress_bar, args=(process_directories,))
progress_bar.start()
# start progress bar in a different thread
progress_bar = threading.Thread(target=self._progress_bar, args=(thread_directories,))
progress_bar.start()
# create the clearer and start it
Avalon.debug_info('Starting upscaled image cleaner')
image_cleaner = ImageCleaner(self.extracted_frames, self.upscaled_frames, len(upscaler_processes))
image_cleaner.start()
# create the clearer and start it
Avalon.debug_info('Starting upscaled image cleaner')
image_cleaner = ImageCleaner(self.extracted_frames, self.upscaled_frames, len(upscaler_threads))
image_cleaner.start()
# wait for all process to exit
try:
Avalon.debug_info('Main process waiting for subprocesses to exit')
for process in upscaler_processes:
Avalon.debug_info(f'Subprocess {process.pid} exited with code {process.wait()}')
except (KeyboardInterrupt, SystemExit):
Avalon.warning('Exit signal received')
Avalon.warning('Killing processes')
for process in upscaler_processes:
process.terminate()
# start all threads
for thread in upscaler_threads:
thread.start()
# wait for threads to finish
for thread in upscaler_threads:
thread.join()
# upscaling done, kill the clearer
# cleanup and exit with exit code 1
Avalon.debug_info('Killing upscaled image cleaner')
image_cleaner.stop()
self.progress_bar_exit_signal = True
sys.exit(1)
if len(self.upscaler_exceptions) != 0:
raise(self.upscaler_exceptions[0])
# if the driver is waifu2x-converter-cpp
# images need to be renamed to be recognizable for FFmpeg
if self.waifu2x_driver == 'waifu2x_converter':
for image in [f for f in self.upscaled_frames.iterdir() if f.is_file()]:
renamed = re.sub(f'_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.image_format}', f'.{self.image_format}', str(image.name))
(self.upscaled_frames / image).rename(self.upscaled_frames / renamed)
# upscaling done, kill the clearer
Avalon.debug_info('Killing upscaled image cleaner')
image_cleaner.stop()
# pass exit signal to progress bar thread
self.progress_bar_exit_signal = True
def run(self):
"""Main controller for Video2X
""" Main controller for Video2X
This function controls the flow of video conversion
and handles all necessary functions.
@ -337,6 +324,7 @@ class Upscaler:
# get a dict of all pixel formats and corresponding bit depth
pixel_formats = fm.get_pixel_formats()
# try getting pixel format's corresponding bti depth
try:
self.bit_depth = pixel_formats[fm.pixel_format]
except KeyError:

View File

@ -4,13 +4,15 @@
Name: Waifu2x Caffe Driver
Author: K4YT3X
Date Created: Feb 24, 2018
Last Modified: October 6, 2019
Last Modified: February 22, 2020
Description: This class is a high-level wrapper
for waifu2x-caffe.
"""
# built-in imports
import os
import shlex
import subprocess
import threading
@ -38,7 +40,7 @@ class Waifu2xCaffe:
self.model_dir = model_dir
self.print_lock = threading.Lock()
def upscale(self, input_directory, output_directory, scale_ratio, scale_width, scale_height, image_format, upscaler_exceptions):
def upscale(self, input_directory, output_directory, scale_ratio, scale_width, scale_height, image_format):
"""This is the core function for WAIFU2X class
Arguments:
@ -48,51 +50,38 @@ class Waifu2xCaffe:
height {int} -- output video height
"""
try:
# overwrite config file settings
self.driver_settings['input_path'] = input_directory
self.driver_settings['output_path'] = output_directory
# overwrite config file settings
self.driver_settings['input_path'] = input_directory
self.driver_settings['output_path'] = output_directory
if scale_ratio:
self.driver_settings['scale_ratio'] = scale_ratio
elif scale_width and scale_height:
self.driver_settings['scale_width'] = scale_width
self.driver_settings['scale_height'] = scale_height
if scale_ratio:
self.driver_settings['scale_ratio'] = scale_ratio
elif scale_width and scale_height:
self.driver_settings['scale_width'] = scale_width
self.driver_settings['scale_height'] = scale_height
self.driver_settings['output_extention'] = image_format
self.driver_settings['output_extention'] = image_format
# print thread start message
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} started')
self.print_lock.release()
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [str(self.driver_settings['path'])]
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [str(self.driver_settings['path'])]
for key in self.driver_settings.keys():
for key in self.driver_settings.keys():
value = self.driver_settings[key]
value = self.driver_settings[key]
# is executable key or null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
continue
# is executable key or null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
else:
if len(key) == 1:
execute.append(f'-{key}')
else:
execute.append(f'--{key}')
execute.append(str(value))
execute.append(f'--{key}')
execute.append(str(value))
Avalon.debug_info(f'Executing: {execute}')
completed_command = subprocess.run(execute, check=True)
# print thread exiting message
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} exiting')
self.print_lock.release()
# return command execution return code
return completed_command.returncode
except Exception as e:
upscaler_exceptions.append(e)
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {shlex.join(execute)}')
self.print_lock.release()
return subprocess.Popen(execute)

View File

@ -4,14 +4,16 @@
Name: Waifu2x Converter CPP Driver
Author: K4YT3X
Date Created: February 8, 2019
Last Modified: October 6, 2019
Last Modified: February 22, 2020
Description: This class is a high-level wrapper
for waifu2x-converter-cpp.
"""
# built-in imports
import os
import pathlib
import shlex
import subprocess
import threading
@ -33,7 +35,7 @@ class Waifu2xConverter:
self.driver_settings['model_dir'] = model_dir
self.print_lock = threading.Lock()
def upscale(self, input_directory, output_directory, scale_ratio, jobs, image_format, upscaler_exceptions):
def upscale(self, input_directory, output_directory, scale_ratio, jobs, image_format):
""" Waifu2x Converter Driver Upscaler
This method executes the upscaling of extracted frames.
@ -44,53 +46,47 @@ class Waifu2xConverter:
threads {int} -- number of threads
"""
try:
# overwrite config file settings
self.driver_settings['input'] = input_directory
self.driver_settings['output'] = output_directory
self.driver_settings['scale-ratio'] = scale_ratio
self.driver_settings['jobs'] = jobs
self.driver_settings['output-format'] = image_format
# overwrite config file settings
self.driver_settings['input'] = input_directory
self.driver_settings['output'] = output_directory
self.driver_settings['scale-ratio'] = scale_ratio
self.driver_settings['jobs'] = jobs
self.driver_settings['output-format'] = image_format
# models_rgb must be specified manually for waifu2x-converter-cpp
# if it's not specified in the arguments, create automatically
if self.driver_settings['model-dir'] is None:
self.driver_settings['model-dir'] = pathlib.Path(self.driver_settings['waifu2x_converter_path']) / 'models_rgb'
# models_rgb must be specified manually for waifu2x-converter-cpp
# if it's not specified in the arguments, create automatically
if self.driver_settings['model-dir'] is None:
self.driver_settings['model-dir'] = pathlib.Path(self.driver_settings['path']) / 'models_rgb'
# print thread start message
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} started')
self.print_lock.release()
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [str(pathlib.Path(self.driver_settings['path']) / 'waifu2x-converter-cpp.exe')]
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [str(pathlib.Path(self.driver_settings['path']) / 'waifu2x-converter-cpp.exe')]
for key in self.driver_settings.keys():
for key in self.driver_settings.keys():
value = self.driver_settings[key]
value = self.driver_settings[key]
# the key doesn't need to be passed in this case
if key == 'path':
continue
# the key doesn't need to be passed in this case
if key == 'path':
continue
# null or None means that leave this option out (keep default)
elif value is None or value is False:
continue
# null or None means that leave this option out (keep default)
elif value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
else:
if len(key) == 1:
execute.append(f'-{key}')
else:
execute.append(f'--{key}')
execute.append(f'--{key}')
# true means key is an option
if value is True:
continue
# true means key is an option
if value is True:
continue
execute.append(str(value))
execute.append(str(value))
Avalon.debug_info(f'Executing: {execute}')
return subprocess.run(execute, check=True).returncode
except Exception as e:
upscaler_exceptions.append(e)
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {shlex.join(execute)}')
self.print_lock.release()
return subprocess.Popen(execute)

View File

@ -7,7 +7,7 @@ Date Created: June 26, 2019
Last Modified: November 15, 2019
Editor: K4YT3X
Last Modified: January 4, 2020
Last Modified: February 22, 2020
Description: This class is a high-level wrapper
for waifu2x_ncnn_vulkan.
@ -15,6 +15,7 @@ for waifu2x_ncnn_vulkan.
# built-in imports
import os
import shlex
import subprocess
import threading
@ -42,7 +43,7 @@ class Waifu2xNcnnVulkan:
self.print_lock = threading.Lock()
def upscale(self, input_directory, output_directory, scale_ratio, upscaler_exceptions):
def upscale(self, input_directory, output_directory, scale_ratio):
"""This is the core function for WAIFU2X class
Arguments:
@ -51,44 +52,31 @@ class Waifu2xNcnnVulkan:
ratio {int} -- output video ratio
"""
try:
# overwrite config file settings
self.driver_settings['i'] = input_directory
self.driver_settings['o'] = output_directory
self.driver_settings['s'] = int(scale_ratio)
# overwrite config file settings
self.driver_settings['i'] = input_directory
self.driver_settings['o'] = output_directory
self.driver_settings['s'] = int(scale_ratio)
# print thread start message
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} started')
self.print_lock.release()
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [str(self.driver_settings['path'])]
# list to be executed
# initialize the list with waifu2x binary path as the first element
execute = [str(self.driver_settings['path'])]
for key in self.driver_settings.keys():
for key in self.driver_settings.keys():
value = self.driver_settings[key]
value = self.driver_settings[key]
# is executable key or null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
continue
# is executable key or null or None means that leave this option out (keep default)
if key == 'path' or value is None or value is False:
continue
else:
if len(key) == 1:
execute.append(f'-{key}')
else:
if len(key) == 1:
execute.append(f'-{key}')
else:
execute.append(f'--{key}')
execute.append(str(value))
execute.append(f'--{key}')
execute.append(str(value))
Avalon.debug_info(f'Executing: {execute}')
completed_command = subprocess.run(execute, check=True)
# print thread exiting message
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} exiting')
self.print_lock.release()
# return command execution return code
return completed_command.returncode
except Exception as e:
upscaler_exceptions.append(e)
# return the Popen object of the new process created
self.print_lock.acquire()
Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {shlex.join(execute)}')
self.print_lock.release()
return subprocess.Popen(execute)