video2x/bin/ffmpeg.py

287 lines
8.5 KiB
Python
Raw Normal View History

2018-12-11 20:52:48 +00:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2018-12-11 20:52:48 +00:00
"""
Name: Video2X FFmpeg Controller
Author: K4YT3X
Date Created: Feb 24, 2018
Last Modified: July 27, 2019

Description: This class handles all FFmpeg related operations.
"""
# built-in imports
import json
2019-06-14 23:19:12 +00:00
import os
import pathlib
import subprocess
2018-12-11 20:52:48 +00:00
# third-party imports
from avalon_framework import Avalon
2018-12-11 20:52:48 +00:00
class Ffmpeg:
    """This class communicates with FFmpeg

    This class deals with FFmpeg. It handles extracting
    frames, stripping audio, converting images into videos
    and inserting audio tracks to videos.
    """

    def __init__(self, ffmpeg_settings, image_format):
        """ Store the FFmpeg settings and derive the binary paths

        Arguments:
            ffmpeg_settings {dict} -- FFmpeg configuration; must contain
                the key 'ffmpeg_path' and one entry per phase that is
                later passed to _read_configuration
            image_format {str} -- file extension used for frame images
        """
        self.ffmpeg_settings = ffmpeg_settings

        self.ffmpeg_path = pathlib.Path(self.ffmpeg_settings['ffmpeg_path'])
        self.ffmpeg_binary = self.ffmpeg_path / 'ffmpeg.exe'
        self.ffmpeg_probe_binary = self.ffmpeg_path / 'ffprobe.exe'
        self.image_format = image_format

        # no method of this class assigns pixel_format; it stays None
        # until it is set from outside, and is used as the fallback
        # value for '-pix_fmt' in _read_configuration
        self.pixel_format = None

    def get_pixel_formats(self):
        """ Get a dictionary of supported pixel formats

        List all supported pixel formats and their
        corresponding bit depth.

        Returns:
            dictionary -- pixel format name mapped to the integer
                found in the fourth column of the ffprobe listing

        Raises:
            subprocess.CalledProcessError -- ffprobe exited non-zero
        """
        execute = [
            self.ffmpeg_probe_binary,
            '-v',
            'quiet',
            '-pix_fmts'
        ]

        # turn elements into str
        execute = [str(e) for e in execute]

        Avalon.debug_info(f'Executing: {" ".join(execute)}')

        listing = subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout.decode()
        pixel_formats = self._parse_pixel_formats(listing)

        # print pixel formats for debugging
        Avalon.debug_info(pixel_formats)

        return pixel_formats

    @staticmethod
    def _parse_pixel_formats(listing):
        """ Parse the text printed by 'ffprobe -pix_fmts'

        Every line is split on whitespace; the second field is taken
        as the format name and the fourth as its integer value. Lines
        that do not have that shape (legend, header, separator, blank
        lines) are skipped.

        Arguments:
            listing {str} -- decoded ffprobe output

        Returns:
            dictionary -- pixel format name mapped to an int
        """
        pixel_formats = {}
        for line in listing.splitlines():
            fields = line.split()
            try:
                pixel_formats[fields[1]] = int(fields[3])
            except (IndexError, ValueError):
                # not a pixel format row
                continue
        return pixel_formats

    def get_video_info(self, input_video):
        """ Gets input video information

        This method reads input video information
        using ffprobe and returns it as a dictionary.

        Arguments:
            input_video {str or pathlib.Path} -- input video file path

        Returns:
            dictionary -- parsed JSON output of ffprobe

        Raises:
            subprocess.CalledProcessError -- ffprobe exited non-zero
        """
        # this execution command needs to be hard-coded
        # since video2x only strictly recognizes this one format
        execute = [
            self.ffmpeg_probe_binary,
            '-v',
            'quiet',
            '-print_format',
            'json',
            '-show_format',
            '-show_streams',
            '-i',
            input_video
        ]

        # turn elements into str
        execute = [str(e) for e in execute]

        Avalon.debug_info(f'Executing: {" ".join(execute)}')
        json_str = subprocess.run(execute, check=True, stdout=subprocess.PIPE).stdout
        return json.loads(json_str.decode('utf-8'))

    def extract_frames(self, input_video, extracted_frames):
        """Extract every frame from original videos

        This method extracts every frame from input video using FFmpeg

        Arguments:
            input_video {str or pathlib.Path} -- input video path
            extracted_frames {str or pathlib.Path} -- directory the
                frame images are written to
        """
        execute = [
            self.ffmpeg_binary,
            '-i',
            input_video
        ]

        execute.extend(self._read_configuration(phase='video_to_frames', section='output_options'))

        # output pattern; pathlib.Path() makes a plain string acceptable too
        execute.append(pathlib.Path(extracted_frames) / f'extracted_%0d.{self.image_format}')

        execute.extend(self._read_configuration(phase='video_to_frames'))

        self._execute(execute)

    def convert_video(self, framerate, resolution, upscaled_frames):
        """Converts images into videos

        This method converts a set of images into a video. The result
        is written to 'no_audio.mp4' inside upscaled_frames.

        Arguments:
            framerate {float} -- target video framerate
            resolution {string} -- target video resolution
            upscaled_frames {str or pathlib.Path} -- source images directory
        """
        upscaled_frames = pathlib.Path(upscaled_frames)

        execute = [
            self.ffmpeg_binary,
            '-r',
            str(framerate),
            '-s',
            resolution
        ]

        # read FFmpeg input options
        execute.extend(self._read_configuration(phase='frames_to_video', section='input_options'))

        # WORKAROUND FOR WAIFU2X-NCNN-VULKAN
        # Dev: SAT3LL
        # rename all .png.png suffixes to .png
        self._strip_duplicate_png_suffix(upscaled_frames)
        # END WORKAROUND

        # append input frames path into command
        execute.extend([
            '-i',
            upscaled_frames / f'extracted_%d.{self.image_format}'
        ])

        # read FFmpeg output options
        execute.extend(self._read_configuration(phase='frames_to_video', section='output_options'))

        # read other options
        execute.extend(self._read_configuration(phase='frames_to_video'))

        # specify output file location
        execute.append(upscaled_frames / 'no_audio.mp4')

        self._execute(execute)

    @staticmethod
    def _strip_duplicate_png_suffix(upscaled_frames):
        """ Rename every '*.png.png' entry in a directory to '*.png'

        Only entries whose name ends in '.png.png' are touched. The new
        name is built from the entry's own path, so the directory may be
        given as a relative path.

        Arguments:
            upscaled_frames {str or pathlib.Path} -- directory to scan
        """
        import re
        doubled_suffix = re.compile(r'\.png\.png$')

        # snapshot the listing first: the directory is modified in the loop
        for frame in list(pathlib.Path(upscaled_frames).iterdir()):
            fixed_name = doubled_suffix.sub('.png', frame.name)
            if fixed_name != frame.name:
                frame.rename(frame.with_name(fixed_name))

    def migrate_audio_tracks_subtitles(self, input_video, output_video, upscaled_frames):
        """ Migrates audio tracks and subtitles from input video to output video

        Runs FFmpeg with 'no_audio.mp4' from upscaled_frames as the first
        input and the original video as the second input.

        Arguments:
            input_video {str or pathlib.Path} -- input video file path
            output_video {str or pathlib.Path} -- output video file path
            upscaled_frames {str or pathlib.Path} -- directory containing
                upscaled frames and 'no_audio.mp4'
        """
        execute = [
            self.ffmpeg_binary,
            '-i',
            pathlib.Path(upscaled_frames) / 'no_audio.mp4',
            '-i',
            input_video
        ]

        execute.extend(self._read_configuration(phase='migrating_tracks', section='output_options'))

        execute.append(output_video)

        execute.extend(self._read_configuration(phase='migrating_tracks'))

        self._execute(execute)

    def _read_configuration(self, phase, section=None):
        """ read configuration from JSON

        Read the configurations (arguments) of one phase, or of one
        section of a phase, and turn them into a flat list of FFmpeg
        command line arguments. The stored settings are not modified.

        Value handling:
            None / False / dict -- option is left out
            True -- only the key is emitted (flag)
            list -- the key is repeated once per element
            anything else -- key followed by str(value)

        When a section is given and it has no '-pix_fmt' value (missing
        or None), self.pixel_format is used for '-pix_fmt' instead.

        Arguments:
            phase {str} -- phase of operation
            section {str} -- section of the phase to read (default: None,
                meaning the options stored directly under the phase)

        Returns:
            list -- FFmpeg arguments as strings
        """
        if section:
            # work on a copy so the fallback pixel format is never written
            # back into the shared settings (it would otherwise stick for
            # every later call, even after self.pixel_format changes)
            options = dict(self.ffmpeg_settings[phase][section])

            # if pixel format is not specified, use the source pixel format
            if options.get('-pix_fmt') is None:
                options['-pix_fmt'] = self.pixel_format
        else:
            options = self.ffmpeg_settings[phase]

        configuration = []
        for key, value in options.items():

            # null or None means that leave this option out (keep default)
            # dicts are the nested sections of a phase, not options
            if value is None or value is False or isinstance(value, dict):
                continue

            # if the value is a list, append the same argument and all values
            if isinstance(value, list):
                for subvalue in value:
                    configuration.extend([key, str(subvalue)])

            # true means key is an option
            elif value is True:
                configuration.append(key)

            # otherwise the value is typical
            else:
                configuration.extend([key, str(value)])

        return configuration

    def _execute(self, execute):
        """ execute command

        Arguments:
            execute {list} -- list of arguments to be executed

        Returns:
            int -- execution return code

        Raises:
            subprocess.CalledProcessError -- command exited non-zero
        """
        Avalon.debug_info(f'Executing: {execute}')

        # turn all list elements into string to avoid errors
        execute = [str(e) for e in execute]

        # run the program directly: with shell=True and an argument list,
        # POSIX passes everything after the first item to the shell itself
        # instead of to the command
        return subprocess.run(execute, check=True).returncode