Files
predictify/src/auth.py
T
Chris Kiriakou 38e0c69bea Move .env file to config dir, change db path for improved project structure
* Change path of db and token to `data`, which helps to separate source
  code from data files like `tokens.json` & `spotify_scraped.db`
* Change path of `.env` from `src/env/` to `config/` for the same reason as
  mentioned above
* Added the newly created files to `.gitignore` accordingly
2025-03-21 18:57:13 +01:00

258 lines
8.2 KiB
Python

import base64
import json
import logging as log
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlencode, urlparse
import dotenv
import requests
# Token cache file: `../data/tokens.json`, resolved relative to the directory of this module.
TOKEN_FILE_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    '../data',
    'tokens.json',
)
def simple_authenticate(grant_type: str = "client_credentials") -> str:
    """
    Request an access token from the Spotify token endpoint using the app credentials.

    The client id and client secret are read via `_read_env_file` and sent as an
    HTTP Basic `Authorization` header.

    :param grant_type: str, value sent as the `grant_type` form field
    :return: str, the access token on HTTP 200; on any other status the error is
        logged and None is returned
    """
    # The redirect URI is not needed for this request.
    spotify_client_id, spotify_client_secret, _ = _read_env_file()

    token_url = "https://accounts.spotify.com/api/token"
    auth_value = f"{spotify_client_id}:{spotify_client_secret}"
    auth_header = base64.b64encode(auth_value.encode('utf-8')).decode('utf-8')

    headers = {
        "Authorization": f"Basic {auth_header}",
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": grant_type
    }

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(token_url, headers=headers, data=data, timeout=10)

    if response.status_code == 200:
        return response.json().get('access_token')

    log.error(f"Error {response.status_code}: {response.text}")
    return None
def authenticate(scope: str) -> str:
    """
    Return an access token for the given scope, reusing cached tokens where possible.

    Order of attempts:
    1. a cached token for the scope that has not expired yet is returned as is;
    2. an expired cached token is renewed with its stored refresh token and the
       token file is updated;
    3. otherwise the authorization URL is printed, the local callback server waits
       for the authorization code, and the resulting tokens are saved.

    :param scope: str
    :return: str
    """
    spotify_client_id, spotify_client_secret, spotify_redirect_uri = _read_env_file()

    tokens = _load_tokens(scope)
    if tokens:
        access_token, refresh_token, expires_at = tokens
        if time.time() < expires_at:
            return access_token

        if refresh_token:
            log.info(f"Token for scope {scope} expired, refreshing...")
            access_token, expires_at = _refresh_access_token(refresh_token, spotify_client_id, spotify_client_secret)
            _refresh_tokens_file(access_token, scope, expires_at)
            return access_token

        # The refresh token may have been stored as null; a refresh request cannot
        # succeed without it, so fall through to a fresh authorization instead.
        log.info(f"Token for scope {scope} expired and no refresh token is stored, re-authorizing...")

    auth_url = _get_authorization_url(spotify_client_id, spotify_redirect_uri, scope)
    print(f'Please go to the following URL to authorize the app: {auth_url}')

    authorization_code = _start_server_and_wait_for_code()

    access_token, refresh_token, expires_at = _exchange_code_for_token(
        authorization_code,
        redirect_uri=spotify_redirect_uri,
        client_id=spotify_client_id,
        client_secret=spotify_client_secret,
    )

    _save_tokens(access_token, refresh_token, scope, expires_at)
    return access_token
