video2x/video2x/pipe_printer.py
2022-03-01 21:11:50 +00:00

64 lines
1.7 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (C) 2018-2022 K4YT3X and contributors.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: PIPE Printer
Author: K4YT3X
Date Created: February 27, 2022
Last Modified: February 28, 2022
"""
import os
import sys
import threading
import time
from typing import IO
class PipePrinter(threading.Thread):
def __init__(self, stderr: IO[bytes]) -> None:
threading.Thread.__init__(self)
self.stderr = stderr
self.running = False
# set read mode to non-blocking
os.set_blocking(self.stderr.fileno(), False)
def _print_output(self) -> None:
output = self.stderr.read()
if output is not None and len(output) != 0:
print(output.decode(), file=sys.stderr)
def run(self) -> None:
self.running = True
# keep printing contents in the PIPE
while self.running:
time.sleep(0.5)
try:
self._print_output()
# pipe closed
except ValueError:
break
return super().run()
def stop(self) -> None:
self.running = False