Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
luigi311 committed Feb 19, 2025
1 parent 829e80c commit 80a2ac3
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 283 deletions.
15 changes: 0 additions & 15 deletions src/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,6 @@ def str_to_bool(value: str) -> bool:
return str(value).lower() in ("y", "yes", "t", "true", "on", "1")


# Search for nested element in list
def contains_nested(element: str, lst: list[tuple[str] | None] | tuple[str] | None):
if lst is None:
return None

for i, item in enumerate(lst):
if item is None:
continue
if element in item:
return i
elif element == item:
return i
return None


# Get mapped value
def search_mapping(dictionary: dict[str, str], key_value: str) -> str | None:
if key_value in dictionary.keys():
Expand Down
51 changes: 0 additions & 51 deletions src/jellyfin_emby.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
from src.functions import (
logger,
search_mapping,
contains_nested,
log_marked,
str_to_bool,
)
from src.library import generate_library_guids_dict
from src.watched import (
LibraryData,
MediaIdentifiers,
Expand All @@ -30,55 +28,6 @@
generate_guids = str_to_bool(os.getenv("GENERATE_GUIDS", "True"))
generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True"))


def get_video_status(server_video, videos_ids, videos):
video_status = None

if generate_locations:
if "MediaSources" in server_video:
for video_location in server_video["MediaSources"]:
if "Path" in video_location:
if (
contains_nested(
video_location["Path"].split("/")[-1],
videos_ids["locations"],
)
is not None
):
for video in videos:
if (
contains_nested(
video_location["Path"].split("/")[-1],
video["locations"],
)
is not None
):
video_status = video["status"]
break
break

if generate_guids:
if not video_status:
for (
video_provider_source,
video_provider_id,
) in server_video["ProviderIds"].items():
if video_provider_source.lower() in videos_ids:
if (
video_provider_id.lower()
in videos_ids[video_provider_source.lower()]
):
for video in videos:
if video_provider_id.lower() in video.get(
video_provider_source.lower(), []
):
video_status = video["status"]
break
break

return video_status


def extract_identifiers_from_item(server_type, item: dict) -> MediaIdentifiers:
title = item.get("Name", None)
id = None
Expand Down
158 changes: 0 additions & 158 deletions src/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,161 +199,3 @@ def setup_libraries(
)

return output_server_1_libaries, output_server_2_libaries


def show_title_dict(user_list) -> dict[str, list[tuple[str] | None]]:
try:
if not isinstance(user_list, dict):
return {}

show_output_dict: dict[str, list[tuple[str] | None]] = {}
show_output_dict["locations"] = []
show_counter = 0 # Initialize a counter for the current show position

show_output_keys = [dict(x) for x in list(user_list.keys())]
for show_key in show_output_keys:
for provider_key, provider_value in show_key.items():
# Skip title
if provider_key.lower() == "title":
continue
if provider_key.lower() not in show_output_dict:
show_output_dict[provider_key.lower()] = [None] * show_counter
if provider_key.lower() == "locations":
show_output_dict[provider_key.lower()].append(provider_value)
else:
show_output_dict[provider_key.lower()].append(
provider_value.lower()
)

show_counter += 1
for key in show_output_dict:
if len(show_output_dict[key]) < show_counter:
show_output_dict[key].append(None)

return show_output_dict
except Exception:
return {}


def episode_title_dict(
user_list,
) -> dict[
str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]
]:
try:
if not isinstance(user_list, dict):
return {}

episode_output_dict: dict[
str,
list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None],
] = {}
episode_output_dict["completed"] = []
episode_output_dict["time"] = []
episode_output_dict["locations"] = []
episode_output_dict["show"] = []
episode_counter = 0 # Initialize a counter for the current episode position

# Iterate through the shows and episodes in user_list
for show in user_list:

for episode in user_list[show]:
# Add the show title to the episode_output_dict if it doesn't exist
if "show" not in episode_output_dict:
episode_output_dict["show"] = [None] * episode_counter

# Add the show title to the episode_output_dict
episode_output_dict["show"].append(dict(show))

# Iterate through the keys and values in each episode
for episode_key, episode_value in episode.items():
# If the key is not "status", add the key to episode_output_dict if it doesn't exist
if episode_key != "status":
if episode_key.lower() not in episode_output_dict:
# Initialize the list with None values up to the current episode position
episode_output_dict[episode_key.lower()] = [
None
] * episode_counter

# If the key is "locations", append each location to the list
if episode_key == "locations":
episode_output_dict[episode_key.lower()].append(episode_value)

# If the key is "status", append the "completed" and "time" values
elif episode_key == "status":
episode_output_dict["completed"].append(
episode_value["completed"]
)
episode_output_dict["time"].append(episode_value["time"])

