Feat/import gdrp data (#26)

* Some simple code for extracting data from the jsons

* Jupiter Notebook

* Mac specific gitignore

* Fixed finding paths to folders

* Delete src/gdpr_data directory

* Updated gitignore to include my testing file

* Added the standard saving path for the database in the database handler, this way multiple files don't have to be updated when moving the database position

* Moved the API usage wrappers into an own file, added a function for getting multiple track_ids at once, this still needs to be tested more

* Further code for extracting data from the gdpr files

* Forgor

* Final&Tested version of get_multiple_tracks_information endpoint

* Further functionality: The code now extracts the id of each listened song and makes an API call to get info about these songs via the multiple tracks API. Furthermore we track the songs for which the call has already been made and skip these

* Added function to map catalogued ids into the play history

* Added args parser to runtime program, cleaned up some code

* Fixed a bug where the database would always try to create tables, even if they exist

* Added some small text for clean interface

* Some final fixes to actual code, fixed db bug, reversed the order of database entries

* Some documentation

* Added -export args to docker runtime

* fix
This commit is contained in:
Dominik
2025-03-23 18:48:57 +01:00
committed by GitHub
parent 535225392f
commit ff9d726b47
11 changed files with 412 additions and 90 deletions
+12
View File
@@ -1,3 +1,15 @@
# My testing file
main_test.py
# .db
*.db
# DS_Store
.DS_Store
# Gdpr Data file
Streaming_History*
# Test running file
main_test.py
+1 -1
View File
@@ -22,7 +22,7 @@ repos:
files: \.(json)$
- id: check-added-large-files # Prevent large files from being committed
args: ['--maxkb=1000']
args: ['--maxkb=2000']
- id: check-ast # Check for parse errors in Python files
exclude: '.*test.*'
+4
View File
@@ -47,6 +47,10 @@ docker run \
predictify:unstable
```
## GDPR Data
If you have gdpr data, create a folder: ```data/gdpr_data``` and add all .json files containing your play history into it. In order to extract it, run the script: ```python3 src/runtime.py --export```
## Authors
[Chris Kiriakou](https://github.com/ckiri)
+1 -1
View File
@@ -2,4 +2,4 @@
#
# Startup predictify. Don't use this. This is for docker specifically.
source .venv/bin/activate
.venv/bin/python src/runtime.py
.venv/bin/python src/runtime.py --export
+1 -1
View File
@@ -9,7 +9,7 @@ from urllib.parse import parse_qs, urlencode, urlparse
import dotenv
import requests
TOKEN_FILE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../data', 'tokens.json')
TOKEN_FILE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'tokens.json')
def simple_authenticate(grant_type: str = "client_credentials") -> str:
+4 -1
View File
@@ -1,7 +1,10 @@
import logging as log
import os
import sqlite3
from enum import Enum
DATABASE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'spotify_scraped.db')
class Table(Enum):
TRACK_INFORMATION = "track_information"
@@ -16,7 +19,7 @@ class Database:
A class to handle the database connection and operations
"""
def __init__(self, db_name):
def __init__(self, db_name: str = DATABASE_PATH):
"""Initialize the connection to the database"""
self.db_name = db_name
self.conn = sqlite3.connect(db_name)
+96
View File
@@ -0,0 +1,96 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Read out data from files"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import os\n",
"from collections import defaultdict\n",
"\n",
"folder_path = os.path.join(os.getcwd(), '..', 'data', 'gdpr_data')\n",
"\n",
"play_list = []\n",
"\n",
"for filename in os.listdir(folder_path):\n",
"\n",
" if filename.endswith('.json'):\n",
" file_path = os.path.join(folder_path, filename)\n",
" \n",
" with open(file_path, 'r') as file:\n",
" data = json.load(file)\n",
"\n",
" for entry in data:\n",
" try:\n",
" track_id = entry['spotify_track_uri']\n",
" name = entry['master_metadata_track_name']\n",
" artist = entry['master_metadata_album_artist_name']\n",
" album = entry['master_metadata_album_album_name']\n",
" conn_country = entry['conn_country']\n",
" played_on = entry['ts']\n",
" played_track = {'track_id': track_id,\n",
" 'timestamp': played_on,\n",
" 'name': name,\n",
" 'album': album,\n",
" 'artist': artist,\n",
" 'played_from': conn_country\n",
" }\n",
" play_list.append(played_track)\n",
" \n",
" except Exception as e:\n",
" print(f'Missing field: {e}')\n",
" \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"# Sort the playlist by timestamp\n",
"play_list.sort(key=lambda x: x['timestamp'])\n",
"\n",
"# Create a dictionary to store the number of times a track has been played\n",
"track_count = defaultdict(int)\n",
"\n",
"for track in play_list:\n",
" track_count[track['name'], track['artist']] += 1\n",
"\n",
"# Make Track count readable in Data wrangler\n",
"track_count = [{'track': k, 'count': v} for k, v in track_count.items()]\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
+143
View File
@@ -0,0 +1,143 @@
import json
import logging as log
import os
from auth import simple_authenticate
from database_handler import Database, Table
from spotify_api import get_multiple_tracks_information
# Define the absolute folder path to the folder containing the gdpr retrieved data
folder_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'gdpr_data')
# Define the db
db = Database()
def _read_gdrp_data() -> list:
    """
    Read every .json file in the gdpr data folder and extract the play history.

    Entries without a spotify_track_uri (e.g. podcasts) are skipped.

    :return: list of play-event dicts, sorted by timestamp ascending
    """
    all_songs_played = []
    for filename in os.listdir(folder_path):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(folder_path, filename)
        with open(file_path, 'r') as file:
            data = json.load(file)
        for entry in data:
            # This removes all podcasts from the list; .get() also keeps a
            # missing key from raising outside the try block below
            if entry.get('spotify_track_uri') is None:
                continue
            try:
                track = {
                    'timestamp': entry['ts'],
                    'id': _extract_id(entry['spotify_track_uri']),
                    'track_name': entry['master_metadata_track_name'],
                    'artist_name': entry['master_metadata_album_artist_name'],
                    'album_name': entry['master_metadata_album_album_name'],
                    'conn_country': entry['conn_country'],
                    'ms_played': entry['ms_played']
                }
                all_songs_played.append(track)
            except KeyError as e:
                print(f'Missing field: {e}')
    # Oldest first, so later database inserts preserve chronological order
    all_songs_played.sort(key=lambda x: x['timestamp'])
    return all_songs_played
def _extract_id(spotify_id: str) -> str:
"""
This function gets a id with extra details and extracts the id from it.
:param: id a string containing the id
:return: str the ID
"""
prefix = "spotify:track:"
prefix_removed_id = spotify_id[len(prefix):]
return prefix_removed_id
def _populate_ids(all_songs_played: list) -> list:
    """
    Look up album/artist ids for every distinct played track via the API.

    The multiple-tracks endpoint accepts at most 50 ids per request, so ids
    are collected into batches that are flushed once 50 distinct ids
    accumulate. Batching on the batch length (not the loop index) guarantees
    every request carries between 1 and 50 ids and no empty call is made.

    :param all_songs_played: play-history dicts, each carrying an 'id' key
    :return: list of dicts with track_id, album_id and artist_id
    """
    token = simple_authenticate()
    all_songs_played_info = []
    processed_songs_id = set()
    batch = []
    for entry in all_songs_played:
        track_id = entry['id']
        if track_id in processed_songs_id:
            continue
        processed_songs_id.add(track_id)
        batch.append(track_id)
        # Flush a full batch of 50 distinct ids (the API limit per request)
        if len(batch) == 50:
            response = get_multiple_tracks_information(token, *batch)
            all_songs_played_info.extend(_sort_and_create_required_dataset(response))
            batch.clear()
    # Flush any remaining ids that did not fill a complete batch
    if batch:
        response = get_multiple_tracks_information(token, *batch)
        all_songs_played_info.extend(_sort_and_create_required_dataset(response))
    return all_songs_played_info
def _sort_and_create_required_dataset(response) -> dict:
track_list = []
for entry in response['tracks']:
track_data = {
'track_id': entry['id'],
'album_id': entry['album']['id'],
'artist_id': entry['artists'][0]['id']
}
track_list.append(track_data)
return track_list
def _fill_missing_ids(all_songs_played, all_songs_catalogued):
# Create a dictionary to map track_id to artist_id and album_id
track_id_to_artist_album = {data['track_id']: {'album_id': data['album_id'], 'artist_id': data['artist_id']} for data in all_songs_catalogued}
# Now, we will update the original `tracks` list by adding artist_id and album_id
for track in all_songs_played:
track_info = track_id_to_artist_album.get(track['id'])
if track_info:
track['artist_id'] = track_info['artist_id']
track['album_id'] = track_info['album_id']
return all_songs_played
def _insert_data_into_db(all_songs_played: list):
    """
    Insert every played song into the recently-played table.

    Rows that fail to insert (missing keys, db errors) are logged and
    skipped, so one bad entry cannot abort the whole export.

    :param all_songs_played: dicts with timestamp, id, artist_id and album_id
    """
    for entry in all_songs_played:
        try:
            row = (entry['timestamp'], entry['id'], entry['artist_id'], entry['album_id'])
            db.add_row(Table.RECENTLY_PLAYED, row)
        except Exception as e:
            log.error(f'Failed adding {entry} to database, error {e}')
def export_gdpr_data(n_limit: int = 100):
    """
    Read the gdpr play history, enrich it with API ids and store it in the db.

    :param n_limit: how many of the most recently played songs to export
    """
    # History is sorted oldest-first; keep only the newest n_limit entries
    played = _read_gdrp_data()[-n_limit:]
    catalogued = _populate_ids(played)
    _insert_data_into_db(_fill_missing_ids(played, catalogued))
+28 -2
View File
@@ -1,8 +1,34 @@
import argparse
from time import sleep
from scraper import scraping
from gdpr_export import export_gdpr_data
from scraper import scrape_missing_infos, scraping
# Initialize the parser
parser = argparse.ArgumentParser(description="A python script written in Python3.13 which continuously checks what spotify songs "
"the user is listening to and logging these in a local database. \n"
"The Script also has a export function where it can read out the gdpr data exported by the user.")
# Add optional arguments
parser.add_argument('--verbose', '-v', action='store_true', help="Enable verbose output")
parser.add_argument('--export', '-e', action='store_true', help="Export the gdpr data from spotify if not done already")
# Parse the arguments
args = parser.parse_args()
if args.verbose:
print('Enabled verbose mode')
# implement logger
if args.export:
print('Scraping GDPR Data')
    # The next function can get an int which defines the amount of songs which will be scraped from the gdpr files.
    # e.g. if 500 is input, the last 500 played songs will come up, if left empty, the last 100.
export_gdpr_data()
scrape_missing_infos()
# Run forever on intervals of 30 minutes
while True:
print('Scraping API...')
scraping()
print('Done Scraping')
sleep(1800)
+16 -84
View File
@@ -1,9 +1,14 @@
import requests
from auth import authenticate, simple_authenticate
from database_handler import Database, Table
from spotify_api import (
get_album_information,
get_artist_information,
get_last_played_track,
get_track_information,
)
db = Database('./data/spotify_scraped.db')
# Define DB
db = Database()
def scraping():
@@ -17,19 +22,20 @@ def scraping():
# Once each 30 mins
_read_recently_played_page_and_add_to_db(bearer_token=bearer_token)
_scrape_missing_infos()
scrape_missing_infos()
db.close()
def _read_recently_played_page_and_add_to_db(bearer_token: str):
"""
This function gets a list of song play history and adds it into the database.
"""
global db
last_played_track = _get_last_played_track(bearer_token=bearer_token)
last_played_track = get_last_played_track(bearer_token=bearer_token)
for track in last_played_track['items']:
for track in reversed(last_played_track['items']):
track_id = track['track']['id']
played_at = track['played_at']
album_id = track['track']['album']['id']
@@ -37,83 +43,9 @@ def _read_recently_played_page_and_add_to_db(bearer_token: str):
db.add_row(Table.RECENTLY_PLAYED, (played_at, track_id, artist_id, album_id))
def _get_last_played_track(url: str = "https://api.spotify.com/v1/me/player/recently-played?limit=50", bearer_token: str = "") -> dict:
"""
This function returns the last played track based on the limit size
:param limit: str
:param bearer_token: str
:return: dict
def scrape_missing_infos():
"""
header = {
'Authorization': f'Bearer {bearer_token}'
}
response = requests.get(url, headers=header)
response_json = response.json()
return response_json
def _get_track_information(track_id: str, bearer_token: str) -> dict:
"""
This function returns the track information based on the track id
:param track_id: str
:param bearer_token: str
:return: dict
"""
url = f"https://api.spotify.com/v1/tracks/{track_id}"
header = {
'Authorization': f'Bearer {bearer_token}'
}
response = requests.get(url, headers=header)
response_json = response.json()
return response_json
def _get_artist_information(artist_id: str, bearer_token: str) -> dict:
"""
This function returns the artist information based on the artist id
:param artist_id: str
:param bearer_token: str
:return: dict
"""
url = f"https://api.spotify.com/v1/artists/{artist_id}"
header = {
'Authorization': f'Bearer {bearer_token}'
}
response = requests.get(url, headers=header)
response_json = response.json()
return response_json
def _get_album_information(album_id: str, bearer_token: str) -> dict:
"""
This function returns the album information based on the album id
:param album_id: str
:param bearer_token: str
:return: dict
"""
url = f"https://api.spotify.com/v1/albums/{album_id}"
header = {
'Authorization': f'Bearer {bearer_token}'
}
response = requests.get(url, headers=header)
response_json = response.json()
return response_json
def _scrape_missing_infos():
"""
"""
global db
@@ -124,14 +56,14 @@ def _scrape_missing_infos():
all_track_ids_saved = db.read_all_rows(Table.TRACK_INFORMATION, 'track_id')
all_track_ids_missing = list(set(all_track_ids_recently_played) - set(all_track_ids_saved))
for track_id in all_track_ids_missing:
response = _get_track_information(track_id=track_id[0], bearer_token=bearer_token_simple)
response = get_track_information(track_id=track_id[0], bearer_token=bearer_token_simple)
db.add_row(Table.TRACK_INFORMATION, (response['id'], response['name'], response['duration_ms'], response['explicit'], response['popularity']))
# Album Info
all_album_ids_recently_played = db.read_all_rows(Table.RECENTLY_PLAYED, 'album_id')
all_album_ids_saved = db.read_all_rows(Table.ALBUM_INFORMATION, 'album_id')
all_album_ids_missing = list(set(all_album_ids_recently_played) - set(all_album_ids_saved))
for album_id in all_album_ids_missing:
response = _get_album_information(album_id=album_id[0], bearer_token=bearer_token_simple)
response = get_album_information(album_id=album_id[0], bearer_token=bearer_token_simple)
try:
release_year = response['release_date'][:4]
except Exception:
@@ -142,7 +74,7 @@ def _scrape_missing_infos():
all_artist_ids_saved = db.read_all_rows(Table.ARTIST_INFORMATION, 'artist_id')
all_artist_ids_missing = list(set(all_artist_ids_recently_played) - set(all_artist_ids_saved))
for artist_id in all_artist_ids_missing:
response = _get_artist_information(artist_id=artist_id[0], bearer_token=bearer_token_simple)
response = get_artist_information(artist_id=artist_id[0], bearer_token=bearer_token_simple)
try:
genre = response['genres'][0]
except IndexError:
+106
View File
@@ -0,0 +1,106 @@
import logging as log
import requests
def get_last_played_track(bearer_token: str, url: str = "https://api.spotify.com/v1/me/player/recently-played?limit=50") -> dict:
    """
    Fetch the user's recently played tracks from the Spotify API.

    The page size is fixed by the limit query parameter baked into the
    default url (50, the API maximum).

    :param bearer_token: OAuth bearer token used for the Authorization header
    :param url: full endpoint url; overridable, e.g. for pagination
    :return: the decoded JSON response as a dict
    """
    header = {
        'Authorization': f'Bearer {bearer_token}'
    }
    response = requests.get(url, headers=header)
    response_json = response.json()
    return response_json
def get_track_information(track_id: str, bearer_token: str) -> dict:
    """
    Retrieve the details of a single track from the Spotify API.

    :param track_id: Spotify id of the track
    :param bearer_token: OAuth bearer token used for the Authorization header
    :return: the decoded JSON response as a dict
    """
    endpoint = f"https://api.spotify.com/v1/tracks/{track_id}"
    auth_header = {'Authorization': f'Bearer {bearer_token}'}
    return requests.get(endpoint, headers=auth_header).json()
def get_multiple_tracks_information(bearer_token: str, *track_ids) -> dict:
    """
    Fetch information for up to 50 tracks in a single Spotify API call.

    :param bearer_token: OAuth bearer token used for the Authorization header
    :param track_ids: up to 50 Spotify track ids
    :return: the decoded JSON response as a dict, or None when more than 50
        ids are passed (the API limit per request)
    """
    if len(track_ids) > 50:
        log.error('Passed more than 50 track ids to get_multiple_tracks_information')
        return None
    # ",".join avoids the trailing-separator trimming of the concatenation
    # approach, which mangled the url when the id list was empty
    url = f"https://api.spotify.com/v1/tracks?ids={','.join(track_ids)}"
    header = {
        'Authorization': f'Bearer {bearer_token}'
    }
    response = requests.get(url, headers=header)
    response_json = response.json()
    return response_json
def get_artist_information(artist_id: str, bearer_token: str) -> dict:
    """
    Retrieve the details of a single artist from the Spotify API.

    :param artist_id: Spotify id of the artist
    :param bearer_token: OAuth bearer token used for the Authorization header
    :return: the decoded JSON response as a dict
    """
    endpoint = f"https://api.spotify.com/v1/artists/{artist_id}"
    auth_header = {'Authorization': f'Bearer {bearer_token}'}
    return requests.get(endpoint, headers=auth_header).json()
def get_album_information(album_id: str, bearer_token: str) -> dict:
    """
    Retrieve the details of a single album from the Spotify API.

    :param album_id: Spotify id of the album
    :param bearer_token: OAuth bearer token used for the Authorization header
    :return: the decoded JSON response as a dict
    """
    endpoint = f"https://api.spotify.com/v1/albums/{album_id}"
    auth_header = {'Authorization': f'Bearer {bearer_token}'}
    return requests.get(endpoint, headers=auth_header).json()