-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
155 lines (132 loc) · 5.46 KB
/
script.py
File metadata and controls
155 lines (132 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import json
import logging
import os
import re
import time
import urllib.parse
from time import sleep

import pymysql
import requests
from rapidfuzz import fuzz
# ==============================
# 📌 Load Configuration
# ==============================
CONFIG_FILE = "config.json"

# Fail fast with an explicit message when the configuration file is absent.
if not os.path.exists(CONFIG_FILE):
    raise FileNotFoundError(f"Configuration file '{CONFIG_FILE}' not found!")

# JSON text is UTF-8; naming the encoding keeps non-ASCII values (for example
# in passwords or paths) from depending on the platform's locale default.
with open(CONFIG_FILE, "r", encoding="utf-8") as config_file:
    config = json.load(config_file)

# Spotify API Credentials (indexed by position by get_next_credentials)
SPOTIFY_CREDENTIALS = config["spotify"]

# Database Configuration
DB_HOST = config["database"]["host"]
DB_USER = config["database"]["user"]
DB_PASSWORD = config["database"]["password"]
DB_NAME = config["database"]["database"]

# Logging Configuration
LOG_FILE = config["logging"]["log_file"]
# An unrecognised level name falls back to INFO instead of raising.
LOG_LEVEL = getattr(logging, config["logging"]["log_level"].upper(), logging.INFO)

# Setup logging
logging.basicConfig(
    filename=LOG_FILE,
    level=LOG_LEVEL,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# ==============================
# 📌 Function: Get Next Spotify Credentials
# ==============================
def get_next_credentials():
    """Return the next Spotify credential entry, cycling round-robin.

    The first call returns ``SPOTIFY_CREDENTIALS[0]``; each later call
    advances by one entry and wraps around at the end of the list. The
    current position is kept on the function object itself, so it lasts for
    the lifetime of the process.

    Returns:
        One entry of ``SPOTIFY_CREDENTIALS``.

    Raises:
        IndexError: If ``SPOTIFY_CREDENTIALS`` is empty.
    """
    if not SPOTIFY_CREDENTIALS:
        raise IndexError("No Spotify credentials are configured.")

    # Modulo keeps the position valid even if the list length changes.
    position = getattr(get_next_credentials, "_position", 0) % len(SPOTIFY_CREDENTIALS)
    get_next_credentials._position = position + 1

    credentials = SPOTIFY_CREDENTIALS[position]
    logging.info("Using Spotify API credentials")
    return credentials
# ==============================
# 📌 Function: Get Spotify OAuth Token
# ==============================
def get_spotify_token(client_id, client_secret, timeout=10):
    """Fetch an OAuth access token from Spotify (client-credentials grant).

    Args:
        client_id: Spotify application client ID, sent as the HTTP
            basic-auth username.
        client_secret: Spotify application client secret, sent as the HTTP
            basic-auth password.
        timeout: Seconds to wait for the token endpoint. Without a timeout
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.exceptions.RequestException: On a network failure, timeout,
            non-success HTTP status, or a response that carries no
            ``access_token``. The error is logged before it is re-raised.
    """
    url = "https://accounts.spotify.com/api/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials"}
    try:
        response = requests.post(
            url,
            headers=headers,
            data=data,
            auth=(client_id, client_secret),
            timeout=timeout,
        )
        response.raise_for_status()
        token = response.json().get("access_token")
        if not token:
            # Returning None here would make every later API call fail with
            # an authorization error, so surface the problem immediately.
            raise requests.exceptions.RequestException(
                "Spotify token response did not contain an access_token",
                response=response,
            )
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to get Spotify token: {e}")
        raise
    logging.info("Spotify token retrieved successfully.")
    return token
# ==============================
# 📌 Function: Search Spotify for Podcasts
# ==============================
def search_spotify(title, author, token, timeout=10):
    """Search Spotify for a show matching a podcast title and author.

    Only the first search result is considered (``limit=1``).

    Args:
        title: Podcast title; may be ``None`` or empty.
        author: Podcast author/artist name; may be ``None`` or empty.
        token: Spotify OAuth bearer token.
        timeout: Seconds to wait for the search endpoint. Without a timeout
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        ``(spotify_url, spotify_title, spotify_publisher)`` for the first
        usable result, or ``(None, None, None)`` when there is nothing to
        search for, no result was found, or the request failed. Failures are
        logged rather than raised.
    """
    # Drop missing parts so a None title/author does not end up in the query
    # as the literal text "None".
    parts = [str(part).strip() for part in (title, author) if part is not None]
    search_query = " ".join(part for part in parts if part)
    if not search_query:
        logging.warning("Skipping Spotify search: empty title and author")
        return None, None, None

    encoded_query = urllib.parse.quote(search_query)
    url = f"https://api.spotify.com/v1/search?q={encoded_query}&type=show&limit=1"
    headers = {"Authorization": f"Bearer {token}"}
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        results = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        logging.error(f"Error searching Spotify: {e}")
        return None, None, None

    # Read the payload defensively: a missing key or a null entry is treated
    # as "no match" instead of raising KeyError/TypeError into the caller.
    items = (results.get("shows") or {}).get("items") or []
    show = next((item for item in items if item), None)
    spotify_url = ((show or {}).get("external_urls") or {}).get("spotify")
    if not spotify_url:
        logging.warning(f"No Spotify match found for query: {search_query}")
        return None, None, None

    spotify_title = show.get("name")
    spotify_publisher = show.get("publisher", "")
    logging.info(f"Spotify match found: {spotify_title} ({spotify_url})")
    return spotify_url, spotify_title, spotify_publisher
# ==============================
# 📌 Function: Process Podcasts in Batch
# ==============================
def process_rows():
    """Match unprocessed Apple podcasts against Spotify, ten rows at a time.

    Repeatedly selects up to 10 rows of ``apple_podcast_live_updates`` whose
    ``apple_id`` is not yet in ``apple_podcast_spotify_match``, searches
    Spotify for each one, upserts the outcome into
    ``apple_podcast_spotify_match`` and commits once per batch. The loop ends
    when the SELECT returns no rows.

    Raises:
        Whatever the initial credential lookup, token request or database
        connect raises. Errors inside the processing loop are logged (with
        traceback) and end the run without propagating; the connection is
        always closed.
    """
    # Spotify documents client-credentials tokens as valid for one hour.
    # Refresh before that, otherwise a long run would start failing every
    # search and record all remaining rows as unmatched.
    token_max_age_seconds = 50 * 60

    def fetch_token():
        """Return a new token together with the monotonic time it was issued."""
        credentials = get_next_credentials()
        new_token = get_spotify_token(credentials["client_id"], credentials["client_secret"])
        return new_token, time.monotonic()

    token, token_issued_at = fetch_token()

    connection = pymysql.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME,
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
    )
    try:
        with connection.cursor() as cursor:
            while True:
                # Rows recorded by the INSERT below drop out of this query,
                # which is what lets the loop make progress and terminate.
                cursor.execute("""
                    SELECT apple_id, apple_podcast_url, name, artistName
                    FROM apple_podcast_live_updates
                    WHERE apple_id NOT IN (SELECT apple_id FROM apple_podcast_spotify_match)
                    LIMIT 10;
                """)
                rows = cursor.fetchall()
                if not rows:
                    logging.info("No more rows to process.")
                    print("No more rows to process.")
                    break

                for row in rows:
                    if time.monotonic() - token_issued_at >= token_max_age_seconds:
                        token, token_issued_at = fetch_token()

                    apple_id, title, author = row["apple_id"], row["name"], row["artistName"]
                    # NOTE(review): search_spotify returns the same all-None
                    # triple for "no result" and for a failed request, so a
                    # failed lookup is stored as is_matched = 0 and will not
                    # be retried on later runs.
                    spotify_url, spotify_title, _ = search_spotify(title, author, token)
                    is_matched = 1 if spotify_url else 0
                    cursor.execute("""
                        INSERT INTO apple_podcast_spotify_match (apple_id, spotify_url, spotify_title, is_matched)
                        VALUES (%s, %s, %s, %s)
                        ON DUPLICATE KEY UPDATE spotify_url = VALUES(spotify_url),
                        spotify_title = VALUES(spotify_title),
                        is_matched = VALUES(is_matched)
                    """, (apple_id, spotify_url, spotify_title, is_matched))

                connection.commit()
                print("Batch processed and committed.")
    except Exception as e:
        # Top-level boundary for the batch job: keep the original best-effort
        # behaviour (log and stop) but include the traceback for diagnosis.
        logging.error(f"Error processing rows: {e}", exc_info=True)
    finally:
        connection.close()
# Run the batch matcher only when this file is executed as a script, so that
# importing the module does not start processing.
if __name__ == "__main__":
    process_rows()