456 lines
12 KiB
Python
456 lines
12 KiB
Python
from collections.abc import Callable, Coroutine
from typing import Literal, NoReturn, overload

from oxidehttp.client import OxideHTTP
from oxidehttp.schema import CachedResponse, Response
from pydantic import BaseModel

from . import schema as s
|
|
|
|
|
|
class SpotifyAPIClient(OxideHTTP):
|
|
def __init__(
|
|
self,
|
|
redis_url: str | None = None,
|
|
proxy_url: str | None = None,
|
|
) -> None:
|
|
super().__init__(
|
|
base_url='https://api.spotify.com/v1/',
|
|
redis_url=redis_url,
|
|
proxy_url=proxy_url,
|
|
)
|
|
|
|
def _auth(self, access_token: str) -> dict[str, str]:
|
|
return {'Authorization': f'Bearer {access_token}'}
|
|
|
|
async def _process_request[T: Response | CachedResponse](
|
|
self, func: Coroutine[None, None, T]
|
|
) -> T:
|
|
req = await func
|
|
|
|
if req.status_code == 504:
|
|
return await self._process_request(func)
|
|
|
|
return req
|
|
|
|
async def _process[T: BaseModel](
|
|
self, req: Response | CachedResponse, schema: type[T] | None
|
|
) -> T | None:
|
|
if req.status_code == 204:
|
|
return None
|
|
|
|
if req.status_code >= 500:
|
|
raise s.InternalError(req.status_code, 'Internal server error')
|
|
|
|
data = await req.json()
|
|
if req.status_code >= 400:
|
|
raise s.ClientError(req.status_code, data['error']['message'])
|
|
|
|
if schema:
|
|
return schema.model_validate(data)
|
|
|
|
async def get_album(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_album_tracks(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_users_saved_albums(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_artist(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_artists_albums(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_an_audiobook(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_audiobook_chapters(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_users_saved_audiobooks(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_a_chapter(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_episode(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_users_saved_episodes(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def save_item_to_library(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def remove_item_from_library(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def check_users_saved_items(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_playback_state(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
market: str | None = None,
|
|
additional_types: list[Literal['track', 'episode']] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.PlaybackState | None:
|
|
req = self.get(
|
|
'/me/player',
|
|
cache_ttl=cache_ttl,
|
|
params=self.clean_dict(
|
|
{
|
|
'market': market,
|
|
'additional_types': ','.join(additional_types)
|
|
if additional_types
|
|
else None,
|
|
}
|
|
),
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
return await self._process(req, s.PlaybackState)
|
|
|
|
async def transfer_playback(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_available_devices(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def start_resume_playback(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def pause_playback(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def skip_to_next(self, access_token: str, device_id: str) -> None:
|
|
req = self.post(
|
|
'/me/player/next',
|
|
params={'device_id': device_id},
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
return await self._process(req, None)
|
|
|
|
async def skip_to_previous(
|
|
self, access_token: str, device_id: str
|
|
) -> None:
|
|
req = self.post(
|
|
'/me/player/previous',
|
|
params={'device_id': device_id},
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
return await self._process(req, None)
|
|
|
|
async def seek_to_position(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def set_repeat_mode(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def set_playback_volume(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def toggle_playback_shuffle(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_recently_played_tracks(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
limit: int = 20,
|
|
after: int | None = None,
|
|
before: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.RecentlyPlayed:
|
|
req = self.get(
|
|
'/me/player/recently-played',
|
|
cache_ttl=cache_ttl,
|
|
params=self.clean_dict(
|
|
{'limit': limit, 'after': after, 'before': before}
|
|
),
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
res = await self._process(req, s.RecentlyPlayed)
|
|
|
|
if res is None:
|
|
raise s.InternalError(status_code=0, error="Can't be here")
|
|
|
|
return res
|
|
|
|
async def get_the_users_queue(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def add_item_to_playback_queue(
|
|
self, access_token: str, uri: str, device_id: str
|
|
) -> None:
|
|
req = self.post(
|
|
'/me/player/queue',
|
|
params={'uri': uri, 'device_id': device_id},
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
res = await self._process(req, None)
|
|
|
|
return res
|
|
|
|
async def get_playlist(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def change_playlist_details(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_playlist_items(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def update_playlist_items(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def add_items_to_playlist(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def remove_playlist_items(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_current_users_playlists(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def create_playlist(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_playlist_cover_image(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def add_custom_playlist_cover_image(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
@overload
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal['album'],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SearchAlbum: ...
|
|
|
|
@overload
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal['artist'],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SearchArtist: ...
|
|
|
|
@overload
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal['playlist'],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SearchPlaylist: ...
|
|
|
|
@overload
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal['track'],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SearchTrack: ...
|
|
|
|
@overload
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal['show'],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SearchShow: ...
|
|
|
|
@overload
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal['episode'],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SearchEpisode: ...
|
|
|
|
@overload
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal['audiobook'],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SearchAudiobook: ...
|
|
|
|
async def search_for_item(
|
|
self,
|
|
access_token: str,
|
|
q: str,
|
|
search_type: Literal[
|
|
'album',
|
|
'artist',
|
|
'playlist',
|
|
'track',
|
|
'show',
|
|
'episode',
|
|
'audiobook',
|
|
],
|
|
*,
|
|
market: str | None = None,
|
|
limit: int = 5,
|
|
offset: int = 0,
|
|
include_external: Literal['audio'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> (
|
|
s.SearchAlbum
|
|
| s.SearchArtist
|
|
| s.SearchTrack
|
|
| s.SearchPlaylist
|
|
| s.SearchShow
|
|
| s.SearchEpisode
|
|
| s.SearchAudiobook
|
|
):
|
|
type_mapping = {
|
|
'album': s.SearchAlbum,
|
|
'artist': s.SearchArtist,
|
|
'track': s.SearchTrack,
|
|
'playlist': s.SearchPlaylist,
|
|
'show': s.SearchShow,
|
|
'episode': s.SearchEpisode,
|
|
'audiobook': s.SearchAudiobook,
|
|
}
|
|
target_model = type_mapping[search_type]
|
|
|
|
req = self.get(
|
|
'/search',
|
|
cache_ttl=cache_ttl,
|
|
params=self.clean_dict(
|
|
{
|
|
'q': q,
|
|
'type': search_type,
|
|
'market': market,
|
|
'limit': limit,
|
|
'offset': offset,
|
|
'include_external': include_external,
|
|
}
|
|
),
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
res = await self._process(req, target_model)
|
|
|
|
if res is None:
|
|
raise s.InternalError(status_code=0, error="Can't be here")
|
|
|
|
return res
|
|
|
|
async def get_show(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_show_episodes(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_users_saved_shows(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_track(
|
|
self,
|
|
access_token: str,
|
|
track_id: str,
|
|
*,
|
|
market: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.Track:
|
|
req = self.get(
|
|
f'/tracks/{track_id}',
|
|
cache_ttl=cache_ttl,
|
|
params=self.clean_dict({'market': market}),
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
res = await self._process(req, s.Track)
|
|
|
|
if res is None:
|
|
raise s.InternalError(status_code=0, error="Can't be here")
|
|
|
|
return res
|
|
|
|
async def get_users_saved_tracks(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_current_users_profile(
|
|
self, access_token: str, *, cache_ttl: int | None = None
|
|
) -> s.UserProfile:
|
|
req = self.get(
|
|
'/me',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
req = await self._process_request(req)
|
|
res = await self._process(req, s.UserProfile)
|
|
|
|
if res is None:
|
|
raise s.InternalError(status_code=0, error="Can' be here")
|
|
|
|
return res
|
|
|
|
async def get_users_top_items(self) -> NoReturn:
|
|
raise NotImplementedError
|
|
|
|
async def get_followed_artists(self) -> NoReturn:
|
|
raise NotImplementedError
|