Merge pull request #19 from agresdominik/feat/database_handler

Feat/database handler
This commit is contained in:
Dominik
2025-03-19 19:55:03 +01:00
committed by GitHub
4 changed files with 135 additions and 22 deletions
+5 -4
View File
@@ -1,5 +1,6 @@
import base64 import base64
import json import json
import logging as log
import os import os
import time import time
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
@@ -37,7 +38,7 @@ def simple_authenticate(grant_type: str = "client_credentials") -> str:
access_token = response.json().get('access_token') access_token = response.json().get('access_token')
return access_token return access_token
else: else:
print(f"Error {response.status_code}: {response.text}") log.error(f"Error {response.status_code}: {response.text}")
def authenticate(scope: str) -> str: def authenticate(scope: str) -> str:
@@ -55,7 +56,7 @@ def authenticate(scope: str) -> str:
if time.time() < expires_at: if time.time() < expires_at:
return access_token return access_token
else: else:
print(f"Token for scope {scope} expired, refreshing...") log.info(f"Token for scope {scope} expired, refreshing...")
access_token, expires_at = _refresh_access_token(refresh_token, spotify_client_id, spotify_client_secret) access_token, expires_at = _refresh_access_token(refresh_token, spotify_client_id, spotify_client_secret)
_refresh_tokens_file(access_token, scope, expires_at) _refresh_tokens_file(access_token, scope, expires_at)
return access_token return access_token
@@ -128,7 +129,7 @@ def _start_server_and_wait_for_code() -> any:
self.wfile.write(b"Authorization successful! You can close this window.") self.wfile.write(b"Authorization successful! You can close this window.")
server = HTTPServer(('localhost', 8888), CallbackHandler) server = HTTPServer(('localhost', 8888), CallbackHandler)
print("Starting server to capture the authorization code...") log.info("Starting server to capture the authorization code...")
server.handle_request() server.handle_request()
return server.authorization_code return server.authorization_code
@@ -253,4 +254,4 @@ def _refresh_tokens_file(access_token: str, scope: str, expires_at) -> None:
with open(TOKEN_FILE_PATH, 'w') as file: with open(TOKEN_FILE_PATH, 'w') as file:
json.dump(tokens, file, indent=4) json.dump(tokens, file, indent=4)
else: else:
print(f"Error: Scope '{scope}' or refresh_token not found in the tokens file.") log.error(f"Error: Scope '{scope}' or refresh_token not found in the tokens file.")
+46 -7
View File
@@ -1,3 +1,4 @@
import logging as log
import sqlite3 import sqlite3
from enum import Enum from enum import Enum
@@ -28,21 +29,31 @@ class Database:
self.cursor.execute(f''' self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {Table.TRACK_INFORMATION.value} ( CREATE TABLE IF NOT EXISTS {Table.TRACK_INFORMATION.value} (
track_id TEXT PRIMARY KEY, track_id TEXT PRIMARY KEY,
title TEXT title TEXT,
duration_ms INTEGER,
explicit BOOLEAN,
popularity INTEGER
); );
''') ''')
self.cursor.execute(f''' self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {Table.ARTIST_INFORMATION.value} ( CREATE TABLE IF NOT EXISTS {Table.ARTIST_INFORMATION.value} (
artist_id TEXT PRIMARY KEY, artist_id TEXT PRIMARY KEY,
artist_name TEXT artist_name TEXT,
followers INTEGER,
genres TEXT,
popularity INTEGER
); );
''') ''')
self.cursor.execute(f''' self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {Table.ALBUM_INFORMATION.value} ( CREATE TABLE IF NOT EXISTS {Table.ALBUM_INFORMATION.value} (
album_id TEXT PRIMARY KEY, album_id TEXT PRIMARY KEY,
album_name TEXT album_name TEXT,
album_type TEXT,
total_tracks INTEGER,
release_date TEXT,
label TEXT
); );
''') ''')
@@ -71,10 +82,13 @@ class Database:
def add_row(self, table: Table, values):
    """Add a new row into the specified table and commit it.

    One ``?`` placeholder is generated per entry in ``values``, so the
    values are bound by sqlite3 instead of being formatted into the SQL
    text. Only the table name, taken from ``table.value``, is interpolated.

    Any exception raised while building or executing the statement is
    caught and logged at debug level. Nothing is re-raised and nothing is
    returned, so the caller cannot tell whether the row was stored.

    :param table: member of ``Table`` whose ``value`` is the target table name
    :param values: sized sequence holding one entry per column of the table
    """
    try:
        placeholders = ', '.join(['?'] * len(values))
        query = f"INSERT INTO {table.value} VALUES ({placeholders})"
        self.cursor.execute(query, values)
        self.conn.commit()
    except Exception as e:
        log.debug(f"Error: {e}")
def read_all_rows(self, table: Table, column: str = "*"): def read_all_rows(self, table: Table, column: str = "*"):
"""Read all rows from the specified table""" """Read all rows from the specified table"""
@@ -85,3 +99,28 @@ class Database:
def close(self):
    """Close the database connection held in ``self.conn``."""
    self.conn.close()
def get_total_overview(self) -> list:
    """Retrieve a total overview of all recently played songs with full details.

    Each returned row is a tuple of
    ``(played_at, track_id, title, artist_id, artist_name, album_id, album_name)``,
    ordered by ``played_at`` descending (most recent play first).

    The three joins are inner joins, so a play whose track, artist or album
    has no row in the matching information table is left out of the result.

    :return: list of result rows; an empty list if the query raises
    """
    try:
        # Join recently_played with track_information, artist_information, and album_information
        query = f'''
            SELECT rp.played_at,
                   ti.track_id,
                   ti.title,
                   ai.artist_id,
                   ai.artist_name,
                   al.album_id,
                   al.album_name
            FROM {Table.RECENTLY_PLAYED.value} rp
            JOIN {Table.TRACK_INFORMATION.value} ti ON rp.track_id = ti.track_id
            JOIN {Table.ARTIST_INFORMATION.value} ai ON rp.artist_id = ai.artist_id
            JOIN {Table.ALBUM_INFORMATION.value} al ON rp.album_id = al.album_id
            ORDER BY rp.played_at DESC
        '''
        self.cursor.execute(query)
        rows = self.cursor.fetchall()
        return rows
    except Exception as e:
        # Failures are reported through the log only; callers get an empty list.
        log.error(f"Error retrieving total overview: {e}")
        return []
+8
View File
@@ -0,0 +1,8 @@
from time import sleep
from scraper import scraping
# Pause between two scraping runs: 30 minutes, expressed in seconds.
SCRAPE_INTERVAL_SECONDS = 30 * 60

# Run forever on intervals of 30 minutes.
# Exceptions raised by scraping() are intentionally not caught here, so a
# failed run terminates the process instead of being hidden.
while True:
    scraping()
    sleep(SCRAPE_INTERVAL_SECONDS)
+76 -11
View File
@@ -6,7 +6,7 @@ from database_handler import Database, Table
db = Database('spotify_scraped.db') db = Database('spotify_scraped.db')
def main(): def scraping():
""" """
This function is the main function that will be executed when the script is run This function is the main function that will be executed when the script is run
""" """
@@ -17,15 +17,8 @@ def main():
# Once each 30 mins # Once each 30 mins
_read_recently_played_page_and_add_to_db(bearer_token=bearer_token) _read_recently_played_page_and_add_to_db(bearer_token=bearer_token)
_scrape_missing_infos()
# Once a day
all_track_ids = db.read_all_rows(Table.RECENTLY_PLAYED, 'track_id')
bearer_toke_simple = simple_authenticate()
for track_id in all_track_ids:
response = _get_track_information(track_id=track_id[0], bearer_token=bearer_toke_simple)
print(response)
# Close the database connection
db.close() db.close()
@@ -81,5 +74,77 @@ def _get_track_information(track_id: str, bearer_token: str) -> dict:
return response_json return response_json
def _get_artist_information(artist_id: str, bearer_token: str) -> dict:
    """
    This function returns the artist information based on the artist id

    The HTTP status code is not inspected: the decoded JSON body is returned
    whatever the response status was.

    :param artist_id: str
    :param bearer_token: str
    :return: dict
    """
    url = f"https://api.spotify.com/v1/artists/{artist_id}"
    header = {
        'Authorization': f'Bearer {bearer_token}'
    }

    # Without a timeout, requests can wait indefinitely on a stalled
    # connection and block the caller.
    response = requests.get(url, headers=header, timeout=30)
    response_json = response.json()
    return response_json
def _get_album_information(album_id: str, bearer_token: str) -> dict:
    """
    This function returns the album information based on the album id

    The HTTP status code is not inspected: the decoded JSON body is returned
    whatever the response status was.

    :param album_id: str
    :param bearer_token: str
    :return: dict
    """
    url = f"https://api.spotify.com/v1/albums/{album_id}"
    header = {
        'Authorization': f'Bearer {bearer_token}'
    }

    # Without a timeout, requests can wait indefinitely on a stalled
    # connection and block the caller.
    response = requests.get(url, headers=header, timeout=30)
    response_json = response.json()
    return response_json
def _missing_ids(column: str, info_table: Table) -> list:
    """Return the ids in ``column`` of the recently-played table that have no row in ``info_table``."""
    played = {row[0] for row in db.read_all_rows(Table.RECENTLY_PLAYED, column)}
    saved = {row[0] for row in db.read_all_rows(info_table, column)}
    return list(played - saved)


def _scrape_missing_infos():
    """
    This function fetches and stores the details of every track, album and
    artist that appears in the recently-played table but has no row yet in
    its information table.

    A single token from ``simple_authenticate()`` is used for all requests.
    For albums only the first four characters of ``release_date`` are stored,
    and for artists only the first entry of ``genres``; an empty string is
    stored when the value is not available.
    """
    bearer_token_simple = simple_authenticate()

    # Track Info
    for track_id in _missing_ids('track_id', Table.TRACK_INFORMATION):
        response = _get_track_information(track_id=track_id, bearer_token=bearer_token_simple)
        db.add_row(
            Table.TRACK_INFORMATION,
            (
                response['id'],
                response['name'],
                response['duration_ms'],
                response['explicit'],
                response['popularity'],
            ),
        )

    # Album Info
    for album_id in _missing_ids('album_id', Table.ALBUM_INFORMATION):
        response = _get_album_information(album_id=album_id, bearer_token=bearer_token_simple)
        try:
            release_year = response['release_date'][:4]
        except (KeyError, TypeError):
            # Key absent, or the value is not sliceable.
            release_year = ""
        db.add_row(
            Table.ALBUM_INFORMATION,
            (
                response['id'],
                response['name'],
                response['album_type'],
                response['total_tracks'],
                release_year,
                response['label'],
            ),
        )

    # Artist Info
    for artist_id in _missing_ids('artist_id', Table.ARTIST_INFORMATION):
        response = _get_artist_information(artist_id=artist_id, bearer_token=bearer_token_simple)
        genres = response['genres']
        # Only the first listed genre is kept; artists without genres get "".
        genre = genres[0] if genres else ""
        db.add_row(
            Table.ARTIST_INFORMATION,
            (
                response['id'],
                response['name'],
                response['followers']['total'],
                genre,
                response['popularity'],
            ),
        )