Media-Manager/Video/videoservice.py

252 lines
15 KiB
Python

import os
import re
import time
import default
import subprocess
from Universal import tools
from Universal import ioservice
from Universal import loggingservice
from Video.Services import tmdbservice
class VideoService:
def __init__(self):
self.logger = loggingservice.Output(True, False)
self.video_settings = default.VideoSettings()
self.tmdbservice = tmdbservice(self.logger)
self.video_settings.updateSettings()
def startNewProcess(self, num=1):
self.logger.debugReport("[Video] startNewProcess")
self.current_library = num
self.total_processed = 0
self.is_processing = True
self.video_settings.updateLibrary(self.current_library)
if self.video_settings.library["Server"].lower().find("none") == -1:
self.video_settings.updateServer(self.video_settings.library["Server"])
while self.is_processing:
self.processLibraries()
def processLibraries(self, refreshLib = True):
self.logger.debugReport("[Video] processLibraries")
tmp_library_time = time.time()
if self.video_settings.library is not None:
if self.video_settings.library["Input"] is not None:
if refreshLib:
self.total_processed = 0
self.logger.infoReport("[Video] Current Library: " + self.video_settings.library["Name"])
self.current_library_list = ioservice.listAllFiles(self.video_settings.library["Input"])
if self.current_library_list is not None:
self.current_library_list = sorted(self.current_library_list)
if self.current_library_list is not None:
if refreshLib:
self.srt_list = []
for item_search in self.current_library_list:
if item_search.lower().find(".srt") != -1:
self.srt_list.append(item_search)
if self.srt_list is not None:
for item_search in self.srt_list:
self.current_library_list.remove(item_search)
self.logger.infoReport("[Video] Found a total of " + str(len(self.current_library_list)) + " files")
for self.current_file in self.current_library_list:
if self.total_processed != 0:
self.logger.infoReport("[Video] Total file remaining: " + str(len(self.current_library_list) - self.total_processed))
self.processVideo()
else:
self.logger.infoReport("[Video] Folder is empty")
ioservice.deleteEmptyFolders(self.video_settings.library["Input"])
else:
self.is_processing = False
else:
self.is_processing = False
self.logger.infoReport("[Video] Current Library: " + self.video_settings.library["Name"] + " Time Elapsed: " + time.strftime("%H:%M:%S", time.gmtime(time.time() - tmp_library_time)))
self.current_library += 1
def processVideo(self):
self.logger.debugReport("[Video] processVideo")
self.logger.infoReport("[Video] Current File: " + self.current_file)
tmp_isDeleted = False
if not ioservice.checkSizeChange(self.current_file):
if ioservice.getFileSize(self.current_file) == 0:
self.logger.infoReport("[Video] Assuming file has been deleted or moved... Skipping...")
tmp_isDeleted = True
else:
self.logger.infoReport("[Video] File size has changed")
self.processVideo()
else:
tmp_success = True
if not tmp_isDeleted:
tmp_current_file_info = self.extractVideoInformation(self.current_file)
# self.constructOutputPath(tmp_current_file_info)
if tmp_current_file_info["Episode"] is None and tmp_current_file_info["Season"] is None:
self.logger.infoReport("[Video] " + tmp_current_file_info["Name"] + " " + str(tmp_current_file_info["Year"]))
else:
self.logger.infoReport("[Video] " + tmp_current_file_info["Name"] + " Season: " + str(tmp_current_file_info["Season"]) + " Episode: " + str(tmp_current_file_info["Episode"]))
self.find_srts()
def extractVideoInformation(self, file):
self.logger.debugReport("[Video] extractVideoInformation")
file = file.replace(self.video_settings.library["Input"], "").lstrip().replace("_", "")
for blacklist_item in self.video_settings.settings["Blacklist"].split(", "):
file = file.replace(blacklist_item, "")
tmp_input_format = file.rsplit(".", 1)[-1]
tmp_name = None
tmp_season = None
tmp_episode = None
tmp_episode_title = None
tmp_year = None
tmp_episode_year = None
tmp_length = self.getVideoLength()
tmp_folders, tmp_file = os.path.split(file)
for possible_years in re.findall(r'\d+', file):
if len(possible_years) == 4:
tmp_year = int(possible_years)
match len(tmp_folders.split(ioservice.getPathSeperator())):
case 1:
tmp_name = tmp_folders.split(ioservice.getPathSeperator())[0]
if not tmp_name:
tmp_name = tmp_file.replace(str(tmp_year), "").replace("." + tmp_input_format, "").strip()
case 2:
tmp_name = tmp_folders.split(ioservice.getPathSeperator())[0]
case 3:
tmp_name = tmp_folders.split(ioservice.getPathSeperator())[0]
case _:
tmp_name = tmp_file.replace("." + tmp_input_format, "")
if self.video_settings.library["Type"].lower().find("show") != -1:
if tmp_season is None or tmp_episode is None:
match len(tmp_folders.split(ioservice.getPathSeperator())):
case 2:
tmp_season = re.findall(r'\d+', tmp_folders.replace(tmp_name, ""))[0]
tmp_episode = re.findall(r'([0-9]*[0-9])', tmp_file.replace("." + tmp_input_format, "").replace(tmp_name, ""))[-1]
if tmp_file.lower().find(tmp_episode + "a") != -1:
tmp_episode = tmp_episode + " - Part 1"
elif tmp_file.lower().find(tmp_episode + "b") != -1:
tmp_episode = tmp_episode + " - Part 2"
case 3:
tmp_season = re.findall(r'\d+', tmp_folders.replace(tmp_name, ""))[0]
tmp_episode = re.findall(r'([0-9]*[0-9])', tmp_folders.replace("." + tmp_input_format, "").replace(tmp_name, ""))[-1]
if tmp_file.lower().find(tmp_episode + "a") != -1:
tmp_episode = tmp_episode + " - Part 1"
elif tmp_file.lower().find(tmp_episode + "b") != -1:
tmp_episode = tmp_episode + " - Part 2"
case _:
tmp_episode = re.findall(r'\d+', tmp_file.replace("." + tmp_input_format, "").replace(tmp_name, ""))[-1]
try:
tmp_season = re.findall(r'\d+', tmp_file.replace("." + tmp_input_format, "").replace(tmp_name, ""))[0]
except:
tmp_episode = re.findall(r'\d+', tmp_file.replace("." + tmp_input_format, ""))[-1]
if tmp_season != "0" and tmp_season:
tmp_season = tmp_season.lstrip("0")
if tmp_episode != "0" and tmp_episode:
tmp_episode = tmp_episode.lstrip("0")
match self.video_settings.library["Database"].lower():
case "tmdb":
if self.tmdbservice.isEnabled():
self.tmdbservice.connectToTMDB()
if self.video_settings.library["Type"].lower().find("movie") != -1:
tmp_name = tmp_name.replace(str(tmp_year), "").replace("(", "").replace(")", "")
if tmp_length is not None or tmp_year is not None:
tmp_data = self.tmdbservice.getBestMovie(tmp_name, tmp_length, tmp_year)
if tmp_data:
tmp_name = tmp_data[0]
tmp_year = tmp_data[2]
else:
tmp_data = self.tmdbservice.searchMovies(tmp_name)
tmp_name = tmp_data[0][0]
tmp_year = tmp_data[0][1]
elif self.video_settings.library["Type"].lower().find("show") != -1:
tmp_data = self.tmdbservice.getShowName(tmp_name)
tmp_name = tmp_data["name"]
tmp_year = tmp_data["first_air_date"][:4]
case "tvdb":
#NotImplemented
pass
case _:
pass
tmp_name = tmp_name.replace(":", "")
if tmp_episode_title:
tmp_episode_title = tmp_episode_title.replace(":", "")
return {"Name": tmp_name, "Season": tmp_season, "Episode": tmp_episode, "Episode Title": tmp_episode_title, "Episode Year": tmp_episode_year, "Input Format": tmp_input_format}
def constructOutputPath(self, file_info, subtitles=False, num=0):
self.logger.debugReport("[Video] constructOutputPath")
self.current_output_path = self.video_settings.library["Output"] + self.video_settings.library["Directory"]
self.current_output_path = tools.replace(self.current_output_path, "${NAME}", file_info["Name"])
self.current_output_path = tools.replace(self.current_output_path, "${YEAR}", file_info["Year"])
self.current_output_path = tools.replace(self.current_output_path, "${SEASON}", file_info["Season"])
if isinstance(file_info["Episode"], list):
tmp_episodes = None
for eps in file_info["Episode"]:
if eps != file_info["Episode"][-1]:
if not tmp_episodes:
tmp_episodes = str(eps) + "-"
else:
tmp_episodes = tmp_episodes + str(eps) + "-"
else:
if not tmp_episodes:
tmp_episodes = str(eps)
else:
tmp_episodes = tmp_episodes + str(eps)
self.current_output_path = tools.replace(self.current_output_path, "${EPISODE}", tmp_episodes)
else:
self.current_output_path = tools.replace(self.current_output_path, "${EPISODE}", file_info["Episode"])
self.current_output_path = tools.replace(self.current_output_path, "${EPISODE_YEAR}", file_info["Episode Year"])
self.current_output_path = tools.replace(self.current_output_path, "${EPISODE_NAME}", file_info["Episode Name"])
if not subtitles:
if self.current_output_path[self.current_output_path.index("${FORMAT}") - 1] != ".":
if file_info["Format"].startswith("."):
self.current_output_path = tools.replace(self.current_output_path, "${FORMAT}", file_info["Format"])
else:
self.current_output_path = tools.replace(self.current_output_path, "${FORMAT}", "." + file_info["Format"])
else:
if file_info["Format"].startswith("."):
self.current_output_path = tools.replace(self.current_output_path, ".${FORMAT}", file_info["Format"])
else:
self.current_output_path = tools.replace(self.current_output_path, ".${FORMAT}", "." + file_info["Format"])
else:
if self.current_output_path[self.current_output_path.index("${FORMAT}") - 1] != ".":
self.current_output_path = tools.replace(self.current_output_path, "${FORMAT}", " - " + self.srt_language[num] + ".srt")
elif self.current_output_path[self.current_output_path.index("${FORMAT}" - 1)] == ".":
self.current_output_path = tools.replace(self.current_output_path, ".${FORMAT}", " - " + self.srt_language[num] + ".srt")
self.logger.debugReport("[Video] Output Path: " + self.current_output_path)
return self.current_output_path
def findSRTs(self):
self.logger.debugReport("[Video] findSRTs")
tmp_srt_lang = ["aar", "abk", "afr", "aka", "alb", "amh", "ara", "arg", "arm", "asm", "ava", "ave", "aym", "aze", "bak", "bam", "baq",
"bel", "ben", "bih", "bis", "bos", "bre", "bul", "bur", "cat", "cha", "che", "chi", "chu", "chv", "cor", "cos", "cre",
"cze", "dan", "div", "dut", "dzo", "eng", "epo", "est", "ewe", "fao", "fij", "fin", "fre", "fry", "ful", "geo", "ger",
"gla", "gle", "glg", "glv", "gre", "grn", "guj", "hat", "hau", "heb", "her", "hin", "hmo", "hrv", "hun", "ibo", "ice",
"ido", "iii", "iku", "ile", "ina", "ind", "ipk", "ita", "jav", "jpn", "kal", "kan", "kas", "kau", "kaz", "khm", "kik",
"kin", "kir", "kom", "kon", "kor", "kua", "kur", "lao", "lat", "lav", "lim", "lin", "lit", "ltz", "lub", "lug", "mac",
"mah", "mal", "mao", "mar", "may", "mlg", "mlt", "mon", "nau", "nav", "nbl", "nde", "ndo", "nep", "nno", "nob", "nor",
"nya", "oci", "oji", "ori", "orm", "oss", "pan", "per", "pli", "pol", "por", "pus", "que", "roh", "rum", "run", "rus",
"sag", "san", "sin", "slo", "slv", "sme", "smo", "sna", "snd", "som", "sot", "spa", "srd", "srp", "ssw", "sun", "swa",
"swe", "tah", "tam", "tat", "tel", "tgk", "tgl", "tha", "tib", "tir", "ton", "tsn", "tso", "tuk", "tur", "twi", "uig",
"ukr", "urd", "uzb", "ven", "vie", "vol", "wel", "wln", "wol", "xho", "yid", "yor", "zha", "zul"]
def getVideoLength(self):
self.logger.debugReport("[Video] getVideoLength")
tmp_time = None
try:
tmp_time = subprocess.check_output([self.video_settings.settings["FFProbe"], '-i', self.current_file, '-show_entries', 'format=duration', '-v', 'quiet', '-of', 'csv=%s' % ("p=0")])
tmp_time = str(tmp_time)
if not tmp_time.find("N/A") != -1:
tmp_hours = time.strftime("%H", time.gmtime(round(float(re.findall('\d+', tmp_time)[0])))).lstrip("0")
tmp_minutes = time.strftime("%M", time.gmtime(round(float(re.findall('\d+', tmp_time)[0])))).lstrip("0")
try:
tmp_hours = int(tmp_hours) * 60
except:
tmp_hours = 0
try:
tmp_length = tmp_hours + int(tmp_minutes)
except:
tmp_length = tmp_hours
else:
self.logger.errorReport("[Video] FFProbe could not get video content length")
tmp_length = None
return tmp_length
except:
self.logger.errorReport("[Video] There was a problem getting the length of video content")
return None