From a777c807e323507e1d28aa09e8541e3975d28842 Mon Sep 17 00:00:00 2001 From: Lukas Hampeis Date: Mon, 7 Jul 2025 11:51:28 +0200 Subject: [PATCH] token cleanup, preflight check of files --- downloader.py | 199 ++++++++++++++++++++++++++++++++------------------ pyspodl.py | 34 +++++++++ utils.py | 21 +++++- 3 files changed, 183 insertions(+), 71 deletions(-) create mode 100644 pyspodl.py diff --git a/downloader.py b/downloader.py index ec04fbd..88860d4 100644 --- a/downloader.py +++ b/downloader.py @@ -32,6 +32,40 @@ def __init__(self, config, utils): ) self.set_metadata = self.config.get_config_value("downloading", "set_metadata") + def _get_track_metadata(self, url): + """ + Fetches track metadata from Spotify API. + """ + track_id_str = self.utils.get_id_type_from_url(url)[0] + headers = {"Authorization": f"Bearer {self.utils.get_token()}"} + resp = requests.get( + f"https://api.spotify.com/v1/tracks/{track_id_str}", + headers=headers, + timeout=10, + ) + resp.raise_for_status() # Raise an exception for bad status codes + return resp.json() + + def _get_path_from_metadata(self, metadata): + """ + Constructs the final file path from metadata. + """ + artist = metadata["artists"][0]["name"] + track_title = metadata["name"] + album_name = metadata["album"]["name"] + album_release = metadata["album"]["release_date"] + track_number = metadata["track_number"] + + filename_format = self.config.get_config_value("downloading", "track_format") + filename = filename_format.format( + artist=artist, + title=track_title, + album=album_name, + tracknumber=track_number, + year=album_release, + ) + return f"{self.download_path}/{filename}.ogg" + def get_track_urls(self, link): """ get all tracks available in a playlist or album (spotify gives max 50 entries) @@ -83,21 +117,43 @@ def download_playlist_or_album(self, link): download songs off an album or playlist """ - tracks = self.get_track_urls(link) - total_tracks = len(tracks) + all_track_urls = self.get_track_urls(link) + if not all_track_urls: + print("[download_playlist_or_album] No tracks found to download.") + return - for count, track in enumerate(tracks): - self.download_track(track) + print("[download_playlist_or_album] Checking for tracks to download...") + tracks_to_download = [] + for url in tqdm.tqdm(all_track_urls, desc="Checking library"): + try: + metadata = self._get_track_metadata(url) + path = self._get_path_from_metadata(metadata) + + if os.path.exists(path): + tqdm.tqdm.write( + f"[download_playlist_or_album] '{metadata['name']}' already exists, skipping." + ) + else: + tracks_to_download.append((url, metadata)) + except requests.exceptions.RequestException as e: + tqdm.tqdm.write( + f"[download_playlist_or_album] Failed to get metadata for a track, skipping. Error: {e}" + ) + + total_to_download = len(tracks_to_download) + print(f"[download_playlist_or_album] Found {total_to_download} new track(s) to download.") + + for count, (track_url, track_metadata) in enumerate(tracks_to_download): + self.download_track(track_url, track_metadata) print( - f"[download_playlist_or_album] Progress: {count + 1}/{total_tracks}\n" + f"[download_playlist_or_album] Progress: {count + 1}/{total_to_download}\n" ) - def download_track(self, url): + def download_track(self, url, metadata): """ - download a track + download a track, metadata must be pre-fetched """ - try: timeout = self.config.get_config_value("downloading", "timeout") @@ -110,17 +166,7 @@ def download_track(self, url): '[download_track] "timeout" from config file must be a number (without quotes).' ) - track_id = TrackId.from_uri( - f"spotify:track:{self.utils.get_id_type_from_url(url)[0]}" - ) - headers = {"Authorization": f"Bearer {self.utils.get_token()}"} - - resp = requests.get( - f"https://api.spotify.com/v1/tracks/{self.utils.get_id_type_from_url(url)[0]}", - headers=headers, - timeout=10, - ).json() - + resp = metadata artist = resp["artists"][0]["name"] # artist track_title = resp["name"] # title album_name = resp["album"]["name"] # album @@ -128,73 +174,86 @@ def download_track(self, url): track_number = resp["track_number"] # tracknumber cover_image = resp["album"]["images"][0] # coverart, width, height - if self.premium_downloads: - stream = self.session.content_feeder().load( - track_id, VorbisOnlyAudioQuality(AudioQuality.VERY_HIGH), False, None - ) + quality = AudioQuality.VERY_HIGH if self.premium_downloads else AudioQuality.HIGH + quality_str = "VERY_HIGH" if self.premium_downloads else "HIGH" - else: - stream = self.session.content_feeder().load( - track_id, VorbisOnlyAudioQuality(AudioQuality.HIGH), False, None - ) + print(f"[download_track] Requesting stream for '{track_title}' in {quality_str} quality.") - filename_format = self.config.get_config_value("downloading", "track_format") - filename = filename_format.format( - artist=artist, - title=track_title, - album=album_name, - tracknumber=track_number, - year=album_release, + track_id = TrackId.from_uri( + f"spotify:track:{self.utils.get_id_type_from_url(url)[0]}" ) + try: + stream = self.session.content_feeder().load( + track_id, VorbisOnlyAudioQuality(quality), False, None + ) + except RuntimeError as e: + if "Failed fetching audio key" in str(e): + print( + f"\n[download_track] ERROR: Could not get audio key for '{track_title}'.", + file=sys.stderr, + ) + print( + "[download_track] This can happen due to regional restrictions, or if you're trying to download VERY_HIGH quality on a non-premium account.", + file=sys.stderr, + ) + print(f"[download_track] Skipping this track. Original error: {e}\n", file=sys.stderr) + return + raise e + + path_filename_with_ext = self._get_path_from_metadata(metadata) + path_filename = os.path.splitext(path_filename_with_ext)[0] print(f"[download_track] Downloading {track_title} by {artist}") - path_filename = f"{self.download_path}/{filename}" - - if os.path.exists(path_filename + ".ogg"): - print("[download_track] Track exists, skipping") - - else: - directory_path = os.path.dirname(path_filename) - - if directory_path and not os.path.exists(directory_path): - os.makedirs(directory_path) - - with ( - open(f"{path_filename}.ogg", "wb+") as track_file, - tqdm.tqdm( - unit="B", - unit_scale=True, - unit_divisor=1024, - total=stream.input_stream.size, - bar_format="{percentage:3.0f}%|{bar:16}|{n_fmt} / {total_fmt} | {rate_fmt}, ETA {remaining}", - ) as progress_bar, - ): - for _ in range(int(stream.input_stream.size / 5000) + 1): - progress_bar.update( - track_file.write(stream.input_stream.stream().read(50000)) - ) + directory_path = os.path.dirname(path_filename) + + if directory_path and not os.path.exists(directory_path): + os.makedirs(directory_path) + + with ( + open(f"{path_filename}.ogg", "wb+") as track_file, + tqdm.tqdm( + unit="B", + unit_scale=True, + unit_divisor=1024, + total=stream.input_stream.size, + bar_format="{percentage:3.0f}%|{bar:16}|{n_fmt} / {total_fmt} | {rate_fmt}, ETA {remaining}", + ) as progress_bar, + ): + chunk_size = 8192 # A common chunk size for I/O + while True: + chunk = stream.input_stream.stream().read(chunk_size) + if not chunk: + break + progress_bar.update(track_file.write(chunk)) - if self.set_metadata: - tags = { - "artist": artist, - "title": track_title, - "album": album_name, - "date": album_release, # .split("-")[0], - "tracknumber": track_number, - } + if self.set_metadata: + tags = { + "artist": artist, + "title": track_title, + "album": album_name, + "date": album_release, # .split("-")[0], + "tracknumber": track_number, + } - self.utils.set_metadata(tags, cover_image, path_filename) + self.utils.set_metadata(tags, cover_image, path_filename) def download(self, link): """ execute the function based on the link """ - link_type = self.utils.get_id_type_from_url(link)[1] if link_type == "track": - self.download_track(link) + try: + metadata = self._get_track_metadata(link) + path = self._get_path_from_metadata(metadata) + if os.path.exists(path): + print("[download] Track already exists, skipping.") + else: + self.download_track(link, metadata) + except requests.exceptions.RequestException as e: + print(f"[download] Failed to get metadata for track: {e}", file=sys.stderr) elif link_type in ("album", "playlist"): self.download_playlist_or_album(link) diff --git a/pyspodl.py b/pyspodl.py new file mode 100644 index 0000000..1a80e15 --- /dev/null +++ b/pyspodl.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 + +""" +pyspodl - a spotify downloader using librespot +""" + +import logging +from arguments import get_arguments + +from downloader import Downloader +from config import Config +from utils import Utils + +def main(): + logging.basicConfig(level=logging.INFO) + + arguments = get_arguments() + config = Config(arguments.config_path) + utils = Utils(config) + + downloader = Downloader(config, utils) + + if not arguments.link: + print("[main] No download links provided. Use -l or --link. Exiting.") + return + + for link in arguments.link.split(" "): + downloader.download(link) + + utils.clear_token() + + +if __name__ == "__main__": + main() diff --git a/utils.py b/utils.py index b2db8fe..7701b75 100644 --- a/utils.py +++ b/utils.py @@ -46,7 +46,7 @@ def generate_new_token(self): config["account"]["token"] = resp["access_token"] try: - with open("config.toml", "w", encoding="utf-8") as f: + with open(self.config.config_file, "w", encoding="utf-8") as f: toml.dump(config, f) print("[generate_new_token] Token was updated") @@ -120,6 +120,25 @@ def set_metadata(self, metadata, cover_image, filename): pass # fuck you # seriously fuck it, idk why it happens + def clear_token(self): + """ + Clears the access token from the config file upon exit. + """ + print("\n[clear_token] Clearing access token from config file...") + try: + config_data = self.config.read_config() + if config_data and "token" in config_data.get("account", {}): + config_data["account"]["token"] = "" + with open(self.config.config_file, "w", encoding="utf-8") as f: + toml.dump(config_data, f) + print("[clear_token] Token cleared successfully.") + else: + print("[clear_token] Token not found in config, nothing to clear.") + except (ConfigError, FileNotFoundError) as e: + print(f"[clear_token] Could not read or find config file: {e}", file=sys.stderr) + except Exception as e: + print(f"[clear_token] An unexpected error occurred while clearing token: {e}", file=sys.stderr) + def get_session(self): """ create a user session and return it