The most convoluted, code-efficient, bloated, boilerplated, duplicated, unnecessary, undocumented, soon-to-be-regretted code

This commit is contained in:
agres
2025-03-23 20:49:00 +01:00
parent 625f09cfdf
commit af6b3dba13
2 changed files with 93 additions and 40 deletions
+58 -34
View File
@@ -1,11 +1,6 @@
from auth import authenticate, simple_authenticate from auth import authenticate, simple_authenticate
from database_handler import Database, Table from database_handler import Database, Table
from spotify_api import ( from spotify_api import get_last_played_track, get_multiple_field_information
get_album_information,
get_artist_information,
get_last_played_track,
get_track_information,
)
# Define DB # Define DB
db = Database() db = Database()
@@ -20,12 +15,9 @@ def scraping():
scope = "user-read-recently-played" scope = "user-read-recently-played"
bearer_token = authenticate(scope) bearer_token = authenticate(scope)
# Once each 30 mins
_read_recently_played_page_and_add_to_db(bearer_token=bearer_token) _read_recently_played_page_and_add_to_db(bearer_token=bearer_token)
scrape_missing_infos() scrape_missing_infos()
db.close()
def _read_recently_played_page_and_add_to_db(bearer_token: str): def _read_recently_played_page_and_add_to_db(bearer_token: str):
""" """
@@ -47,36 +39,68 @@ def scrape_missing_infos():
""" """
""" """
global db
bearer_token_simple = simple_authenticate() bearer_token_simple = simple_authenticate()
# Track Info _scrape_missing_info(bearer_token_simple, Table.TRACK_INFORMATION, 'track_id', 'tracks')
all_track_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'track_id') _scrape_missing_info(bearer_token_simple, Table.ALBUM_INFORMATION, 'album_id', 'albums')
all_track_ids_saved = db.read_all_rows(Table.TRACK_INFORMATION, 'track_id') _scrape_missing_info(bearer_token_simple, Table.ARTIST_INFORMATION, 'artist_id', 'artists')
all_track_ids_missing = list(set(all_track_ids_recently_played) - set(all_track_ids_saved))
for track_id in all_track_ids_missing:
response = get_track_information(track_id=track_id[0], bearer_token=bearer_token_simple) def _scrape_missing_info(bearer_token_simple: str, table_name: Table, id_field_name: str, endpoint_name: str):
db.add_row(Table.TRACK_INFORMATION, (response['id'], response['name'], response['duration_ms'], response['explicit'], response['popularity']))
# Album Info if endpoint_name == 'albums':
all_album_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'album_id') limit = 20
all_album_ids_saved = db.read_all_rows(Table.ALBUM_INFORMATION, 'album_id') else:
all_album_ids_missing = list(set(all_album_ids_recently_played) - set(all_album_ids_saved)) limit = 50
for album_id in all_album_ids_missing:
response = get_album_information(album_id=album_id[0], bearer_token=bearer_token_simple) all_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, id_field_name)
all_ids_saved = db.read_all_rows(table_name, id_field_name)
all_ids_missing = list(set(all_ids_recently_played) - set(all_ids_saved))
ids = []
processed_ids = set()
for i, id_value in enumerate(all_ids_missing):
id_value_str = id_value[0]
if id_value_str not in processed_ids:
ids.append(id_value_str)
processed_ids.add(id_value_str)
if (i + 1) % limit == 0:
ids_tuple = tuple(ids)
ids.clear()
response = get_multiple_field_information(bearer_token_simple, endpoint_name, limit, *ids_tuple)
_add_data_to_database(table_name, response)
if ids:
ids_tuple = tuple(ids)
ids.clear()
response = get_multiple_field_information(bearer_token_simple, endpoint_name, limit, *ids_tuple)
_add_data_to_database(table_name, response)
def _add_data_to_database(table_name: Table, response):
global db
if table_name == Table.TRACK_INFORMATION:
for entry in response['tracks']:
db.add_row(table_name, (entry['id'], entry['name'], entry['duration_ms'], entry['explicit'], entry['popularity']))
elif table_name == Table.ALBUM_INFORMATION:
for entry in response['albums']:
try: try:
release_year = response['release_date'][:4] release_year = entry['release_date'][:4]
except Exception: except Exception:
release_year = "" release_year = ""
db.add_row(Table.ALBUM_INFORMATION, (response['id'], response['name'], response['album_type'], response['total_tracks'], release_year, response['label'])) db.add_row(table_name, (entry['id'], entry['name'], entry['album_type'], entry['total_tracks'], release_year, entry['label']))
# Artist Info
all_artist_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'artist_id') elif table_name == Table.ARTIST_INFORMATION:
all_artist_ids_saved = db.read_all_rows(Table.ARTIST_INFORMATION, 'artist_id') for entry in response['artists']:
all_artist_ids_missing = list(set(all_artist_ids_recently_played) - set(all_artist_ids_saved))
for artist_id in all_artist_ids_missing:
response = get_artist_information(artist_id=artist_id[0], bearer_token=bearer_token_simple)
try: try:
genre = response['genres'][0] genre = entry['genres'][0]
except IndexError: except IndexError:
genre = "" genre = ""
db.add_row(Table.ARTIST_INFORMATION, (response['id'], response['name'], response['followers']['total'], genre, response['popularity'])) db.add_row(Table.ARTIST_INFORMATION, (entry['id'], entry['name'], entry['followers']['total'], genre, entry['popularity']))
+29
View File
@@ -104,3 +104,32 @@ def get_album_information(album_id: str, bearer_token: str) -> dict:
response = requests.get(url, headers=header) response = requests.get(url, headers=header)
response_json = response.json() response_json = response.json()
return response_json return response_json
def get_multiple_field_information(bearer_token: str, api_type: str, limit: int, *track_ids) -> dict:
    """
    Fetch several objects of one kind from the Spotify Web API in one request.

    The ids are sent as a single comma-separated ``ids`` query parameter to
    ``https://api.spotify.com/v1/{api_type}``.

    :param bearer_token: str, token sent in the ``Authorization: Bearer`` header
    :param api_type: str, path segment naming the endpoint (e.g. 'tracks', 'albums', 'artists')
    :param limit: int, maximum number of ids accepted in a single call
    :param *track_ids: str, ids to look up (despite the name, not restricted to track ids)
    :return: dict with the decoded JSON response, or None when more than ``limit`` ids were passed
    """
    if len(track_ids) > limit:
        log.error('Passed more than 20/50 ids to get_multiple_field_information')
        return None

    # Join instead of concatenating and trimming the trailing separator: the
    # old trim also cut off the '=' when no ids were passed at all.
    ids_query = ",".join(track_ids)
    url = f"https://api.spotify.com/v1/{api_type}?ids={ids_query}"

    header = {
        'Authorization': f'Bearer {bearer_token}'
    }

    response = requests.get(url, headers=header)
    return response.json()