Feat/auth multiple tokens integration (#15)

* Implemented a multiple token read/write/refresh function for the authentificator

* Rework auth with simple authentificate
This commit is contained in:
Dominik
2025-03-19 17:44:17 +01:00
committed by GitHub
parent 57ac860f1f
commit e574c6bc00
3 changed files with 77 additions and 20 deletions
+71 -18
View File
@@ -1,3 +1,4 @@
import base64
import json
import os
import time
@@ -10,6 +11,35 @@ import requests
TOKEN_FILE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'env', 'tokens.json')
def simple_authenticate(grant_type: str = "client_credentials") -> str:
"""
This function authenticates the user and returns the access token
:return: str
"""
spotify_client_id, spotify_client_secret, spotify_redirect_uri = _read_env_file()
token_url = "https://accounts.spotify.com/api/token"
auth_value = f"{spotify_client_id}:{spotify_client_secret}"
auth_header = base64.b64encode(auth_value.encode('utf-8')).decode('utf-8')
headers = {
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": f"{grant_type}"
}
response = requests.post(token_url, headers=headers, data=data)
if response.status_code == 200:
access_token = response.json().get('access_token')
return access_token
else:
print(f"Error {response.status_code}: {response.text}")
def authenticate(scope: str) -> str:
"""
This function authenticates the user and returns the access token
@@ -19,15 +49,15 @@ def authenticate(scope: str) -> str:
"""
spotify_client_id, spotify_client_secret, spotify_redirect_uri = _read_env_file()
tokens = _load_tokens()
tokens = _load_tokens(scope)
if tokens:
access_token, refresh_token, expires_at = tokens
if time.time() < expires_at:
return access_token
else:
print("Token expired, refreshing...")
access_token, refresh_token = _refresh_access_token(refresh_token, spotify_client_id, spotify_client_secret)
_save_tokens(access_token, refresh_token)
print(f"Token for scope {scope} expired, refreshing...")
access_token, expires_at = _refresh_access_token(refresh_token, spotify_client_id, spotify_client_secret)
_refresh_tokens_file(access_token, scope, expires_at)
return access_token
auth_url = _get_authorization_url(spotify_client_id, spotify_redirect_uri, scope)
@@ -35,10 +65,10 @@ def authenticate(scope: str) -> str:
authorization_code = _start_server_and_wait_for_code()
access_token, refresh_token = _exchange_code_for_token(authorization_code, redirect_uri=spotify_redirect_uri,
client_id=spotify_client_id, client_secret=spotify_client_secret)
access_token, refresh_token, expires_at = _exchange_code_for_token(authorization_code, redirect_uri=spotify_redirect_uri,
client_id=spotify_client_id, client_secret=spotify_client_secret)
_save_tokens(access_token, refresh_token)
_save_tokens(access_token, refresh_token, scope, expires_at)
return access_token
@@ -135,7 +165,9 @@ def _exchange_code_for_token(code: str, redirect_uri: str, client_id: str, clien
access_token = response_data['access_token']
refresh_token = response_data.get('refresh_token', None)
return access_token, refresh_token
expires_in = response_data['expires_in']
expires_at = time.time() + expires_in
return access_token, refresh_token, expires_at
def _refresh_access_token(refresh_token: str, client_id: str, client_secret: str) -> tuple:
@@ -171,7 +203,7 @@ def _refresh_access_token(refresh_token: str, client_id: str, client_secret: str
return access_token, expires_at
def _load_tokens() -> tuple:
def _load_tokens(scope: str) -> tuple:
"""
Loads the tokens from the local file if they exist and are still valid.
@@ -180,24 +212,45 @@ def _load_tokens() -> tuple:
if os.path.exists(TOKEN_FILE_PATH):
with open(TOKEN_FILE_PATH, 'r') as f:
tokens = json.load(f)
if 'access_token' in tokens and 'expires_at' in tokens and time.time() < tokens['expires_at']:
return tokens['access_token'], tokens['refresh_token'], tokens['expires_at']
if scope in tokens:
if 'access_token' in tokens[scope] and 'expires_at' in tokens[scope] and 'expires_at' in tokens[scope]:
return tokens[scope]['access_token'], tokens[scope]['refresh_token'], tokens[scope]['expires_at']
return None
def _save_tokens(access_token: str, refresh_token: str) -> None:
def _save_tokens(access_token: str, refresh_token: str, scope: str, expires_at) -> None:
"""
Saves the access and refresh tokens to a local file.
:param access_token: str
:param refresh_token: str
:param scope: str
"""
expires_in = 3600 # Default expiration time, adjust as needed
expires_at = time.time() + expires_in
tokens = {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_at': expires_at
}
scope: {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_at': expires_at
},
}
with open(TOKEN_FILE_PATH, 'w') as f:
json.dump(tokens, f)
def _refresh_tokens_file(access_token: str, scope: str, expires_at) -> None:
"""
Saves the access and refresh tokens to a local file.
:param access_token: str
:param scope: str
"""
with open(TOKEN_FILE_PATH, 'r') as file:
tokens = json.load(file)
if scope in tokens and 'refresh_token' in tokens[scope]:
tokens[scope]['access_token'] = access_token
tokens[scope]['expires_at'] = expires_at
with open(TOKEN_FILE_PATH, 'w') as file:
json.dump(tokens, file, indent=4)
else:
print(f"Error: Scope '{scope}' or refresh_token not found in the tokens file.")
+3 -2
View File
@@ -1,6 +1,6 @@
import requests
from auth import authenticate
from auth import authenticate, simple_authenticate
from database_handler import Database, Table
db = Database('spotify_scraped.db')
@@ -20,8 +20,9 @@ def main():
# Once a day
all_track_ids = db.read_all_rows(Table.RECENTLY_PLAYED, 'track_id')
bearer_toke_simple = simple_authenticate()
for track_id in all_track_ids:
response = _get_track_information(track_id=track_id, bearer_token=bearer_token)
response = _get_track_information(track_id=track_id[0], bearer_token=bearer_toke_simple)
print(response)
# Close the database connection