video2x/src/wrappers/ffmpeg.py

278 lines
8.3 KiB
Python
Raw Normal View History

2018-12-11 20:52:48 +00:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2018-12-11 20:52:48 +00:00
"""
Name: Video2X FFmpeg Controller
2018-12-11 20:52:48 +00:00
Author: K4YT3X
Date Created: Feb 24, 2018
Last Modified: May 9, 2020
2018-12-11 20:52:48 +00:00
2019-07-26 18:22:49 +00:00
Description: This class handles all FFmpeg related operations.
2018-12-11 20:52:48 +00:00
"""
# built-in imports
import json
import pathlib
2020-05-12 00:24:18 +00:00
import shlex
import subprocess
2018-12-11 20:52:48 +00:00
# third-party imports
from avalon_framework import Avalon
2018-12-11 20:52:48 +00:00
class Ffmpeg:
2019-07-26 18:22:49 +00:00
"""This class communicates with FFmpeg
2018-12-11 20:52:48 +00:00
This class deals with FFmpeg. It handles extracting
2018-12-11 20:52:48 +00:00
frames, stripping audio, converting images into videos
and inserting audio tracks to videos.
"""
def __init__(self, ffmpeg_settings, image_format):
self.ffmpeg_settings = ffmpeg_settings
self.ffmpeg_path = pathlib.Path(self.ffmpeg_settings['ffmpeg_path'])
self.ffmpeg_binary = self.ffmpeg_path / 'ffmpeg'
self.ffmpeg_probe_binary = self.ffmpeg_path / 'ffprobe'
# video metadata
self.image_format = image_format
self.intermediate_file_name = pathlib.Path(self.ffmpeg_settings['intermediate_file_name'])
2020-05-12 00:24:18 +00:00
self.pixel_format = self.ffmpeg_settings['input_to_frames']['output_options']['-pix_fmt']
def get_pixel_formats(self):
""" Get a dictionary of supported pixel formats
List all supported pixel formats and their
corresponding bit depth.
Returns:
dictionary -- JSON dict of all pixel formats to bit depth
"""
execute = [
self.ffmpeg_probe_binary,
2020-05-12 00:24:18 +00:00
# '-v',
# 'quiet',
'-pix_fmts'
]
# turn elements into str
execute = [str(e) for e in execute]
Avalon.debug_info(f'Executing: {" ".join(execute)}')
# initialize dictionary to store pixel formats
pixel_formats = {}
# record all pixel formats into dictionary
for line in subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout.decode().split('\n'):
try:
pixel_formats[' '.join(line.split()).split()[1]] = int(' '.join(line.split()).split()[3])
except (IndexError, ValueError):
pass
# print pixel formats for debugging
Avalon.debug_info(str(pixel_formats))
return pixel_formats
2018-12-11 20:52:48 +00:00
2020-05-12 00:24:18 +00:00
def probe_file_info(self, input_video):
""" Gets input video information
2018-12-11 20:52:48 +00:00
This method reads input video information
using ffprobe in dictionary
2018-12-11 20:52:48 +00:00
Arguments:
input_video {string} -- input video file path
Returns:
dictionary -- JSON text of input video information
2018-12-11 20:52:48 +00:00
"""
# this execution command needs to be hard-coded
# since video2x only strictly recignizes this one format
2019-03-19 17:07:20 +00:00
execute = [
self.ffmpeg_probe_binary,
2019-03-19 17:07:20 +00:00
'-v',
'quiet',
'-print_format',
'json',
'-show_format',
'-show_streams',
'-i',
input_video
2019-03-19 17:07:20 +00:00
]
# turn elements into str
execute = [str(e) for e in execute]
2020-05-12 00:24:18 +00:00
Avalon.debug_info(f'Executing: {shlex.join(execute)}')
2019-03-19 17:07:20 +00:00
json_str = subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout
return json.loads(json_str.decode('utf-8'))
2018-12-11 20:52:48 +00:00
2020-05-12 00:24:18 +00:00
def extract_frames(self, input_file, extracted_frames):
""" extract frames from video or GIF file
2018-12-11 20:52:48 +00:00
"""
2019-03-19 17:07:20 +00:00
execute = [
self.ffmpeg_binary
]
2020-05-12 00:24:18 +00:00
execute.extend(self._read_configuration(phase='input_to_frames'))
2019-08-12 03:57:10 +00:00
execute.extend([
2019-03-19 17:07:20 +00:00
'-i',
2020-05-12 00:24:18 +00:00
input_file
])
2020-05-12 00:24:18 +00:00
execute.extend(self._read_configuration(phase='input_to_frames', section='output_options'))
2019-06-05 16:18:51 +00:00
execute.extend([
extracted_frames / f'extracted_%0d.{self.image_format}'
])
return(self._execute(execute))
2018-12-11 20:52:48 +00:00
2020-05-12 00:24:18 +00:00
def assemble_video(self, framerate, upscaled_frames):
2018-12-11 20:52:48 +00:00
"""Converts images into videos
This method converts a set of images into a video
2018-12-11 20:52:48 +00:00
Arguments:
framerate {float} -- target video framerate
resolution {string} -- target video resolution
upscaled_frames {string} -- source images directory
2018-12-11 20:52:48 +00:00
"""
2019-03-19 17:07:20 +00:00
execute = [
self.ffmpeg_binary,
'-r',
2020-05-12 00:24:18 +00:00
str(framerate)
# '-s',
# resolution
]
2019-08-12 03:57:10 +00:00
# read other options
execute.extend(self._read_configuration(phase='frames_to_video'))
# read FFmpeg input options
execute.extend(self._read_configuration(phase='frames_to_video', section='input_options'))
# WORKAROUND FOR WAIFU2X-NCNN-VULKAN
# Dev: SAT3LL
# rename all .png.png suffixes to .png
import re
2019-08-04 03:15:03 +00:00
regex = re.compile(r'\.png\.png$', re.IGNORECASE)
for frame_name in upscaled_frames.iterdir():
(upscaled_frames / frame_name).rename(upscaled_frames / regex.sub('.png', str(frame_name)))
# END WORKAROUND
# append input frames path into command
execute.extend([
2019-03-19 17:07:20 +00:00
'-i',
upscaled_frames / f'extracted_%d.{self.image_format}'
])
# read FFmpeg output options
execute.extend(self._read_configuration(phase='frames_to_video', section='output_options'))
# specify output file location
execute.extend([
upscaled_frames / self.intermediate_file_name
])
return(self._execute(execute))
2018-12-11 20:52:48 +00:00
def migrate_streams(self, input_video, output_video, upscaled_frames):
""" Migrates audio tracks and subtitles from input video to output video
2018-12-11 20:52:48 +00:00
Arguments:
input_video {string} -- input video file path
output_video {string} -- output video file path
upscaled_frames {string} -- directory containing upscaled frames
2018-12-11 20:52:48 +00:00
"""
2019-03-19 17:07:20 +00:00
execute = [
2019-08-12 03:57:10 +00:00
self.ffmpeg_binary
]
execute.extend(self._read_configuration(phase='migrating_tracks'))
execute.extend([
2019-03-19 17:07:20 +00:00
'-i',
upscaled_frames / self.intermediate_file_name,
2019-03-19 17:07:20 +00:00
'-i',
input_video
2019-08-12 03:57:10 +00:00
])
2019-03-19 17:07:20 +00:00
2019-06-14 05:15:13 +00:00
execute.extend(self._read_configuration(phase='migrating_tracks', section='output_options'))
execute.extend([
output_video
])
return(self._execute(execute))
def _read_configuration(self, phase, section=None):
""" read configuration from JSON
Read the configurations (arguments) from the JSON
configuration file and append them to the end of the
FFmpeg command.
Arguments:
execute {list} -- list of arguments to be executed
phase {str} -- phase of operation
"""
configuration = []
# if section is specified, read configurations or keys
# from only that section
if section:
source = self.ffmpeg_settings[phase][section].keys()
# if pixel format is not specified, use the source pixel format
try:
if self.ffmpeg_settings[phase][section].get('-pix_fmt') is None:
self.ffmpeg_settings[phase][section]['-pix_fmt'] = self.pixel_format
except KeyError:
pass
else:
source = self.ffmpeg_settings[phase].keys()
for key in source:
if section:
value = self.ffmpeg_settings[phase][section][key]
else:
value = self.ffmpeg_settings[phase][key]
# null or None means that leave this option out (keep default)
2019-07-26 18:22:49 +00:00
if value is None or value is False or isinstance(value, dict):
continue
2019-07-26 18:22:49 +00:00
# if the value is a list, append the same argument and all values
elif isinstance(value, list):
for subvalue in value:
configuration.append(key)
if value is not True:
configuration.append(str(subvalue))
# otherwise the value is typical
else:
configuration.append(key)
# true means key is an option
if value is True:
continue
configuration.append(str(value))
return configuration
def _execute(self, execute):
# turn all list elements into string to avoid errors
execute = [str(e) for e in execute]
2020-05-12 00:24:18 +00:00
Avalon.debug_info(f'Executing: {shlex.join(execute)}')
2019-08-12 03:57:10 +00:00
return subprocess.Popen(execute)