Merge branch 'staging' into bug/write_closed_database

This commit is contained in:
Chris Kiriakou
2025-03-27 15:04:19 +01:00
4 changed files with 116 additions and 73 deletions
+10 -6
View File
@@ -4,7 +4,7 @@ import os
from auth import simple_authenticate from auth import simple_authenticate
from database_handler import Database, Table from database_handler import Database, Table
from spotify_api import get_multiple_tracks_information from spotify_api import get_multiple_field_information
# Define the absolute path to the folder containing the retrieved GDPR data # Define the absolute path to the folder containing the retrieved GDPR data
folder_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'gdpr_data') folder_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'gdpr_data')
@@ -71,22 +71,26 @@ def _populate_ids(all_songs_played: list):
processed_songs_id = set() processed_songs_id = set()
for i, entry in enumerate(all_songs_played): counter = 0
for entry in all_songs_played:
track_id = entry['id'] track_id = entry['id']
if track_id not in processed_songs_id: if track_id not in processed_songs_id:
track_ids.append(track_id) track_ids.append(track_id)
processed_songs_id.add(track_id) processed_songs_id.add(track_id)
counter += 1
if (i + 1) % 50 == 0: if (counter + 1) % 50 == 0 and len(track_ids) > 0:
track_ids_tuple = tuple(track_ids) track_ids_tuple = tuple(track_ids)
track_ids.clear() track_ids.clear()
response = get_multiple_tracks_information(token, *track_ids_tuple) response = get_multiple_field_information(token, 'tracks', 50, *track_ids_tuple)
all_songs_played_info.extend(_sort_and_create_required_dataset(response)) all_songs_played_info.extend(_sort_and_create_required_dataset(response))
counter = 0
if track_ids: if len(track_ids) > 0:
track_ids_tuple = tuple(track_ids) track_ids_tuple = tuple(track_ids)
response = get_multiple_tracks_information(token, *track_ids_tuple) response = get_multiple_field_information(token, 'tracks', 50, *track_ids_tuple)
all_songs_played_info.extend(_sort_and_create_required_dataset(response)) all_songs_played_info.extend(_sort_and_create_required_dataset(response))
return all_songs_played_info return all_songs_played_info
+8 -1
View File
@@ -1,9 +1,12 @@
import argparse import argparse
from time import sleep from time import sleep
from database_handler import Database
from gdpr_export import export_gdpr_data from gdpr_export import export_gdpr_data
from scraper import scrape_missing_infos, scraping from scraper import scrape_missing_infos, scraping
db = Database()
# Initialize the parser # Initialize the parser
parser = argparse.ArgumentParser(description="A python script written in Python3.13 which continuously checks what spotify songs " parser = argparse.ArgumentParser(description="A python script written in Python3.13 which continuously checks what spotify songs "
"the user is listening to and logging these in a local database. \n" "the user is listening to and logging these in a local database. \n"
@@ -24,7 +27,7 @@ if args.export:
print('Scraping GDPR Data') print('Scraping GDPR Data')
# The next function can take an int which defines the number of songs which will be scraped from the GDPR files. # The next function can take an int which defines the number of songs which will be scraped from the GDPR files.
# e.g. if 500 is input, the last 500 played songs will come up, if left empty, the last 100. # e.g. if 500 is input, the last 500 played songs will come up, if left empty, the last 100.
export_gdpr_data() export_gdpr_data(1000000)
scrape_missing_infos() scrape_missing_infos()
while True: while True:
@@ -32,3 +35,7 @@ while True:
scraping() scraping()
print('Done Scraping') print('Done Scraping')
sleep(1800) sleep(1800)
# TODO: Trap this:
db.close()
+69 -38
View File
@@ -1,11 +1,6 @@
from auth import authenticate, simple_authenticate from auth import authenticate, simple_authenticate
from database_handler import Database, Table from database_handler import Database, Table
from spotify_api import ( from spotify_api import get_last_played_track, get_multiple_field_information
get_album_information,
get_artist_information,
get_last_played_track,
get_track_information,
)
def scraping(): def scraping():
@@ -16,12 +11,10 @@ def scraping():
scope = "user-read-recently-played" scope = "user-read-recently-played"
bearer_token = authenticate(scope) bearer_token = authenticate(scope)
# Once each 30 mins
_read_recently_played_page_and_add_to_db(bearer_token=bearer_token) _read_recently_played_page_and_add_to_db(bearer_token=bearer_token)
scrape_missing_infos() scrape_missing_infos()
def _read_recently_played_page_and_add_to_db(bearer_token: str): def _read_recently_played_page_and_add_to_db(bearer_token: str):
""" """
This function gets a list of song play history and adds it into the database. This function gets a list of song play history and adds it into the database.
@@ -38,41 +31,79 @@ def _read_recently_played_page_and_add_to_db(bearer_token: str):
db.add_row(Table.RECENTLY_PLAYED, (played_at, track_id, artist_id, album_id)) db.add_row(Table.RECENTLY_PLAYED, (played_at, track_id, artist_id, album_id))
db.close() db.close()
def scrape_missing_infos():
    """
    This function looks up the track, album and artist information that is
    referenced by the recently played table but not stored in its own table yet.

    A simple (non user-scoped) token is requested once and shared by all three lookups.
    """
    token = simple_authenticate()

    # (table to fill, id column to compare, endpoint to query) - processed in this order
    lookups = (
        (Table.TRACK_INFORMATION, 'track_id', 'tracks'),
        (Table.ALBUM_INFORMATION, 'album_id', 'albums'),
        (Table.ARTIST_INFORMATION, 'artist_id', 'artists'),
    )
    for table, id_column, endpoint in lookups:
        _scrape_missing_info(token, table, id_column, endpoint)
def _scrape_missing_info(bearer_token_simple: str, table_name: Table, id_field_name: str, endpoint_name: str):
    """
    This function fetches the information for every id that appears in the recently
    played table but not in the given information table, and stores it in that table.

    The ids are requested in batches so that one request never carries more ids
    than the batch limit used for the endpoint.

    :param bearer_token_simple: str
    :param table_name: Table, the information table that is compared against and filled
    :param id_field_name: str, the id column read from both tables
    :param endpoint_name: str, the endpoint to query, e.g. 'tracks', 'albums' or 'artists'
    :return: None
    """
    # 'albums' is batched in groups of 20, every other endpoint in groups of 50
    limit = 20 if endpoint_name == 'albums' else 50

    db = Database()
    try:
        all_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, id_field_name)
        all_ids_saved = db.read_all_rows(table_name, id_field_name)
    finally:
        # Release the handle even when one of the reads fails
        db.close()

    # Each row is indexed with [0] to get the id itself; dict.fromkeys drops
    # duplicate ids while keeping a stable order.
    rows_missing = set(all_ids_recently_played) - set(all_ids_saved)
    ids_missing = list(dict.fromkeys(row[0] for row in rows_missing))

    # Walk the ids in slices of exactly `limit`; the last slice carries the remainder.
    for start in range(0, len(ids_missing), limit):
        batch = ids_missing[start:start + limit]
        response = get_multiple_field_information(bearer_token_simple, endpoint_name, limit, *batch)
        _add_data_to_database(table_name, response)
def _add_data_to_database(table_name: Table, response):
    """
    This function writes the entries of a batch response into the given information table.

    :param table_name: Table, TRACK_INFORMATION, ALBUM_INFORMATION or ARTIST_INFORMATION;
        for any other table no row is written
    :param response: dict, the batch response; its entries are read from the key
        'tracks', 'albums' or 'artists' depending on table_name
    :return: None
    """
    db = Database()
    try:
        if table_name == Table.TRACK_INFORMATION:
            for entry in response['tracks']:
                # NOTE(review): the API presumably returns null for ids it does not know;
                # such placeholders carry no data and are skipped - confirm against the API docs.
                if entry is None:
                    continue
                db.add_row(table_name, (entry['id'], entry['name'], entry['duration_ms'], entry['explicit'], entry['popularity']))

        elif table_name == Table.ALBUM_INFORMATION:
            for entry in response['albums']:
                if entry is None:
                    continue
                # Only the first four characters of the release date are stored;
                # a missing or null date is stored as an empty string.
                try:
                    release_year = entry['release_date'][:4]
                except (KeyError, TypeError):
                    release_year = ""
                db.add_row(table_name, (entry['id'], entry['name'], entry['album_type'], entry['total_tracks'], release_year, entry['label']))

        elif table_name == Table.ARTIST_INFORMATION:
            for entry in response['artists']:
                if entry is None:
                    continue
                # Only the first genre is stored; an artist without genres gets an empty string.
                try:
                    genre = entry['genres'][0]
                except IndexError:
                    genre = ""
                db.add_row(table_name, (entry['id'], entry['name'], entry['followers']['total'], genre, entry['popularity']))
    finally:
        # Close the handle even when a row cannot be built or written
        db.close()
+29 -28
View File
@@ -40,34 +40,6 @@ def get_track_information(track_id: str, bearer_token: str) -> dict:
return response_json return response_json
def get_multiple_tracks_information(bearer_token: str, *track_ids) -> dict:
"""
This function returns the track information based on the track id
:param *track_id: str
:param bearer_token: str
:return: dict
"""
if len(track_ids) > 50:
log.error('Passed more than 50 track ids to get_multiple_tracks_information')
return None
url_suffix = "ids="
separator = ","
for track_id in track_ids:
url_suffix = url_suffix + track_id + separator
url = f"https://api.spotify.com/v1/tracks?{url_suffix}"
url = url[:-len(separator)]
header = {
'Authorization': f'Bearer {bearer_token}'
}
response = requests.get(url, headers=header)
response_json = response.json()
return response_json
def get_artist_information(artist_id: str, bearer_token: str) -> dict: def get_artist_information(artist_id: str, bearer_token: str) -> dict:
""" """
This function returns the artist information based on the artist id This function returns the artist information based on the artist id
@@ -104,3 +76,32 @@ def get_album_information(album_id: str, bearer_token: str) -> dict:
response = requests.get(url, headers=header) response = requests.get(url, headers=header)
response_json = response.json() response_json = response.json()
return response_json return response_json
def get_multiple_field_information(bearer_token: str, api_type: str, limit: int, *track_ids) -> dict:
"""
This function returns the track information based on the track id
:param *track_id: str
:param bearer_token: str
:return: dict
"""
if len(track_ids) > limit:
log.error('Passed more than 20/50 ids to get_multiple_field_information')
return None
url_suffix = "ids="
separator = ","
for track_id in track_ids:
url_suffix = url_suffix + track_id + separator
url = f"https://api.spotify.com/v1/{api_type}?{url_suffix}"
url = url[:-len(separator)]
header = {
'Authorization': f'Bearer {bearer_token}'
}
response = requests.get(url, headers=header)
response_json = response.json()
return response_json