diff --git a/src/auth.py b/src/auth.py new file mode 100644 index 0000000..a6a0015 --- /dev/null +++ b/src/auth.py @@ -0,0 +1,203 @@ +import dotenv +import time +from urllib.parse import urlencode, urlparse, parse_qs +from http.server import BaseHTTPRequestHandler, HTTPServer +import requests +import os +import json + + +TOKEN_FILE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'env', 'tokens.json') + + +def authenticate(scope: str) -> tuple: + """ + This function authenticates the user and returns the access token + + :param scope: str + :return: str + """ + spotify_client_id, spotify_client_secret, spotify_redirect_uri = _read_env_file() + + tokens = _load_tokens() + if tokens: + access_token, refresh_token, expires_at = tokens + if time.time() < expires_at: + return access_token + else: + print("Token expired, refreshing...") + access_token, refresh_token = _refresh_access_token(refresh_token, spotify_client_id, spotify_client_secret) + _save_tokens(access_token, refresh_token) + return access_token + + auth_url = _get_authorization_url(spotify_client_id, spotify_redirect_uri, scope) + print(f'Please go to the following URL to authorize the app: {auth_url}') + + authorization_code = _start_server_and_wait_for_code() + + access_token, refresh_token = _exchange_code_for_token(authorization_code, redirect_uri=spotify_redirect_uri, + client_id=spotify_client_id, client_secret=spotify_client_secret) + + _save_tokens(access_token, refresh_token) + + return access_token + + +def _get_authorization_url(client_id: str, redirect_uri: str, scope: str) -> str: + """ + This function generates the URL that the user needs to visit to authorize the app + + :param client_id: str + :param redirect_uri: str + :param scope: str + :return: str + """ + + auth_params = { + "response_type": "code", + "client_id": client_id, + "scope": scope, + "redirect_uri": redirect_uri, + "state": str(int(time.time())) + } + auth_url = "https://accounts.spotify.com/authorize?" + urlencode(auth_params) + return auth_url + + +def _read_env_file() -> tuple: + """ + This function reads the .env file and returns the client_id, client_secret and redirect_uri + + :return: tuple + """ + current_dir = os.path.dirname(os.path.abspath(__file__)) + dotenv_folder_path = os.path.join(current_dir, 'env') + dotenv_path = os.path.join(dotenv_folder_path, '.env') + contents = dotenv.dotenv_values(dotenv_path=dotenv_path) + spotify_client_id = contents['SPOTIFY_CLIENT_ID'] + spotify_client_secret = contents['SPOTIFY_CLIENT_SECRET'] + spotify_redirect_uri = contents['SPOTIFY_REDIRECT_URI'] + return spotify_client_id, spotify_client_secret, spotify_redirect_uri + + +def _start_server_and_wait_for_code() -> any: + """ + This function starts a server and waits for the user to visit the authorization URL + and get the authorization code + + :return: any + """ + class CallbackHandler(BaseHTTPRequestHandler): + def do_GET(self): + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + if 'code' in query_params: + self.server.authorization_code = query_params['code'][0] + self.send_response(200) + self.end_headers() + self.wfile.write(b"Authorization successful! You can close this window.") + + server = HTTPServer(('localhost', 8888), CallbackHandler) + print("Starting server to capture the authorization code...") + server.handle_request() + return server.authorization_code + + +def _exchange_code_for_token(code: str, redirect_uri: str, client_id: str, client_secret: str) -> tuple: + """ + This function exchanges the authorization code for an access token + + :param code: str + :param redirect_uri: str + :param client_id: str + :param client_secret: str + :return: tuple + """ + + token_url = "https://accounts.spotify.com/api/token" + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + + data = { + 'grant_type': 'authorization_code', + 'code': code, + 'redirect_uri': redirect_uri, + 'client_id': client_id, + 'client_secret': client_secret, + } + + response = requests.post(token_url, data=data, headers=headers) + response_data = response.json() + + if 'access_token' not in response_data: + raise Exception("Failed to get access token") + + access_token = response_data['access_token'] + refresh_token = response_data.get('refresh_token', None) + return access_token, refresh_token + + +def _refresh_access_token(refresh_token: str, client_id: str, client_secret: str) -> tuple: + """ + Refreshes the access token using the refresh token. + + :param refresh_token: str + :param client_id: str + :param client_secret: str + :return: tuple + """ + token_url = "https://accounts.spotify.com/api/token" + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + + data = { + 'grant_type': 'refresh_token', + 'refresh_token': refresh_token, + 'client_id': client_id, + 'client_secret': client_secret, + } + + response = requests.post(token_url, data=data, headers=headers) + response_data = response.json() + + if 'access_token' not in response_data: + raise Exception("Failed to refresh access token") + + access_token = response_data['access_token'] + expires_in = response_data['expires_in'] + expires_at = time.time() + expires_in + return access_token, expires_at + + +def _load_tokens() -> tuple: + """ + Loads the tokens from the local file if they exist and are still valid. + + :return: tuple or None + """ + if os.path.exists(TOKEN_FILE_PATH): + with open(TOKEN_FILE_PATH, 'r') as f: + tokens = json.load(f) + if 'access_token' in tokens and 'expires_at' in tokens and time.time() < tokens['expires_at']: + return tokens['access_token'], tokens['refresh_token'], tokens['expires_at'] + return None + + +def _save_tokens(access_token: str, refresh_token: str) -> None: + """ + Saves the access and refresh tokens to a local file. + + :param access_token: str + :param refresh_token: str + """ + expires_in = 3600 # Default expiration time, adjust as needed + expires_at = time.time() + expires_in + tokens = { + 'access_token': access_token, + 'refresh_token': refresh_token, + 'expires_at': expires_at + } + with open(TOKEN_FILE_PATH, 'w') as f: + json.dump(tokens, f) \ No newline at end of file diff --git a/src/scraper.py b/src/scraper.py index b50b8c7..02ef9b5 100644 --- a/src/scraper.py +++ b/src/scraper.py @@ -1,34 +1,14 @@ -import dotenv -import time -from urllib.parse import urlencode, urlparse, parse_qs -from http.server import BaseHTTPRequestHandler, HTTPServer import requests -import os +from auth import authenticate def main(): - - recently_played_access_token = authenticate() - last_played_track = get_last_played_track(limit=1, bearer_token=recently_played_access_token) + scope = "user-read-recently-played" + bearer_token = authenticate(scope) + last_played_track = _get_last_played_track(bearer_token=bearer_token) + print(last_played_track) -def authenticate() -> str: - """ - This function authenticates the user and returns the access token - """ - spotify_client_id, spotify_client_secret, spotify_redirect_uri = _read_env_file() - - auth_url = _get_authorization_url(spotify_client_id, spotify_redirect_uri) - print(f'Please go to the following URL to authorize the app: {auth_url}') - - authorization_code = _start_server_and_wait_for_code() - - access_token, refresh_token = _exchange_code_for_token(authorization_code, redirect_uri=spotify_redirect_uri, - client_id=spotify_client_id, client_secret=spotify_client_secret) - - return access_token - - -def get_last_played_track(limit: str = "1", bearer_token: str = "") -> dict: +def _get_last_played_track(limit: str = "1", bearer_token: str = "") -> dict: """ This function returns the last played track based on the limit size @@ -46,99 +26,5 @@ def get_last_played_track(limit: str = "1", bearer_token: str = "") -> dict: return response_json -def _get_authorization_url(client_id: str, redirect_uri: str) -> str: - """ - This function generates the URL that the user needs to visit to authorize the app - - :param client_id: str - :param redirect_uri: str - :return: str - """ - - auth_params = { - "response_type": "code", - "client_id": client_id, - "scope": "user-read-recently-played", - "redirect_uri": redirect_uri, - "state": str(int(time.time())) - } - auth_url = "https://accounts.spotify.com/authorize?" + urlencode(auth_params) - return auth_url - - -def _read_env_file() -> tuple: - """ - This function reads the .env file and returns the client_id, client_secret and redirect_uri - - :return: tuple - """ - current_dir = os.path.dirname(os.path.abspath(__file__)) - dotenv_folder_path = os.path.join(current_dir, 'env') - dotenv_path = os.path.join(dotenv_folder_path, '.env') - contents = dotenv.dotenv_values(dotenv_path=dotenv_path) - spotify_client_id = contents['SPOTIFY_CLIENT_ID'] - spotify_client_secret = contents['SPOTIFY_CLIENT_SECRET'] - spotify_redirect_uri = contents['SPOTIFY_REDIRECT_URI'] - return spotify_client_id, spotify_client_secret, spotify_redirect_uri - - -def _start_server_and_wait_for_code() -> any: - """ - This function starts a server and waits for the user to visit the authorization URL - and get the authorization code - - :return: any - """ - class CallbackHandler(BaseHTTPRequestHandler): - def do_GET(self): - parsed_url = urlparse(self.path) - query_params = parse_qs(parsed_url.query) - if 'code' in query_params: - self.server.authorization_code = query_params['code'][0] - self.send_response(200) - self.end_headers() - self.wfile.write(b"Authorization successful! You can close this window.") - - server = HTTPServer(('localhost', 8888), CallbackHandler) - print("Starting server to capture the authorization code...") - server.handle_request() - return server.authorization_code - - -def _exchange_code_for_token(code: str, redirect_uri: str, client_id: str, client_secret: str) -> tuple: - """ - This function exchanges the authorization code for an access token - - :param code: str - :param redirect_uri: str - :param client_id: str - :param client_secret: str - :return: tuple - """ - - token_url = "https://accounts.spotify.com/api/token" - headers = { - 'Content-Type': 'application/x-www-form-urlencoded', - } - - data = { - 'grant_type': 'authorization_code', - 'code': code, - 'redirect_uri': redirect_uri, - 'client_id': client_id, - 'client_secret': client_secret, - } - - response = requests.post(token_url, data=data, headers=headers) - response_data = response.json() - - if 'access_token' not in response_data: - raise Exception("Failed to get access token") - - access_token = response_data['access_token'] - refresh_token = response_data.get('refresh_token', None) - return access_token, refresh_token - - if __name__ == '__main__': main() \ No newline at end of file