from collections.abc import Coroutine from typing import Literal, NoReturn, overload from oxidehttp.client import OxideHTTP from oxidehttp.schema import CachedResponse, Response from pydantic import BaseModel from . import schema as s class SpotifyAPIClient(OxideHTTP): def __init__( self, redis_url: str | None = None, proxy_url: str | None = None, ) -> None: super().__init__( base_url='https://api.spotify.com/v1/', redis_url=redis_url, proxy_url=proxy_url, ) def _auth(self, access_token: str) -> dict[str, str]: return {'Authorization': f'Bearer {access_token}'} async def _process_request[T: Response | CachedResponse]( self, func: Coroutine[None, None, T] ) -> T: req = await func if req.status_code == 504: return await self._process_request(func) return req async def _process[T: BaseModel]( self, req: Response | CachedResponse, schema: type[T] | None ) -> T | None: if req.status_code == 204 or ( req.status_code == 200 and schema is None ): return None if req.status_code >= 500: raise s.InternalError(req.status_code, 'Internal server error') data = await req.json() if req.status_code >= 400: raise s.ClientError(req.status_code, data['error']['message']) if schema: return schema.model_validate(data) async def get_album(self) -> NoReturn: raise NotImplementedError async def get_album_tracks(self) -> NoReturn: raise NotImplementedError async def get_users_saved_albums(self) -> NoReturn: raise NotImplementedError async def get_artist(self) -> NoReturn: raise NotImplementedError async def get_artists_albums(self) -> NoReturn: raise NotImplementedError async def get_an_audiobook(self) -> NoReturn: raise NotImplementedError async def get_audiobook_chapters(self) -> NoReturn: raise NotImplementedError async def get_users_saved_audiobooks(self) -> NoReturn: raise NotImplementedError async def get_a_chapter(self) -> NoReturn: raise NotImplementedError async def get_episode(self) -> NoReturn: raise NotImplementedError async def get_users_saved_episodes(self) -> NoReturn: raise NotImplementedError async def save_item_to_library(self) -> NoReturn: raise NotImplementedError async def remove_item_from_library(self) -> NoReturn: raise NotImplementedError async def check_users_saved_items(self) -> NoReturn: raise NotImplementedError async def get_playback_state( self, access_token: str, *, market: str | None = None, additional_types: list[Literal['track', 'episode']] | None = None, cache_ttl: int | None = None, ) -> s.PlaybackState | None: req = self.get( '/me/player', cache_ttl=cache_ttl, params=self.clean_dict( { 'market': market, 'additional_types': ','.join(additional_types) if additional_types else None, } ), headers=self._auth(access_token), ) req = await self._process_request(req) return await self._process(req, s.PlaybackState) async def transfer_playback(self) -> NoReturn: raise NotImplementedError async def get_available_devices(self) -> NoReturn: raise NotImplementedError async def start_resume_playback(self) -> NoReturn: raise NotImplementedError async def pause_playback(self) -> NoReturn: raise NotImplementedError async def skip_to_next(self, access_token: str, device_id: str) -> None: req = self.post( '/me/player/next', params={'device_id': device_id}, headers=self._auth(access_token), ) req = await self._process_request(req) return await self._process(req, None) async def skip_to_previous( self, access_token: str, device_id: str ) -> None: req = self.post( '/me/player/previous', params={'device_id': device_id}, headers=self._auth(access_token), ) req = await self._process_request(req) return await self._process(req, None) async def seek_to_position(self) -> NoReturn: raise NotImplementedError async def set_repeat_mode(self) -> NoReturn: raise NotImplementedError async def set_playback_volume(self) -> NoReturn: raise NotImplementedError async def toggle_playback_shuffle(self) -> NoReturn: raise NotImplementedError async def get_recently_played_tracks( self, access_token: str, *, limit: int = 20, after: int | None = None, before: int | None = None, cache_ttl: int | None = None, ) -> s.RecentlyPlayed: req = self.get( '/me/player/recently-played', cache_ttl=cache_ttl, params=self.clean_dict( {'limit': limit, 'after': after, 'before': before} ), headers=self._auth(access_token), ) req = await self._process_request(req) res = await self._process(req, s.RecentlyPlayed) if res is None: raise s.InternalError(status_code=0, error="Can't be here") return res async def get_the_users_queue(self) -> NoReturn: raise NotImplementedError async def add_item_to_playback_queue( self, access_token: str, uri: str, device_id: str ) -> None: req = self.post( '/me/player/queue', params={'uri': uri, 'device_id': device_id}, headers=self._auth(access_token), ) req = await self._process_request(req) res = await self._process(req, None) return res async def get_playlist(self) -> NoReturn: raise NotImplementedError async def change_playlist_details(self) -> NoReturn: raise NotImplementedError async def get_playlist_items(self) -> NoReturn: raise NotImplementedError async def update_playlist_items(self) -> NoReturn: raise NotImplementedError async def add_items_to_playlist(self) -> NoReturn: raise NotImplementedError async def remove_playlist_items(self) -> NoReturn: raise NotImplementedError async def get_current_users_playlists(self) -> NoReturn: raise NotImplementedError async def create_playlist(self) -> NoReturn: raise NotImplementedError async def get_playlist_cover_image(self) -> NoReturn: raise NotImplementedError async def add_custom_playlist_cover_image(self) -> NoReturn: raise NotImplementedError @overload async def search_for_item( self, access_token: str, q: str, search_type: Literal['album'], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> s.SearchAlbum: ... @overload async def search_for_item( self, access_token: str, q: str, search_type: Literal['artist'], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> s.SearchArtist: ... @overload async def search_for_item( self, access_token: str, q: str, search_type: Literal['playlist'], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> s.SearchPlaylist: ... @overload async def search_for_item( self, access_token: str, q: str, search_type: Literal['track'], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> s.SearchTrack: ... @overload async def search_for_item( self, access_token: str, q: str, search_type: Literal['show'], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> s.SearchShow: ... @overload async def search_for_item( self, access_token: str, q: str, search_type: Literal['episode'], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> s.SearchEpisode: ... @overload async def search_for_item( self, access_token: str, q: str, search_type: Literal['audiobook'], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> s.SearchAudiobook: ... async def search_for_item( self, access_token: str, q: str, search_type: Literal[ 'album', 'artist', 'playlist', 'track', 'show', 'episode', 'audiobook', ], *, market: str | None = None, limit: int = 5, offset: int = 0, include_external: Literal['audio'] | None = None, cache_ttl: int | None = None, ) -> ( s.SearchAlbum | s.SearchArtist | s.SearchTrack | s.SearchPlaylist | s.SearchShow | s.SearchEpisode | s.SearchAudiobook ): type_mapping = { 'album': s.SearchAlbum, 'artist': s.SearchArtist, 'track': s.SearchTrack, 'playlist': s.SearchPlaylist, 'show': s.SearchShow, 'episode': s.SearchEpisode, 'audiobook': s.SearchAudiobook, } target_model = type_mapping[search_type] req = self.get( '/search', cache_ttl=cache_ttl, params=self.clean_dict( { 'q': q, 'type': search_type, 'market': market, 'limit': limit, 'offset': offset, 'include_external': include_external, } ), headers=self._auth(access_token), ) req = await self._process_request(req) res = await self._process(req, target_model) if res is None: raise s.InternalError(status_code=0, error="Can't be here") return res async def get_show(self) -> NoReturn: raise NotImplementedError async def get_show_episodes(self) -> NoReturn: raise NotImplementedError async def get_users_saved_shows(self) -> NoReturn: raise NotImplementedError async def get_track( self, access_token: str, track_id: str, *, market: str | None = None, cache_ttl: int | None = None, ) -> s.Track: req = self.get( f'/tracks/{track_id}', cache_ttl=cache_ttl, params=self.clean_dict({'market': market}), headers=self._auth(access_token), ) req = await self._process_request(req) res = await self._process(req, s.Track) if res is None: raise s.InternalError(status_code=0, error="Can't be here") return res async def get_users_saved_tracks(self) -> NoReturn: raise NotImplementedError async def get_current_users_profile( self, access_token: str, *, cache_ttl: int | None = None ) -> s.UserProfile: req = self.get( '/me', cache_ttl=cache_ttl, headers=self._auth(access_token), ) req = await self._process_request(req) res = await self._process(req, s.UserProfile) if res is None: raise s.InternalError(status_code=0, error="Can' be here") return res async def get_users_top_items(self) -> NoReturn: raise NotImplementedError async def get_followed_artists(self) -> NoReturn: raise NotImplementedError