Первый релиз
All checks were successful
Build And Publish Package / publish (push) Successful in 33s

This commit is contained in:
2026-03-08 06:41:33 +03:00
commit 727af5899a
11 changed files with 1233 additions and 0 deletions

439
src/oxidespotify/api.py Normal file
View File

@ -0,0 +1,439 @@
from collections.abc import Coroutine
from typing import Literal, NoReturn, overload
from oxidehttp.client import OxideHTTP
from oxidehttp.schema import CachedResponse, Response
from pydantic import BaseModel
from . import schema as s
class SpotifyAPIClient(OxideHTTP):
def __init__(
self,
redis_url: str | None = None,
proxy_url: str | None = None,
) -> None:
super().__init__(
base_url='https://api.spotify.com/v1/',
redis_url=redis_url,
proxy_url=proxy_url,
)
def _auth(self, access_token: str) -> dict[str, str]:
return {'Authorization': f'Bearer {access_token}'}
async def _process_request[T: Response | CachedResponse](
self, func: Coroutine[None, None, T]
) -> T:
req = await func
if req.status_code == 504:
return await self._process_request(func)
return req
async def _process[T: BaseModel](
self, req: Response | CachedResponse, schema: type[T] | None
) -> T | None:
if req.status_code == 204:
return None
if req.status_code >= 500:
raise s.InternalError(req.status_code, 'Internal server error')
data = await req.json()
if req.status_code >= 400:
raise s.ClientError(req.status_code, data['error']['message'])
if schema:
return schema.model_validate(data)
async def get_album(self) -> NoReturn:
raise NotImplementedError
async def get_album_tracks(self) -> NoReturn:
raise NotImplementedError
async def get_users_saved_albums(self) -> NoReturn:
raise NotImplementedError
async def get_artist(self) -> NoReturn:
raise NotImplementedError
async def get_artists_albums(self) -> NoReturn:
raise NotImplementedError
async def get_an_audiobook(self) -> NoReturn:
raise NotImplementedError
async def get_audiobook_chapters(self) -> NoReturn:
raise NotImplementedError
async def get_users_saved_audiobooks(self) -> NoReturn:
raise NotImplementedError
async def get_a_chapter(self) -> NoReturn:
raise NotImplementedError
async def get_episode(self) -> NoReturn:
raise NotImplementedError
async def get_users_saved_episodes(self) -> NoReturn:
raise NotImplementedError
async def save_item_to_library(self) -> NoReturn:
raise NotImplementedError
async def remove_item_from_library(self) -> NoReturn:
raise NotImplementedError
async def check_users_saved_items(self) -> NoReturn:
raise NotImplementedError
async def get_playback_state(
self,
access_token: str,
*,
market: str | None = None,
additional_types: list[Literal['track', 'episode']] | None = None,
cache_ttl: int | None = None,
) -> s.PlaybackState | None:
req = self.get(
'/me/player',
cache_ttl=cache_ttl,
params=self.clean_dict(
{
'market': market,
'additional_types': ','.join(additional_types)
if additional_types
else None,
}
),
headers=self._auth(access_token),
)
req = await self._process_request(req)
return await self._process(req, s.PlaybackState)
async def transfer_playback(self) -> NoReturn:
raise NotImplementedError
async def get_available_devices(self) -> NoReturn:
raise NotImplementedError
async def start_resume_playback(self) -> NoReturn:
raise NotImplementedError
async def pause_playback(self) -> NoReturn:
raise NotImplementedError
async def skip_to_next(self) -> NoReturn:
raise NotImplementedError
async def skip_to_previous(self) -> NoReturn:
raise NotImplementedError
async def seek_to_position(self) -> NoReturn:
raise NotImplementedError
async def set_repeat_mode(self) -> NoReturn:
raise NotImplementedError
async def set_playback_volume(self) -> NoReturn:
raise NotImplementedError
async def toggle_playback_shuffle(self) -> NoReturn:
raise NotImplementedError
async def get_recently_played_tracks(
self,
access_token: str,
*,
limit: int = 20,
after: int | None = None,
before: int | None = None,
cache_ttl: int | None = None,
) -> s.RecentlyPlayed:
req = self.get(
'/me/player/recently-played',
cache_ttl=cache_ttl,
params=self.clean_dict(
{'limit': limit, 'after': after, 'before': before}
),
headers=self._auth(access_token),
)
req = await self._process_request(req)
res = await self._process(req, s.RecentlyPlayed)
if res is None:
raise s.InternalError(status_code=0, error="Can't be here")
return res
async def get_the_users_queue(self) -> NoReturn:
raise NotImplementedError
async def add_item_to_playback_queue(
self, access_token: str, uri: str, device_id: str
) -> None:
req = self.post(
'/me/player/queue',
params={'uri': uri, 'device_id': device_id},
headers=self._auth(access_token),
)
req = await self._process_request(req)
res = await self._process(req, None)
return res
async def get_playlist(self) -> NoReturn:
raise NotImplementedError
async def change_playlist_details(self) -> NoReturn:
raise NotImplementedError
async def get_playlist_items(self) -> NoReturn:
raise NotImplementedError
async def update_playlist_items(self) -> NoReturn:
raise NotImplementedError
async def add_items_to_playlist(self) -> NoReturn:
raise NotImplementedError
async def remove_playlist_items(self) -> NoReturn:
raise NotImplementedError
async def get_current_users_playlists(self) -> NoReturn:
raise NotImplementedError
async def create_playlist(self) -> NoReturn:
raise NotImplementedError
async def get_playlist_cover_image(self) -> NoReturn:
raise NotImplementedError
async def add_custom_playlist_cover_image(self) -> NoReturn:
raise NotImplementedError
@overload
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal['album'],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> s.SearchAlbum: ...
@overload
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal['artist'],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> s.SearchArtist: ...
@overload
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal['playlist'],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> s.SearchPlaylist: ...
@overload
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal['track'],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> s.SearchTrack: ...
@overload
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal['show'],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> s.SearchShow: ...
@overload
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal['episode'],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> s.SearchEpisode: ...
@overload
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal['audiobook'],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> s.SearchAudiobook: ...
async def search_for_item(
self,
access_token: str,
q: str,
search_type: Literal[
'album',
'artist',
'playlist',
'track',
'show',
'episode',
'audiobook',
],
*,
market: str | None = None,
limit: int = 5,
offset: int = 0,
include_external: Literal['audio'] | None = None,
cache_ttl: int | None = None,
) -> (
s.SearchAlbum
| s.SearchArtist
| s.SearchTrack
| s.SearchPlaylist
| s.SearchShow
| s.SearchEpisode
| s.SearchAudiobook
):
type_mapping = {
'album': s.SearchAlbum,
'artist': s.SearchArtist,
'track': s.SearchTrack,
'playlist': s.SearchPlaylist,
'show': s.SearchShow,
'episode': s.SearchEpisode,
'audiobook': s.SearchAudiobook,
}
target_model = type_mapping[search_type]
req = self.get(
'/search',
cache_ttl=cache_ttl,
params=self.clean_dict(
{
'q': q,
'type': search_type,
'market': market,
'limit': limit,
'offset': offset,
'include_external': include_external,
}
),
headers=self._auth(access_token),
)
req = await self._process_request(req)
res = await self._process(req, target_model)
if res is None:
raise s.InternalError(status_code=0, error="Can't be here")
return res
async def get_show(self) -> NoReturn:
raise NotImplementedError
async def get_show_episodes(self) -> NoReturn:
raise NotImplementedError
async def get_users_saved_shows(self) -> NoReturn:
raise NotImplementedError
async def get_track(
self,
access_token: str,
track_id: str,
*,
market: str | None = None,
cache_ttl: int | None = None,
) -> s.Track:
req = self.get(
f'/tracks/{track_id}',
cache_ttl=cache_ttl,
params=self.clean_dict({'market': market}),
headers=self._auth(access_token),
)
req = await self._process_request(req)
res = await self._process(req, s.Track)
if res is None:
raise s.InternalError(status_code=0, error="Can't be here")
return res
async def get_users_saved_tracks(self) -> NoReturn:
raise NotImplementedError
async def get_current_users_profile(
self, access_token: str, *, cache_ttl: int | None = None
) -> s.UserProfile:
req = self.get(
'/me',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
)
req = await self._process_request(req)
res = await self._process(req, s.UserProfile)
if res is None:
raise s.InternalError(status_code=0, error="Can' be here")
return res
async def get_users_top_items(self) -> NoReturn:
raise NotImplementedError
async def get_followed_artists(self) -> NoReturn:
raise NotImplementedError