def _get_authorization_url(client_id: str, redirect_uri: str, scope: str) -> str:
"""
This function generates the URL that the user needs to visit to authorize the app
:param client_id: str
:param redirect_uri: str
:param scope: str
:return: str
"""
auth_params = {
"response_type": "code",
"client_id": client_id,
"scope": scope,
"redirect_uri": redirect_uri,
"state": str(int(time.time()))
}
auth_url = "https://accounts.spotify.com/authorize?" + urlencode(auth_params)
return auth_url
def _read_env_file() -> tuple:
    """
    Load the Spotify settings from `../config/.env`, resolved relative to this module.

    :return: tuple of (SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, SPOTIFY_REDIRECT_URI)
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    env_values = dotenv.dotenv_values(dotenv_path=os.path.join(module_dir, '../config', '.env'))

    # Plain indexing on purpose: a missing key surfaces as a KeyError.
    return (
        env_values['SPOTIFY_CLIENT_ID'],
        env_values['SPOTIFY_CLIENT_SECRET'],
        env_values['SPOTIFY_REDIRECT_URI'],
    )
def _start_server_and_wait_for_code() -> str:
    """
    Start a local HTTP server on localhost:8888 and wait until the authorization
    redirect delivers the authorization code.

    Requests that carry neither a `code` nor an `error` query parameter (for
    example a browser asking for a favicon) are answered with 400 and ignored, so
    they cannot end the wait prematurely. The listening socket is closed before
    returning.

    :return: str, the value of the `code` query parameter
    :raises RuntimeError: if the redirect carries an `error` query parameter
    """
    class CallbackHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            query_params = parse_qs(urlparse(self.path).query)

            if 'code' in query_params:
                self.server.authorization_code = query_params['code'][0]
                self._reply(200, b"Authorization successful! You can close this window.")
            elif 'error' in query_params:
                self.server.authorization_error = query_params['error'][0]
                self._reply(400, b"Authorization failed. You can close this window.")
            else:
                self._reply(400, b"Missing authorization code.")

        def _reply(self, status, body):
            # Always answer, otherwise the browser keeps waiting on the connection.
            self.send_response(status)
            self.end_headers()
            self.wfile.write(body)

    # The context manager closes the listening socket even if a handler fails.
    with HTTPServer(('localhost', 8888), CallbackHandler) as server:
        # Pre-set both attributes so reading them can never raise AttributeError.
        server.authorization_code = None
        server.authorization_error = None

        log.info("Starting server to capture the authorization code...")
        while server.authorization_code is None and server.authorization_error is None:
            server.handle_request()

    if server.authorization_error is not None:
        raise RuntimeError(f"Authorization failed: {server.authorization_error}")
    return server.authorization_code
def _exchange_code_for_token(code: str, redirect_uri: str, client_id: str, client_secret: str) -> tuple:
    """
    Exchange an authorization code for tokens at the Spotify token endpoint.

    :param code: str
    :param redirect_uri: str
    :param client_id: str
    :param client_secret: str
    :return: tuple of (access_token, refresh_token, expires_at); refresh_token is
        None when the response does not contain one, expires_at is the current
        time plus the `expires_in` value of the response
    :raises Exception: if the response does not contain an access token
    """
    token_url = "https://accounts.spotify.com/api/token"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': redirect_uri,
        'client_id': client_id,
        'client_secret': client_secret,
    }

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(token_url, data=data, headers=headers, timeout=10)

    try:
        response_data = response.json()
    except ValueError:
        # Body is not JSON; report it through the error below instead of a decode error.
        response_data = None

    if not isinstance(response_data, dict) or 'access_token' not in response_data:
        raise Exception(f"Failed to get access token (HTTP {response.status_code}): {response.text}")

    access_token = response_data['access_token']
    refresh_token = response_data.get('refresh_token')
    expires_at = time.time() + response_data['expires_in']

    return access_token, refresh_token, expires_at
def _refresh_access_token(refresh_token: str, client_id: str, client_secret: str) -> tuple:
    """
    Obtain a new access token from the Spotify token endpoint using a refresh token.

    Only `access_token` and `expires_in` are read from the response.

    :param refresh_token: str
    :param client_id: str
    :param client_secret: str
    :return: tuple of (access_token, expires_at); expires_at is the current time
        plus the `expires_in` value of the response
    :raises Exception: if the response does not contain an access token
    """
    token_url = "https://accounts.spotify.com/api/token"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': client_id,
        'client_secret': client_secret,
    }

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(token_url, data=data, headers=headers, timeout=10)

    try:
        response_data = response.json()
    except ValueError:
        # Body is not JSON; report it through the error below instead of a decode error.
        response_data = None

    if not isinstance(response_data, dict) or 'access_token' not in response_data:
        raise Exception(f"Failed to refresh access token (HTTP {response.status_code}): {response.text}")

    access_token = response_data['access_token']
    expires_at = time.time() + response_data['expires_in']

    return access_token, expires_at
def _load_tokens(scope: str) -> tuple:
    """
    Load the cached tokens for a scope from the local token file.

    Expiry is not checked here; the stored `expires_at` value is returned as is.

    :param scope: str
    :return: tuple of (access_token, refresh_token, expires_at), or None when the
        file does not exist, cannot be parsed, or has no complete entry for the scope
    """
    if not os.path.exists(TOKEN_FILE_PATH):
        return None

    try:
        with open(TOKEN_FILE_PATH, 'r') as f:
            tokens = json.load(f)
    except json.JSONDecodeError:
        log.warning(f"Token file '{TOKEN_FILE_PATH}' is not valid JSON, ignoring it.")
        return None

    entry = tokens.get(scope) if isinstance(tokens, dict) else None
    if not isinstance(entry, dict):
        return None

    # All three keys are read below, so all three must be present.
    required_keys = ('access_token', 'refresh_token', 'expires_at')
    if any(key not in entry for key in required_keys):
        return None

    return entry['access_token'], entry['refresh_token'], entry['expires_at']
def _save_tokens(access_token: str, refresh_token: str, scope: str, expires_at) -> None:
    """
    Store the tokens for a scope in the local token file.

    The file is keyed by scope. Entries of other scopes already present in the
    file are kept; only the entry for `scope` is created or replaced.

    :param access_token: str
    :param refresh_token: str
    :param scope: str
    :param expires_at: expiry timestamp stored alongside the tokens
    """
    # Start from the existing content so tokens of other scopes are not wiped.
    try:
        with open(TOKEN_FILE_PATH, 'r') as f:
            tokens = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        tokens = {}
    if not isinstance(tokens, dict):
        tokens = {}

    tokens[scope] = {
        'access_token': access_token,
        'refresh_token': refresh_token,
        'expires_at': expires_at
    }

    # The target directory may not exist yet on a fresh checkout.
    os.makedirs(os.path.dirname(os.path.abspath(TOKEN_FILE_PATH)), exist_ok=True)
    with open(TOKEN_FILE_PATH, 'w') as f:
        json.dump(tokens, f)
def _refresh_tokens_file(access_token: str, scope: str, expires_at) -> None:
    """
    Update the stored access token and expiry of an existing scope entry in the token file.

    The entry is only updated when it exists and already holds a `refresh_token`;
    otherwise an error is logged and the file is left untouched.

    :param access_token: str
    :param scope: str
    :param expires_at: new expiry timestamp for the access token
    """
    with open(TOKEN_FILE_PATH, 'r') as token_file:
        stored = json.load(token_file)

    if scope not in stored or 'refresh_token' not in stored[scope]:
        log.error(f"Error: Scope '{scope}' or refresh_token not found in the tokens file.")
        return

    scope_entry = stored[scope]
    scope_entry['access_token'] = access_token
    scope_entry['expires_at'] = expires_at

    with open(TOKEN_FILE_PATH, 'w') as token_file:
        json.dump(stored, token_file, indent=4)