# For other keys, append the value to the list
else:
episode_output_dict[episode_key.lower()].append(
episode_value.lower()
)

# Increment the episode_counter
episode_counter += 1

# Extend the lists in episode_output_dict with None values to match the current episode_counter
for key in episode_output_dict:
if len(episode_output_dict[key]) < episode_counter:
episode_output_dict[key].append(None)

return episode_output_dict
except Exception:
return {}


def movies_title_dict(
user_list,
) -> dict[str, list[str | bool | int | tuple[str] | None]]:
try:
if not isinstance(user_list, list):
return {}

movies_output_dict: dict[str, list[str | bool | int | tuple[str] | None]] = {
"completed": [],
"time": [],
"locations": [],
}
movie_counter = 0 # Initialize a counter for the current movie position

for movie in user_list:
for movie_key, movie_value in movie.items():
if movie_key != "status":
if movie_key.lower() not in movies_output_dict:
movies_output_dict[movie_key.lower()] = []

if movie_key == "locations":
movies_output_dict[movie_key.lower()].append(movie_value)
elif movie_key == "status":
movies_output_dict["completed"].append(movie_value["completed"])
movies_output_dict["time"].append(movie_value["time"])
else:
movies_output_dict[movie_key.lower()].append(movie_value.lower())

movie_counter += 1
for key in movies_output_dict:
if len(movies_output_dict[key]) < movie_counter:
movies_output_dict[key].append(None)

return movies_output_dict
except Exception:
return {}


def generate_library_guids_dict(user_list) -> tuple[
dict[str, list[tuple[str] | None]],
dict[str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]],
dict[str, list[str | bool | int | tuple[str] | None]],
]:
# Handle the case where user_list is empty or does not contain the expected keys and values
if not user_list:
return {}, {}, {}

show_output_dict = show_title_dict(user_list)
episode_output_dict = episode_title_dict(user_list)
movies_output_dict = movies_title_dict(user_list)

return show_output_dict, episode_output_dict, movies_output_dict
57 changes: 2 additions & 55 deletions src/plex.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os, requests, traceback
import os, requests
from dotenv import load_dotenv
from typing import Union, FrozenSet

from urllib3.poolmanager import PoolManager
from math import floor
Expand All @@ -14,8 +13,6 @@
from src.functions import (
logger,
search_mapping,
future_thread_executor,
contains_nested,
log_marked,
str_to_bool,
)
Expand Down Expand Up @@ -47,7 +44,7 @@ def init_poolmanager(self, connections, maxsize, block=..., **pool_kwargs):
)


def extract_guids_from_item(item: Union[Movie, Show, Episode]) -> dict[str, str]:
def extract_guids_from_item(item: Movie | Show | Episode) -> dict[str, str]:
# If GENERATE_GUIDS is set to False, then return an empty dict
if not generate_guids:
return {}
Expand Down Expand Up @@ -95,56 +92,6 @@ def get_mediaitem(item: Movie | Episode, completed=True) -> MediaItem:
status=WatchedStatus(completed=completed, time=item.viewOffset),
)


def find_video(plex_search, video_ids, videos=None):
try:
if not generate_guids and not generate_locations:
return None

if generate_locations:
for location in plex_search.locations:
if (
contains_nested(location.split("/")[-1], video_ids["locations"])
is not None
):
episode_videos = []
if videos:
for show, episodes in videos.items():
show = {k: v for k, v in show}
if (
contains_nested(
location.split("/")[-1], show["locations"]
)
is not None
):
for episode in episodes:
episode_videos.append(episode)

return episode_videos

if generate_guids:
for guid in plex_search.guids:
guid_source, guid_id = guid.id.split("://")

# If show provider source and show provider id are in videos_shows_ids exactly, then the show is in the list
if guid_source in video_ids.keys():
if guid_id in video_ids[guid_source]:
episode_videos = []
if videos:
for show, episodes in videos.items():
show = {k: v for k, v in show}
if guid_source in show.keys():
if guid_id == show[guid_source]:
for episode in episodes:
episode_videos.append(episode)

return episode_videos

return None
except Exception:
return None


def update_user_watched(
user: MyPlexAccount, user_plex: PlexServer, library: LibraryData, dryrun: bool
):
Expand Down
5 changes: 1 addition & 4 deletions src/watched.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import copy
from pydantic import BaseModel

from src.functions import logger, search_mapping, contains_nested

from src.library import generate_library_guids_dict

from src.functions import logger, search_mapping

class MediaIdentifiers(BaseModel):
title: str
Expand Down

0 comments on commit 80a2ac3

Please sign in to comment.