refacor & simple logger

This commit is contained in:
agres
2025-03-19 19:38:22 +01:00
parent 7db5c51867
commit 6b35a34826
4 changed files with 62 additions and 50 deletions
+5 -4
View File
@@ -1,5 +1,6 @@
import base64
import json
import logging as log
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
@@ -37,7 +38,7 @@ def simple_authenticate(grant_type: str = "client_credentials") -> str:
access_token = response.json().get('access_token')
return access_token
else:
print(f"Error {response.status_code}: {response.text}")
log.error(f"Error {response.status_code}: {response.text}")
def authenticate(scope: str) -> str:
@@ -55,7 +56,7 @@ def authenticate(scope: str) -> str:
if time.time() < expires_at:
return access_token
else:
print(f"Token for scope {scope} expired, refreshing...")
log.info(f"Token for scope {scope} expired, refreshing...")
access_token, expires_at = _refresh_access_token(refresh_token, spotify_client_id, spotify_client_secret)
_refresh_tokens_file(access_token, scope, expires_at)
return access_token
@@ -128,7 +129,7 @@ def _start_server_and_wait_for_code() -> any:
self.wfile.write(b"Authorization successful! You can close this window.")
server = HTTPServer(('localhost', 8888), CallbackHandler)
print("Starting server to capture the authorization code...")
log.info("Starting server to capture the authorization code...")
server.handle_request()
return server.authorization_code
@@ -253,4 +254,4 @@ def _refresh_tokens_file(access_token: str, scope: str, expires_at) -> None:
with open(TOKEN_FILE_PATH, 'w') as file:
json.dump(tokens, file, indent=4)
else:
print(f"Error: Scope '{scope}' or refresh_token not found in the tokens file.")
log.error(f"Error: Scope '{scope}' or refresh_token not found in the tokens file.")
+16 -19
View File
@@ -1,3 +1,4 @@
import logging as log
import sqlite3
from enum import Enum
@@ -28,21 +29,31 @@ class Database:
self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {Table.TRACK_INFORMATION.value} (
track_id TEXT PRIMARY KEY,
title TEXT
title TEXT,
duration_ms INTEGER,
explicit BOOLEAN,
popularity INTEGER
);
''')
self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {Table.ARTIST_INFORMATION.value} (
artist_id TEXT PRIMARY KEY,
artist_name TEXT
artist_name TEXT,
followers INTEGER,
genres TEXT,
popularity INTEGER
);
''')
self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {Table.ALBUM_INFORMATION.value} (
album_id TEXT PRIMARY KEY,
album_name TEXT
album_name TEXT,
album_type TEXT,
total_tracks INTEGER,
release_date TEXT,
label TEXT
);
''')
@@ -77,7 +88,7 @@ class Database:
self.cursor.execute(query, values)
self.conn.commit()
except Exception as e:
print(f"Error: {e}")
log.debug(f"Error: {e}")
def read_all_rows(self, table: Table, column: str = "*"):
"""Read all rows from the specified table"""
@@ -111,19 +122,5 @@ class Database:
rows = self.cursor.fetchall()
return rows
except Exception as e:
print(f"Error retrieving total overview: {e}")
log.error(f"Error retrieving total overview: {e}")
return []
"""
print(rows)
if rows:
print(f"{'Played At':<20} {'Track ID':<20} {'Track Title':<50} {'Artist ID':<20} {'Artist Name':<50} {'Album ID':<20} {'Album Name':<50}")
print("-" * 160)
for row in rows:
played_at, track_id, title, artist_id, artist_name, album_id, album_name = row
print(f"{played_at:<20} {track_id:<20} {title:<50} {artist_id:<20} {artist_name:<50} {album_id:<20} {album_name:<50}")
else:
print("No recently played songs found.")
except Exception as e:
print(f"Error retrieving total overview: {e}")"""
+3
View File
@@ -0,0 +1,3 @@
from scraper import scraping
scraping()
+38 -27
View File
@@ -6,7 +6,7 @@ from database_handler import Database, Table
db = Database('spotify_scraped.db')
def main():
def scraping():
"""
This function is the main function that will be executed when the script is run
"""
@@ -17,31 +17,8 @@ def main():
# Once each 30 mins
_read_recently_played_page_and_add_to_db(bearer_token=bearer_token)
_scrape_missing_infos()
bearer_token_simple = simple_authenticate()
# Once a day
all_track_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'track_id')
all_track_ids_saved = db.read_all_rows(Table.TRACK_INFORMATION, 'track_id')
all_track_ids_missing = list(set(all_track_ids_recently_played) - set(all_track_ids_saved))
for track_id in all_track_ids_missing:
response = _get_track_information(track_id=track_id[0], bearer_token=bearer_token_simple)
db.add_row(Table.TRACK_INFORMATION, (response['id'], response['name']))
# Once a day
all_album_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'album_id')
all_album_ids_saved = db.read_all_rows(Table.ALBUM_INFORMATION, 'album_id')
all_album_ids_missing = list(set(all_album_ids_recently_played) - set(all_album_ids_saved))
for album_id in all_album_ids_missing:
response = _get_album_information(album_id=album_id[0], bearer_token=bearer_token_simple)
db.add_row(Table.ALBUM_INFORMATION, (response['id'], response['name']))
# Once a day
all_artist_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'artist_id')
all_artist_ids_saved = db.read_all_rows(Table.ARTIST_INFORMATION, 'artist_id')
all_artist_ids_missing = list(set(all_artist_ids_recently_played) - set(all_artist_ids_saved))
for artist_id in all_artist_ids_missing:
response = _get_artist_information(artist_id=artist_id[0], bearer_token=bearer_token_simple)
db.add_row(Table.ARTIST_INFORMATION, (response['id'], response['name']))
# Close the database connection
db.close()
@@ -135,5 +112,39 @@ def _get_album_information(album_id: str, bearer_token: str) -> dict:
return response_json
if __name__ == '__main__':
main()
def _scrape_missing_infos():
"""
"""
global db
bearer_token_simple = simple_authenticate()
# Track Info
all_track_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'track_id')
all_track_ids_saved = db.read_all_rows(Table.TRACK_INFORMATION, 'track_id')
all_track_ids_missing = list(set(all_track_ids_recently_played) - set(all_track_ids_saved))
for track_id in all_track_ids_missing:
response = _get_track_information(track_id=track_id[0], bearer_token=bearer_token_simple)
db.add_row(Table.TRACK_INFORMATION, (response['id'], response['name'], response['duration_ms'], response['explicit'], response['popularity']))
# Album Info
all_album_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'album_id')
all_album_ids_saved = db.read_all_rows(Table.ALBUM_INFORMATION, 'album_id')
all_album_ids_missing = list(set(all_album_ids_recently_played) - set(all_album_ids_saved))
for album_id in all_album_ids_missing:
response = _get_album_information(album_id=album_id[0], bearer_token=bearer_token_simple)
try:
release_year = response['release_date'][:4]
except Exception:
release_year = ""
db.add_row(Table.ALBUM_INFORMATION, (response['id'], response['name'], response['album_type'], response['total_tracks'], release_year, response['label']))
# Artist Info
all_artist_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'artist_id')
all_artist_ids_saved = db.read_all_rows(Table.ARTIST_INFORMATION, 'artist_id')
all_artist_ids_missing = list(set(all_artist_ids_recently_played) - set(all_artist_ids_saved))
for artist_id in all_artist_ids_missing:
response = _get_artist_information(artist_id=artist_id[0], bearer_token=bearer_token_simple)
try:
genre = response['genres'][0]
except IndexError:
genre = ""
db.add_row(Table.ARTIST_INFORMATION, (response['id'], response['name'], response['followers']['total'], genre, response['popularity']))