diff --git a/pyproject.toml b/pyproject.toml index a530cc0..3726fb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oxidetwitch" -version = "1.1.0" +version = "1.2.0" description = "Client for Twitch API" readme = "README.md" authors = [{ name = "Miwory", email = "miwory.uwu@gmail.com" }] @@ -99,6 +99,3 @@ ignore-one-line-docstrings = true quote-style = "single" indent-style = "space" docstring-code-format = true - -[tool.uv.sources] -aiohttpx = { index = "Miwory" } diff --git a/src/oxidetwitch/api.py b/src/oxidetwitch/api.py index d8c0966..860ba19 100644 --- a/src/oxidetwitch/api.py +++ b/src/oxidetwitch/api.py @@ -1,10 +1,12 @@ from collections.abc import AsyncGenerator from datetime import datetime -from typing import Literal +from typing import Any, Literal, overload from zoneinfo import ZoneInfo from oxidehttp import status as st from oxidehttp.client import OxideHTTP +from oxidehttp.schema import CachedResponse, Response +from pydantic import BaseModel from . import schema as s from .eventsub import statuses as sub_status @@ -35,80 +37,107 @@ class TwitchAPIClient(OxideHTTP): proxy_url=proxy_url, ) + def _auth(self, access_token: str) -> dict[str, str]: + return {'Authorization': f'Bearer {access_token}'} + + @overload + async def _process( + self, req: Response | CachedResponse, schema: None + ) -> bool: ... + + @overload + async def _process[T: BaseModel]( + self, req: Response | CachedResponse, schema: type[T] + ) -> T: ... + + async def _process[T: BaseModel]( + self, req: Response | CachedResponse, schema: type[T] | None + ) -> T | bool: + if req.status_code >= 500: + raise s.InternalError(req.status_code, 'Internal Server Error') + + if req.status_code >= 400 and req.status_code < 500: + data = await req.json() + message = data.get('message', 'Twitch API Error') + + raise s.ClientError(req.status_code, message) + + if schema is None: + return True + + data = await req.json() + return schema.model_validate(data) + + async def _paginate[S: s.BasePaginated]( + self, + path: str, + schema: type[S], + headers: dict[str, str] | None, + params: dict[str, Any], + cache_ttl: int | None = None, + ) -> AsyncGenerator[S]: + after = None + + while True: + current_params = {**params, 'after': after} if after else params + + req = await self.get( + path, + headers=headers, + params=current_params, + cache_ttl=cache_ttl, + ) + result = await self._process(req, schema) + yield result + + if isinstance(result.pagination, s.Pagination): + after = result.pagination.cursor + + if after is None: + break + async def start_commercial( self, access_token: str, broadcaster_id: int, length: int ) -> s.StartCommercial: req = await self.post( '/channels/commercial', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json={ 'broadcaster_id': broadcaster_id, 'length': length, }, ) - match req.status_code: - case st.OK: - return s.StartCommercial.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.StartCommercial) async def get_ad_schedule( - self, access_token: str, broadcaster_id: int + self, + access_token: str, + broadcaster_id: int, + *, + cache_ttl: int | None = None, ) -> s.AdSchedule: req = await self.get( '/channels/ads', - headers={'Authorization': f'Bearer {access_token}'}, - params={ - 'broadcaster_id': broadcaster_id, - }, + cache_ttl=cache_ttl, + headers=self._auth(access_token), + params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.AdSchedule.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AdSchedule) async def snooze_next_ad( self, access_token: str, broadcaster_id: int ) -> s.SnoozeNextAd: req = await self.post( '/channels/ads/schedule/snooze', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, }, ) - match req.status_code: - case st.OK: - return s.SnoozeNextAd.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.SnoozeNextAd) async def get_extension_analytics( self, @@ -119,48 +148,25 @@ class TwitchAPIClient(OxideHTTP): started_at: datetime | None = None, ended_at: datetime | None = None, first: int = 20, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.ExtensionAnalyticsData]: - after = None - - while True: - req = await self.get( - '/analytics/extensions', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'extension_id': extension_id, - 'type': analytics_type, - 'started_at': started_at, - 'ended_at': ended_at, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - analytics = s.ExtensionAnalytics.model_validate(json) - - for data in analytics.data: - yield data - - if isinstance(analytics.pagination, s.Pagination): - after = analytics.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/analytics/extensions', + s.ExtensionAnalytics, + self._auth(access_token), + self.clean_dict( + { + 'extension_id': extension_id, + 'type': analytics_type, + 'started_at': started_at, + 'ended_at': ended_at, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_game_analytics( self, @@ -171,48 +177,25 @@ class TwitchAPIClient(OxideHTTP): started_at: datetime | None = None, ended_at: datetime | None = None, first: int = 20, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.GameAnalyticsData]: - after = None - - while True: - req = await self.get( - '/analytics/games', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'game_id': game_id, - 'type': analytics_type, - 'started_at': started_at, - 'ended_at': ended_at, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - analytics = s.GameAnalytics.model_validate(json) - - for data in analytics.data: - yield data - - if isinstance(analytics.pagination, s.Pagination): - after = analytics.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/analytics/games', + s.GameAnalytics, + self._auth(access_token), + self.clean_dict( + { + 'game_id': game_id, + 'type': analytics_type, + 'started_at': started_at, + 'ended_at': ended_at, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_bits_leaderboard( self, @@ -222,10 +205,12 @@ class TwitchAPIClient(OxideHTTP): *, started_at: datetime | None = None, user_id: int | None = None, + cache_ttl: int | None = None, ) -> s.BitsLeaderboard: req = await self.get( '/bits/leaderboard', - headers={'Authorization': f'Bearer {access_token}'}, + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'count': count, @@ -236,38 +221,23 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.BitsLeaderboard.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.BitsLeaderboard) async def get_cheermotes( - self, access_token: str, *, broadcaster_id: int | None = None + self, + access_token: str, + *, + broadcaster_id: int | None = None, + cache_ttl: int | None = None, ) -> s.Cheermotes: req = await self.get( '/bits/cheermotes', - headers={'Authorization': f'Bearer {access_token}'}, + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict({'broadcaster_id': broadcaster_id}), ) - match req.status_code: - case st.OK: - return s.Cheermotes.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Cheermotes) async def get_extension_transactions( self, @@ -276,69 +246,39 @@ class TwitchAPIClient(OxideHTTP): *, user_id: int | list[int] | None = None, first: int = 20, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.ExtensionTransactionsData]: - after = None - - while True: - req = await self.get( - '/extensions/transactions', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'extension_id': extension_id, - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - transactions = s.ExtensionTransactions.model_validate(json) - - for data in transactions.data: - yield data - - if isinstance(transactions.pagination, s.Pagination): - after = transactions.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/extensions/transactions', + s.ExtensionTransactions, + self._auth(access_token), + self.clean_dict( + { + 'extension_id': extension_id, + 'user_id': user_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_channel_information( self, access_token: str, broadcaster_id: int | list[int], + *, + cache_ttl: int | None = None, ) -> s.ChannelsInformation: req = await self.get( '/channels', - headers={'Authorization': f'Bearer {access_token}'}, + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelsInformation.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelsInformation) async def modify_channel_information( self, @@ -367,48 +307,28 @@ class TwitchAPIClient(OxideHTTP): ) req = await self.patch( '/channels', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, json=data, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_channel_editors( - self, access_token: str, broadcaster_id: int + self, + access_token: str, + broadcaster_id: int, + *, + cache_ttl: int | None = None, ) -> s.ChannelEditors: req = await self.get( '/channels/editors', - headers={'Authorization': f'Bearer {access_token}'}, + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelEditors.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelEditors) async def get_followed_channels( self, @@ -416,45 +336,22 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, *, first: int = 20, + cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, s.FollowedChannel]]: - after = None - - while True: - req = await self.get( - '/channels/followed', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - followed_channels = s.FollowedChannels.model_validate(json) - - for data in followed_channels.data: - yield followed_channels.total, data - - if isinstance(followed_channels.pagination, s.Pagination): - after = followed_channels.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channels/followed', + s.FollowedChannels, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, item async def get_channel_followers( self, @@ -463,46 +360,23 @@ class TwitchAPIClient(OxideHTTP): *, user_id: int | None = None, first: int = 20, + cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, s.ChannelFollower]]: - after = None - - while True: - req = await self.get( - '/channels/followers', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - channel_followers = s.ChannelFollowers.model_validate(json) - - for data in channel_followers.data: - yield channel_followers.total, data - - if isinstance(channel_followers.pagination, s.Pagination): - after = channel_followers.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channels/followers', + s.ChannelFollowers, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'user_id': user_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, item async def create_custom_rewards( self, @@ -546,33 +420,18 @@ class TwitchAPIClient(OxideHTTP): req = await self.post( '/channel_points/custom_rewards', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=data, ) - match req.status_code: - case st.OK: - return s.CustomRewards.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CustomRewards) async def delete_custom_reward( self, access_token: str, broadcaster_id: int, reward_id: str ) -> bool: req = await self.delete( '/channel_points/custom_rewards', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id, 'id': reward_id}, ) @@ -601,10 +460,12 @@ class TwitchAPIClient(OxideHTTP): *, reward_id: str | None = None, only_manageable_rewards: bool = False, + cache_ttl: int | None = None, ) -> s.CustomRewards: req = await self.get( '/channel_points/custom_rewards', - headers={'Authorization': f'Bearer {access_token}'}, + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -614,23 +475,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.CustomRewards.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CustomRewards) async def get_custom_reward_redemption( self, @@ -642,56 +487,26 @@ class TwitchAPIClient(OxideHTTP): redemption_id: int | list[int] | None = None, sort: Literal['OLDEST', 'NEWEST'] = 'OLDEST', first: int = 20, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.CustomRewardRedemption]: - after = None - - while True: - req = await self.get( - '/channel_points/custom_rewards/redemptions', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'reward_id': reward_id, - 'status': status, - 'redemption_id': redemption_id, - 'sort': sort, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - redemptions = s.CustomRewardRedemptions.model_validate( - json - ) - - for data in redemptions.data: - yield data - - if isinstance(redemptions.pagination, s.Pagination): - after = redemptions.pagination.cursor - - if not after: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channel_points/custom_rewards/redemptions', + s.CustomRewardRedemptions, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'reward_id': reward_id, + 'status': status, + 'redemption_id': redemption_id, + 'sort': sort, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def update_custom_reward( self, @@ -736,28 +551,12 @@ class TwitchAPIClient(OxideHTTP): ) req = await self.patch( '/channel_points/custom_rewards', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id, 'id': reward_id}, json=data, ) - match req.status_code: - case st.OK: - return s.CustomRewards.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CustomRewards) async def update_redemption_status( self, @@ -769,7 +568,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/channel_points/custom_rewards/redemptions', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'reward_id': reward_id, @@ -778,44 +577,23 @@ class TwitchAPIClient(OxideHTTP): json={'status': status}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_charity_campaign( - self, access_token: str, broadcaster_id: int + self, + access_token: str, + broadcaster_id: int, + *, + cache_ttl: int | None = None, ) -> s.CharityCampaign: req = await self.get( '/charity/campaigns', + cache_ttl=cache_ttl, params={'broadcaster_id': broadcaster_id}, - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.CharityCampaign.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CharityCampaign) async def get_charity_campaign_donations( self, @@ -823,51 +601,19 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, *, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.CharityDonation]: - after = None - - while True: - req = await self.get( - '/charity/donations', - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - donations = s.CharityDonations.model_validate(json) - - for donation in donations.data: - yield donation - - if isinstance(donations.pagination, s.Pagination): - after = donations.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/charity/donations', + s.CharityDonations, + self._auth(access_token), + self.clean_dict( + {'broadcaster_id': broadcaster_id, 'first': first} + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_chatters( self, @@ -876,192 +622,93 @@ class TwitchAPIClient(OxideHTTP): moderator_id: int, *, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, s.ChattersData]]: - after = None - - while True: - req = await self.get( - '/chat/chatters', - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'moderator_id': moderator_id, - 'first': first, - 'after': after, - } - ), - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - chatters = s.Chatters.model_validate(json) - - for data in chatters.data: - yield chatters.total, data - - if isinstance(chatters.pagination, s.Pagination): - after = chatters.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/chat/chatters', + s.Chatters, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'moderator_id': moderator_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, item async def get_channel_emotes( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ChannelEmotes: req = await self.get( '/chat/emotes', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelEmotes.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelEmotes) async def get_global_emotes( - self, access_token: str, *, cache_time: int | None = None + self, access_token: str, *, cache_ttl: int | None = None ) -> s.GlobalEmotes: req = await self.get( '/chat/emotes/global', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.GlobalEmotes.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.GlobalEmotes) async def get_emote_sets( self, access_token: str, emote_set_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.EmoteSets: req = await self.get( '/chat/emotes/set', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'emote_set_id': emote_set_id}, ) - match req.status_code: - case st.OK: - return s.EmoteSets.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.EmoteSets) async def get_channel_chat_badges( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ChannelChatBadges: req = await self.get( '/chat/badges', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelChatBadges.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelChatBadges) async def get_global_chat_badges( - self, access_token: str, *, cache_time: int | None = None + self, access_token: str, *, cache_ttl: int | None = None ) -> s.GlobalChatBadges: req = await self.get( '/chat/badges/global', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.GlobalChatBadges.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.GlobalChatBadges) async def get_chat_settings( self, @@ -1069,16 +716,12 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, *, moderator_id: int | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ChatSettings: req = await self.get( '/chat/settings', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -1087,47 +730,23 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ChatSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChatSettings) async def get_shared_chat_session( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.SharedChatSession: req = await self.get( '/shared_chat/session', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.SharedChatSession.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.SharedChatSession) async def get_user_emotes( self, @@ -1135,51 +754,19 @@ class TwitchAPIClient(OxideHTTP): user_id: int, *, broadcaster_id: int | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[str, s.ChannelEmote]]: - after = None - - while True: - req = await self.get( - '/chat/emotes/global', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'user_id': user_id, - 'after': after, - 'broadcaster_id': broadcaster_id, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - user_emotes = s.UserEmotes.model_validate(json) - - for data in user_emotes.data: - yield user_emotes.template, data - - if isinstance(user_emotes.pagination, s.Pagination): - after = user_emotes.pagination.cursor - - if not after: - break - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/chat/emotes', + s.UserEmotes, + self._auth(access_token), + self.clean_dict( + {'user_id': user_id, 'broadcaster_id': broadcaster_id} + ), + cache_ttl, + ): + for item in data.data: + yield data.template, item async def update_chat_settings( self, @@ -1199,7 +786,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.ChatSettings: req = await self.put( '/chat/settings', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -1221,17 +808,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ChatSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChatSettings) async def send_chat_announcement( self, @@ -1245,7 +822,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/chat/announcements', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -1253,17 +830,7 @@ class TwitchAPIClient(OxideHTTP): json=self.clean_dict({'message': message, 'color': color}), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def send_shoutout( self, @@ -1274,7 +841,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/chat/shoutouts', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'from_broadcaster_id': from_broadcaster_id, 'to_broadcaster_id': to_broadcaster_id, @@ -1282,22 +849,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def send_chat_message( self, @@ -1311,7 +863,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.Message: req = await self.post( '/chat/messages', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -1323,52 +875,23 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Message.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Message) async def get_user_chat_color( self, access_token: str, user_id: int | list[int], *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.UserChatColor: req = await self.get( '/chat/colors', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'user_id': user_id}, ) - match req.status_code: - case st.OK: - return s.UserChatColor.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserChatColor) async def update_user_chat_color( self, @@ -1378,21 +901,11 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.put( '/chat/color', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'user_id': user_id, 'color': color}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def create_clip( self, @@ -1403,25 +916,13 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/clips', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params=self.clean_dict( {'broadcaster_id': broadcaster_id, 'has_delay': has_delay} ), ) - match req.status_code: - case st.ACCEPTED: - return True - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_clips( self, @@ -1435,57 +936,28 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, before: str | None = None, is_featured: bool | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Clip]: - after = None - - while True: - req = await self.get( - '/clips', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'game_id': game_id, - 'clip_id': clip_id, - 'started_at': started_at, - 'ended_at': ended_at, - 'first': first, - 'before': before, - 'after': after, - 'is_featured': is_featured, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - clips = s.Clips.model_validate(json) - - for clip in clips.data: - yield clip - - if isinstance(clips.pagination, s.Pagination): - after = clips.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/clips', + s.Clips, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'game_id': game_id, + 'clip_id': clip_id, + 'started_at': started_at, + 'ended_at': ended_at, + 'first': first, + 'before': before, + 'is_featured': is_featured, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_clips_downloads( self, @@ -1494,16 +966,12 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, clip_id: str, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ClipsDownloads: req = await self.get( '/clips/downloads', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'editor_id': editor_id, @@ -1513,108 +981,49 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ClipsDownloads.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ClipsDownloads) async def get_conduits( - self, access_token: str, *, cache_time: int | None = None + self, access_token: str, *, cache_ttl: int | None = None ) -> s.Conduits: req = await self.get( '/eventsub/conduits', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.Conduits.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Conduits) async def create_conduits( self, access_token: str, shard_count: int ) -> s.Conduits: req = await self.post( '/eventsub/conduits', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json={'shard_count': shard_count}, ) - match req.status_code: - case st.OK: - return s.Conduits.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Conduits) async def update_conduits( self, access_token: str, conduit_id: str, shard_count: int ) -> s.Conduits: req = await self.patch( '/eventsub/conduits', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json={'shard_count': shard_count, 'id': conduit_id}, ) - match req.status_code: - case st.OK: - return s.Conduits.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Conduits) async def delete_conduit(self, access_token: str, conduit_id: str) -> bool: req = await self.delete( '/eventsub/conduits', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'id': conduit_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_conduit_shards( self, @@ -1622,47 +1031,17 @@ class TwitchAPIClient(OxideHTTP): conduit_id: str, *, status: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.ConduitShard]: - after = None - - while True: - req = await self.get( - '/eventsub/conduits/shards', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - {'id': conduit_id, 'status': status, 'after': after} - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - shards = s.ConduitShards.model_validate(json) - - for shard in shards.data: - yield shard - - if isinstance(shards.pagination, s.Pagination): - after = shards.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/eventsub/conduits/shards', + s.ConduitShards, + self._auth(access_token), + self.clean_dict({'id': conduit_id, 'status': status}), + cache_ttl, + ): + for item in data.data: + yield item async def update_conduit_shards( self, @@ -1672,21 +1051,11 @@ class TwitchAPIClient(OxideHTTP): ) -> s.ConduitShards: req = await self.patch( '/eventsub/conduits/shards', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json={'conduit_id': conduit_id, 'shards': shards}, ) - match req.status_code: - case st.ACCEPTED: - return s.ConduitShards.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ConduitShards) async def get_content_classification_labels( self, @@ -1722,26 +1091,17 @@ class TwitchAPIClient(OxideHTTP): 'zh-CN', 'zh-TW', ] = 'en-US', + *, + cache_ttl: int | None = None, ) -> s.ContentClassificationLabels: req = await self.get( '/content_classification_labels', - headers={'Authorization': f'Bearer {access_token}'}, + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'locale': locale}, ) - match req.status_code: - case st.OK: - return s.ContentClassificationLabels.model_validate( - await req.json() - ) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ContentClassificationLabels) async def get_drops_entitlements( self, @@ -1752,59 +1112,24 @@ class TwitchAPIClient(OxideHTTP): game_id: int | None = None, fulfillment_status: Literal['CLAIMED', 'FULFILLED'] | None = None, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.DropEntitlement]: - after = None - - while True: - req = await self.get( - '/entitlements/drops', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'drop_id': drop_id, - 'user_id': user_id, - 'game_id': game_id, - 'fulfillment_status': fulfillment_status, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - entitlements = s.DropsEntitlements.model_validate(json) - - for entitlement in entitlements.data: - yield entitlement - - if isinstance(entitlements.pagination, s.Pagination): - after = entitlements.pagination.cursor - - if after is None: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/entitlements/drops', + s.DropsEntitlements, + self._auth(access_token), + self.clean_dict( + { + 'drop_id': drop_id, + 'user_id': user_id, + 'game_id': game_id, + 'fulfillment_status': fulfillment_status, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def update_drops_entitlements( self, @@ -1815,7 +1140,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.UpdateDropsEntitlements: req = await self.patch( '/entitlements/drops', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=self.clean_dict( { 'entitlement_ids': entitlement_ids, @@ -1824,19 +1149,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.UpdateDropsEntitlements.model_validate( - await req.json() - ) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.INTERNAL_SERVER_ERROR: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UpdateDropsEntitlements) async def get_extension_configuration_segment( self, @@ -1845,16 +1158,12 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int | None = None, extension_id: int | None = None, segment: Literal['broadcaster', 'developer', 'global'] | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ExtensionConfigurationSegment: req = await self.get( '/extensions/configurations', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {jwt_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(jwt_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -1864,19 +1173,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ExtensionConfigurationSegment.model_validate( - await req.json() - ) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionConfigurationSegment) async def set_extension_configuration_segment( self, @@ -1890,7 +1187,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.put( '/extensions/configurations', - headers={'Authorization': f'Bearer {jwt_token}'}, + headers=self._auth(jwt_token), json=self.clean_dict( { 'extension_id': extension_id, @@ -1902,17 +1199,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def set_extension_required_configurations( self, @@ -1924,7 +1211,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.put( '/extensions/configurations/required', - headers={'Authorization': f'Bearer {jwt_token}'}, + headers=self._auth(jwt_token), json={ 'broadcaster_id': broadcaster_id, 'extension_id': extension_id, @@ -1933,17 +1220,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def send_extension_pubsub_message( self, @@ -1956,7 +1233,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/extensions/pubsub', - headers={'Authorization': f'Bearer {jwt_token}'}, + headers=self._auth(jwt_token), json=self.clean_dict( { 'target': target, @@ -1967,17 +1244,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.UNPROCESSABLE_ENTITY: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_extension_live_channels( self, @@ -1985,102 +1252,48 @@ class TwitchAPIClient(OxideHTTP): extension_id: str, *, first: int | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.ExtensionLiveChannel]: - after = None - - while True: - req = await self.get( - '/extensions/live', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'extension_id': extension_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - extensions = s.ExtensionLiveChannels.model_validate(json) - - for extension in extensions.data: - yield extension - - if isinstance(extensions.paginaiton, s.Pagination): - after = extensions.paginaiton.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/extensions/live', + s.ExtensionLiveChannels, + self._auth(access_token), + self.clean_dict( + { + 'extension_id': extension_id, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_extension_secrets( self, jwt_token: str, extension_id: str, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ExtensionSecrets: req = await self.get( '/extensions/jwt/secrets', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {jwt_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(jwt_token), params={'extension_id': extension_id}, ) - match req.status_code: - case st.OK: - return s.ExtensionSecrets.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionSecrets) async def create_extension_secret( self, jwt_token: str, extension_id: str, delay: int = 300 ) -> s.ExtensionSecrets: req = await self.post( '/extensions/jwt/secrets', - headers={'Authorization': f'Bearer {jwt_token}'}, + headers=self._auth(jwt_token), params={'extension_id': extension_id, 'delay': delay}, ) - match req.status_code: - case st.OK: - return s.ExtensionSecrets.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionSecrets) async def send_extension_chat_message( self, @@ -2092,7 +1305,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/extensions/chat', - headers={'Authorization': f'Bearer {jwt_token}'}, + headers=self._auth(jwt_token), params={'broadcaster_id': broadcaster_id}, json={ 'text': text, @@ -2101,17 +1314,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_extensions( self, @@ -2119,16 +1322,12 @@ class TwitchAPIClient(OxideHTTP): extension_id: str, *, extension_version: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.Extensions: req = await self.get( '/extensions', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {jwt_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(jwt_token), params=self.clean_dict( { 'extension_id': extension_id, @@ -2137,17 +1336,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Extensions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Extensions) async def get_released_extension( self, @@ -2155,16 +1344,12 @@ class TwitchAPIClient(OxideHTTP): extension_id: str, *, extension_version: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.Extensions: req = await self.get( '/extensions/released', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'extension_id': extension_id, @@ -2173,33 +1358,19 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Extensions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Extensions) async def get_extension_bits_products( self, access_token: str, *, should_include_all: bool | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ExtensionBitsProducts: req = await self.get( '/bits/extensions', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'should_include_all': should_include_all, @@ -2207,17 +1378,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ExtensionBitsProducts.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionBitsProducts) async def update_extension_bits_product( self, @@ -2232,7 +1393,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.ExtensionBitsProducts: req = await self.put( '/bits/extensions', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=self.clean_dict( { 'sku': sku, @@ -2248,17 +1409,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return s.ExtensionBitsProducts.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionBitsProducts) async def create_eventsub_subscription( self, @@ -2275,7 +1426,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.EventsubBaseSubscriptions: req = await self.post( '/eventsub/subscriptions', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json={ 'type': sub_type, 'version': version, @@ -2292,46 +1443,18 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.ACCEPTED: - return s.EventsubBaseSubscriptions.model_validate( - await req.json() - ) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.EventsubBaseSubscriptions) async def delete_eventsub_subscription( self, access_token: str, sub_id: str - ) -> None: + ) -> bool: req = await self.delete( '/eventsub/subscriptions', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'id': sub_id}, ) - match req.status_code: - case st.NO_CONTENT: - return None - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_eventsub_subscriptions( self, @@ -2341,107 +1464,39 @@ class TwitchAPIClient(OxideHTTP): sub_type: sub_type.Any | None = None, user_id: int | None = None, subscription_id: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, sub.Any]]: - after = None - - while True: - req = await self.get( - '/eventsub/subscriptions', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'status': status, - 'type': sub_type, - 'user_id': user_id, - 'subscription_id': subscription_id, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - subscriptions = s.EventsubBaseSubscriptions.model_validate( - json - ) - - for subscription in subscriptions.data: - yield subscriptions.total, subscription - - if isinstance(subscriptions.pagination, s.Pagination): - after = subscriptions.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/eventsub/subscriptions', + s.EventsubBaseSubscriptions, + self._auth(access_token), + { + 'status': status, + 'type': sub_type, + 'user_id': user_id, + 'subscription_id': subscription_id, + }, + cache_ttl, + ): + for item in data.data: + yield data.total, item async def get_top_games( self, access_token: str, *, first: int = 20, - before: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Game]: - after = None - - while True: - req = await self.get( - '/games/top', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'first': first, - 'after': after, - 'before': before, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - games = s.Games.model_validate(json) - - for game in games.data: - yield game - - if isinstance(games.pagination, s.Pagination): - after = games.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/games/top', + s.Games, + self._auth(access_token), + {'first': first}, + cache_ttl, + ): + for item in data.data: + yield item async def get_games( self, @@ -2450,16 +1505,12 @@ class TwitchAPIClient(OxideHTTP): name: str, *, igdb_id: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.Games: req = await self.get( '/games', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'id': game_id, @@ -2469,47 +1520,23 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Games.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Games) async def get_creator_goals( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.CreatorGoals: req = await self.get( '/creator_goals', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.CreatorGoals.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CreatorGoals) # TODO: implement guest star endpoints @@ -2518,65 +1545,32 @@ class TwitchAPIClient(OxideHTTP): access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.HypeTrainStatus: req = await self.get( '/hypetrain/status', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.HypeTrainStatus.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.INTERNAL_SERVER_ERROR: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.HypeTrainStatus) async def check_automod_status( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.AutomodStatus: req = await self.get( '/moderation/enforcements/status', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.AutomodStatus.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AutomodStatus) async def manage_held_automod_messages( self, @@ -2587,23 +1581,11 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/moderation/automod/message', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'action': action, 'msg_id': msg_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_automod_settings( self, @@ -2611,33 +1593,19 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, moderator_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.AutomodSettings: req = await self.get( '/moderation/automod/settings', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, }, ) - match req.status_code: - case st.OK: - return s.AutomodSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AutomodSettings) async def update_automod_settings( self, @@ -2657,7 +1625,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.AutomodSettings: req = await self.put( '/moderation/automod/settings', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -2675,17 +1643,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return s.AutomodSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AutomodSettings) async def get_banned_users( self, @@ -2694,48 +1652,21 @@ class TwitchAPIClient(OxideHTTP): user_id: int | list[int], *, first: int = 20, - before: str | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.BannedUser]: - after = None - - while True: - req = await self.get( - '/moderation/banned', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'user_id': user_id, - 'first': first, - 'after': after, - 'before': before, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - banned = s.BannedUsers.model_validate(json) - - for user in banned.data: - yield user - - if isinstance(banned.pagination, s.Pagination): - after = banned.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/banned', + s.BannedUsers, + self._auth(access_token), + { + 'broadcaster_id': broadcaster_id, + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def ban_user( self, @@ -2749,7 +1680,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.BanUser: req = await self.post( '/moderation/banned', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -2763,23 +1694,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.BanUser.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.BanUser) async def unban_user( self, @@ -2787,10 +1702,10 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, moderator_id: int, user_id: int, - ) -> None: + ) -> bool: req = await self.delete( '/moderation/bans', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -2798,23 +1713,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_unban_requests( self, @@ -2827,48 +1726,23 @@ class TwitchAPIClient(OxideHTTP): *, user_id: int | None = None, first: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.UnbanRequest]: - after = None - - while True: - req = await self.get( - '/moderation/unbans', - headers={'Authorization': f'Bearer {access_token}'}, - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'moderator_id': moderator_id, - 'status': status, - 'user_id': user_id, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - unban_requests = s.UnbanRequests.model_validate(json) - - for unban_request in unban_requests.data: - yield unban_request - - if isinstance(unban_requests.pagination, s.Pagination): - after = unban_requests.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/unbans', + s.UnbanRequests, + self._auth(access_token), + { + 'broadcaster_id': broadcaster_id, + 'moderator_id': moderator_id, + 'status': status, + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def resolve_unban_request( self, @@ -2882,7 +1756,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.UnbanRequests: req = await self.patch( '/moderation/unban_requests', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -2894,17 +1768,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.UnbanRequests.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UnbanRequests) async def get_blocked_terms( self, @@ -2913,52 +1777,21 @@ class TwitchAPIClient(OxideHTTP): moderator_id: int, *, first: int | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.BlockedTerm]: - after = None - - while True: - req = await self.get( - '/moderation/blocked_terms', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'moderator_id': moderator_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - blocked_terms = s.BlockedTerms.model_validate(json) - - for blocked_term in blocked_terms.data: - yield blocked_term - - if isinstance(blocked_terms.pagination, s.Pagination): - after = blocked_terms.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/blocked_terms', + s.BlockedTerms, + self._auth(access_token), + { + 'broadcaster_id': broadcaster_id, + 'moderator_id': moderator_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def add_blocked_term( self, @@ -2969,7 +1802,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.BlockedTerms: req = await self.post( '/moderation/blocked_terms', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -2977,17 +1810,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.BlockedTerms.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.BlockedTerms) async def remove_blocked_term( self, @@ -2998,7 +1821,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.delete( '/moderation/blocked_terms', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -3006,17 +1829,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def delete_chat_message( self, @@ -3028,7 +1841,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.delete( '/moderation/chat', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -3038,19 +1851,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_moderated_channels( self, @@ -3058,53 +1859,20 @@ class TwitchAPIClient(OxideHTTP): user_id: int, *, first: int | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.ModeratedChannel]: - after = None - - while True: - req = await self.get( - '/moderation/channels', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'user_id': user_id, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - moderated_channels = s.ModeratedChannels.model_validate( - json - ) - - for moderated_channel in moderated_channels.data: - yield moderated_channel - - if isinstance(moderated_channels.pagination, s.Pagination): - after = moderated_channels.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/channels', + s.ModeratedChannels, + self._auth(access_token), + { + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def get_moderators( self, @@ -3113,99 +1881,45 @@ class TwitchAPIClient(OxideHTTP): *, user_id: int | list[int] | None = None, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Moderator]: - after = None - - while True: - req = await self.get( - '/moderation/moderators', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - moderators = s.Moderators.model_validate(json) - - for moderator in moderators.data: - yield moderator - - if isinstance(moderators.pagination, s.Pagination): - after = moderators.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/moderators', + s.Moderators, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'user_id': user_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def add_channel_moderator( self, access_token: str, broadcaster_id: int, user_id: int ) -> bool: req = await self.post( '/moderation/moderators', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.UNPROCESSABLE_ENTITY - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def remove_channel_moderator( self, access_token: str, broadcaster_id: int, user_id: int ) -> bool: req = await self.delete( '/moderation/moderators', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_vips( self, @@ -3214,109 +1928,43 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, *, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.VIP]: - after = None - - while True: - req = await self.get( - '/channels/vips', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'user_id': user_id, - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - vips = s.VIPs.model_validate(json) - - for vip in vips.data: - yield vip - - if isinstance(vips.pagination, s.Pagination): - after = vips.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channels/vips', + s.VIPs, + self._auth(access_token), + { + 'user_id': user_id, + 'broadcaster_id': broadcaster_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def add_channel_vip( self, access_token: str, broadcaster_id: int, user_id: int ) -> bool: req = await self.post( '/channels/vips', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.CONFLICT - | st.UNPROCESSABLE_ENTITY - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def remove_channel_vip( self, access_token: str, broadcaster_id: int, user_id: int ) -> bool: req = await self.delete( '/channels/vips', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.UNPROCESSABLE_ENTITY - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def update_shield_mode_status( self, @@ -3328,7 +1976,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.ShieldModeStatus: req = await self.put( '/moderation/shield_mode', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -3336,17 +1984,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.ShieldModeStatus.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ShieldModeStatus) async def get_shield_mode_status( self, @@ -3354,33 +1992,19 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, moderator_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ShieldModeStatus: req = await self.get( '/moderation/shield_mode', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, }, ) - match req.status_code: - case st.OK: - return s.ShieldModeStatus.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ShieldModeStatus) async def warn_chat_user( self, @@ -3392,7 +2016,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.UserWarns: req = await self.post( '/moderation/warnings', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={ 'broadcaster_id': broadcaster_id, 'moderator_id': moderator_id, @@ -3405,24 +2029,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.UserWarns.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserWarns) async def get_polls( self, @@ -3431,52 +2038,23 @@ class TwitchAPIClient(OxideHTTP): *, poll_id: int | list[int] | None = None, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Poll]: - after = None - - while True: - req = await self.get( - '/polls', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'id': poll_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - polls = s.Polls.model_validate(json) - - for poll in polls.data: - yield poll - - if isinstance(polls.pagination, s.Pagination): - after = polls.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/polls', + s.Polls, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'id': poll_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def create_poll( self, @@ -3503,21 +2081,11 @@ class TwitchAPIClient(OxideHTTP): ) req = await self.post( '/polls', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=body, ) - match req.status_code: - case st.OK: - return s.Polls.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Polls) async def end_poll( self, @@ -3528,7 +2096,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.Polls: req = await self.patch( '/polls', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json={ 'broadcaster_id': broadcaster_id, 'id': poll_id, @@ -3536,17 +2104,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.Polls.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Polls) async def get_predictions( self, @@ -3555,52 +2113,23 @@ class TwitchAPIClient(OxideHTTP): *, prediction_id: int | list[int] | None = None, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Prediction]: - after = None - - while True: - req = await self.get( - '/predictions', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'id': prediction_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - predictions = s.Predictions.model_validate(json) - - for prediction in predictions.data: - yield prediction - - if isinstance(predictions.pagination, s.Pagination): - after = predictions.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/predictions', + s.Predictions, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'id': prediction_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def create_prediction( self, @@ -3620,21 +2149,11 @@ class TwitchAPIClient(OxideHTTP): ) req = await self.post( '/predictions', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=body, ) - match req.status_code: - case st.OK: - return s.Predictions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Predictions) async def end_prediction( self, @@ -3647,7 +2166,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.Predictions: req = await self.patch( '/predictions', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -3658,17 +2177,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Predictions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Predictions) async def start_a_raid( self, @@ -3678,56 +2187,25 @@ class TwitchAPIClient(OxideHTTP): ) -> s.Raids: req = await self.post( '/raids', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), json={ 'from_broadcaster_id': from_broadcaster_id, 'to_broadcaster_id': to_broadcaster_id, }, ) - match req.status_code: - case st.OK: - return s.Raids.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Raids) async def cancel_a_raid( self, access_token: str, broadcaster_id: int ) -> bool: req = await self.patch( '/raids', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_channel_stream_schedule( self, @@ -3737,74 +2215,36 @@ class TwitchAPIClient(OxideHTTP): segment_id: int | list[int] | None = None, start_time: datetime | None = None, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Schedule]: - after = None - - while True: - req = await self.get( - '/schedule', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'id': segment_id, - 'start_time': start_time, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - schedules = s.Schedules.model_validate(json) - - for schedule in schedules.data: - yield schedule - - if isinstance(schedules.pagination, s.Pagination): - after = schedules.pagination.cursor - - if after is None: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/schedule', + s.Schedules, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'id': segment_id, + 'start_time': start_time, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_channel_icalendar( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> bytes: req = await self.get( '/schedule/icalendar', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) @@ -3833,7 +2273,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.patch( '/schedule/settings', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -3845,17 +2285,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def create_channel_stream_schedule_segment( self, @@ -3871,7 +2301,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.Schedules: req = await self.post( '/schedule/segment', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, json=self.clean_dict( { @@ -3885,17 +2315,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Schedules.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Schedules) async def update_channel_stream_schedule_segment( self, @@ -3912,7 +2332,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.Schedules: req = await self.patch( '/schedule/segment', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -3931,24 +2351,14 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Schedules.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Schedules) async def delete_channel_stream_schedule_segment( self, access_token: str, broadcaster_id: int, segment_id: str ) -> bool: req = await self.delete( '/schedule/segment', - headers={'Authorization': f'Bearer {access_token}'}, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -3957,17 +2367,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def search_categories( self, @@ -3975,47 +2375,17 @@ class TwitchAPIClient(OxideHTTP): query: str, *, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Category]: - after = None - - while True: - req = await self.get( - '/search/categories', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - {'query': query, 'first': first, 'after': after} - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - categories = s.Categories.model_validate(json) - - for category in categories.data: - yield category - - if isinstance(categories.pagination, s.Pagination): - after = categories.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/search/categories', + s.Categories, + self._auth(access_token), + {'query': query, 'first': first}, + cache_ttl, + ): + for item in data.data: + yield item async def search_channels( self, @@ -4024,82 +2394,35 @@ class TwitchAPIClient(OxideHTTP): *, live_only: bool = False, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Channel]: - after = None - - while True: - req = await self.get( - '/search/channels', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'query': query, - 'live_only': live_only, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - channels = s.Channels.model_validate(json) - - for channel in channels.data: - yield channel - - if isinstance(channels.pagination, s.Pagination): - after = channels.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/search/channels', + s.Channels, + self._auth(access_token), + self.clean_dict( + {'query': query, 'live_only': live_only, 'first': first} + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_stream_key( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.StreamKeys: req = await self.get( '/streams/key', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.StreamKeys.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.StreamKeys) async def get_streams( self, @@ -4112,56 +2435,27 @@ class TwitchAPIClient(OxideHTTP): language: str | None = None, first: int = 20, before: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Stream]: - after = None - - while True: - req = await self.get( - '/streams', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'user_id': user_id, - 'user_login': user_login, - 'game_id': game_id, - 'type': stream_type, - 'language': language, - 'first': first, - 'before': before, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - streams = s.Streams.model_validate(json) - - for stream in streams.data: - yield stream - - if isinstance(streams.pagination, s.Pagination): - after = streams.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/streams', + s.Streams, + self._auth(access_token), + self.clean_dict( + { + 'user_id': user_id, + 'user_login': user_login, + 'game_id': game_id, + 'type': stream_type, + 'language': language, + 'first': first, + 'before': before, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_followed_streams( self, @@ -4169,51 +2463,20 @@ class TwitchAPIClient(OxideHTTP): user_id: int, *, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Stream]: - after = None - - while True: - req = await self.get( - '/streams/followed', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - streams = s.Streams.model_validate(json) - - for stream in streams.data: - yield stream - - if isinstance(streams.pagination, s.Pagination): - after = streams.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/streams/followed', + s.Streams, + self._auth(access_token), + { + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def create_stream_marker( self, @@ -4224,9 +2487,7 @@ class TwitchAPIClient(OxideHTTP): ) -> s.CreateStreamMarkers: req = await self.post( '/streams/markers', - headers={ - 'Authorization': f'Bearer {access_token}', - }, + headers=self._auth(access_token), json=self.clean_dict( { 'user_id': user_id, @@ -4235,19 +2496,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.CreateStreamMarkers.model_validate(await req.json()) - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CreateStreamMarkers) async def get_stream_markers( self, @@ -4257,58 +2506,24 @@ class TwitchAPIClient(OxideHTTP): *, first: int = 20, before: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.StreamMarkersData]: - after = None - - while True: - req = await self.get( - '/streams/markers', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'user_id': user_id, - 'video_id': video_id, - 'first': first, - 'before': before, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - markers = s.StreamMarkers.model_validate(json) - - for marker in markers.data: - yield marker - - if isinstance(markers.pagination, s.Pagination): - after = markers.pagination.cursor - - if not after: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/streams/markers', + s.StreamMarkers, + self._auth(access_token), + self.clean_dict( + { + 'user_id': user_id, + 'video_id': video_id, + 'first': first, + 'before': before, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_broadcaster_subscriptions( self, @@ -4317,58 +2532,23 @@ class TwitchAPIClient(OxideHTTP): *, first: int = 20, before: str | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, int, s.BroadcasterSubscription]]: - after = None - - while True: - req = await self.get( - '/subscriptions', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - 'before': before, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - subscriptions = s.BroadcasterSubscriptions.model_validate( - json - ) - - for subscription in subscriptions.data: - yield ( - subscriptions.total, - subscriptions.points, - subscription, - ) - - if isinstance(subscriptions.pagination, s.Pagination): - after = subscriptions.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/subscriptions', + s.BroadcasterSubscriptions, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'first': first, + 'before': before, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, data.points, item async def check_user_subscription( self, @@ -4376,16 +2556,12 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, user_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.UserSubscription: req = await self.get( '/subscriptions/user', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict( { 'broadcaster_id': broadcaster_id, @@ -4394,47 +2570,23 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.UserSubscription.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserSubscription) async def get_channel_teams( self, access_token: str, broadcaster_id: int, *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.ChannelTeams: req = await self.get( '/teams/channel', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelTeams.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelTeams) async def get_teams( self, @@ -4442,30 +2594,16 @@ class TwitchAPIClient(OxideHTTP): *, name: str | None = None, team_id: int | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.Teams: req = await self.get( '/teams', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict({'name': name, 'id': team_id}), ) - match req.status_code: - case st.OK: - return s.Teams.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Teams) async def get_users( self, @@ -4473,90 +2611,43 @@ class TwitchAPIClient(OxideHTTP): *, user_id: int | list[int] | None = None, login: str | list[str] | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.Users: req = await self.get( '/users', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params=self.clean_dict({'id': user_id, 'login': login}), ) - match req.status_code: - case st.OK: - return s.Users.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Users) async def update_user( self, access_token: str, *, description: str = '' ) -> s.Users: req = await self.put( '/users', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - } - ), + headers=self._auth(access_token), json={'description': description}, ) - match req.status_code: - case st.OK: - return s.Users.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Users) async def get_authorization_by_user( self, access_token: str, user_id: int | list[int], *, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.AuthorizationsByUser: req = await self.get( '/authorization/users', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'user_id': user_id}, ) - match req.status_code: - case st.OK: - return s.AuthorizationsByUser.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AuthorizationsByUser) async def get_user_block_list( self, @@ -4564,51 +2655,17 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, *, first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.UserBlock]: - after = None - - while True: - req = await self.get( - '/users/blocks', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - user_block_list = s.UserBlockList.model_validate(json) - - for user in user_block_list.data: - yield user - - if isinstance(user_block_list.pagination, s.Pagination): - after = user_block_list.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/users/blocks', + s.UserBlockList, + self._auth(access_token), + {'broadcaster_id': broadcaster_id, 'first': first}, + cache_ttl, + ): + for item in data.data: + yield item async def block_user( self, @@ -4620,11 +2677,7 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.put( '/users/blocks', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - } - ), + headers=self._auth(access_token), json=self.clean_dict( { 'target_user_id': target_user_id, @@ -4634,120 +2687,56 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def unblock_user( self, access_token: str, target_user_id: int ) -> bool: req = await self.delete( '/users/blocks', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - } - ), + headers=self._auth(access_token), params={'target_user_id': target_user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_user_extensions( - self, access_token: str, *, cache_time: int | None = None + self, access_token: str, *, cache_ttl: int | None = None ) -> s.UserExtensions: req = await self.get( '/users/extensions/list', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.UserExtensions.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserExtensions) async def get_user_active_extensions( self, access_token: str, *, user_id: int | None = None, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> s.UserActiveExtensions: req = await self.get( '/users/extensions', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), + cache_ttl=cache_ttl, + headers=self._auth(access_token), params={'user_id': user_id}, ) - match req.status_code: - case st.OK: - return s.UserActiveExtensions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserActiveExtensions) async def update_user_extensions( self, access_token: str, extensions: s.UserActiveExtensions ) -> bool: req = await self.put( '/users/extensions', - headers={ - 'Authorization': f'Bearer {access_token}', - }, + headers=self._auth(access_token), json=extensions.model_dump(), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_videos( self, @@ -4761,82 +2750,39 @@ class TwitchAPIClient(OxideHTTP): sort: Literal['time', 'trending', 'views'] = 'time', video_type: Literal['all', 'archive', 'highlight', 'upload'] = 'all', first: int = 20, - cache_time: int | None = None, + cache_ttl: int | None = None, ) -> AsyncGenerator[s.Video]: - after = None - - while True: - req = await self.get( - '/videos', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - 'X-Cache-TTL': str(cache_time), - } - ), - params=self.clean_dict( - { - 'video_id': video_id, - 'user_id': user_id, - 'game_id': game_id, - 'language': language, - 'period': period, - 'sort': sort, - 'video_type': video_type, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - videos = s.Videos.model_validate(json) - - for video in videos.data: - yield video - - if isinstance(videos.pagination, s.Pagination): - after = videos.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/videos', + s.Videos, + self._auth(access_token), + self.clean_dict( + { + 'video_id': video_id, + 'user_id': user_id, + 'game_id': game_id, + 'language': language, + 'period': period, + 'sort': sort, + 'video_type': video_type, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def delete_videos( self, access_token: str, video_id: int | list[int] ) -> s.DeleteVideos: req = await self.delete( '/videos', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - } - ), + headers=self._auth(access_token), params={'id': video_id}, ) - match req.status_code: - case st.OK: - return s.DeleteVideos.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.DeleteVideos) async def send_whisper( self, @@ -4847,29 +2793,9 @@ class TwitchAPIClient(OxideHTTP): ) -> bool: req = await self.post( '/whispers', - headers=self.clean_dict( - { - 'Authorization': f'Bearer {access_token}', - } - ), + headers=self._auth(access_token), params={'from_user_id': from_user_id, 'to_user_id': to_user_id}, json={'message': message}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) diff --git a/src/oxidetwitch/schema.py b/src/oxidetwitch/schema.py index 6d627e1..7a9c1f2 100644 --- a/src/oxidetwitch/schema.py +++ b/src/oxidetwitch/schema.py @@ -33,6 +33,14 @@ class Pagination(BaseSchema): cursor: str +class BasePaginated(BaseSchema): + pagination: Pagination | dict[Any, Any] | None = None + + +class PaginatedSchema[T](BasePaginated): + data: list[T] + + class AppAccessToken(BaseSchema): access_token: str expires_in: int @@ -100,9 +108,8 @@ class ExtensionAnalyticsData(BaseSchema): date_range: DateRange -class ExtensionAnalytics(BaseSchema): +class ExtensionAnalytics(PaginatedSchema[ExtensionAnalyticsData]): data: list[ExtensionAnalyticsData] - pagination: Pagination | dict[Any, Any] | None = None class GameAnalyticsData(BaseSchema): @@ -112,9 +119,8 @@ class GameAnalyticsData(BaseSchema): date_range: DateRange -class GameAnalytics(BaseSchema): +class GameAnalytics(PaginatedSchema[GameAnalyticsData]): data: list[GameAnalyticsData] - pagination: Pagination | dict[Any, Any] | None = None class BitsLeaderboardData(BaseSchema): @@ -213,9 +219,8 @@ class ExtensionTransactionsData(BaseSchema): product_data: ExtensionProductData -class ExtensionTransactions(BaseSchema): +class ExtensionTransactions(PaginatedSchema[ExtensionTransactionsData]): data: list[ExtensionTransactionsData] - pagination: Pagination | dict[Any, Any] | None = None class ContentClassificationLabel(TypedDict): @@ -265,9 +270,8 @@ class FollowedChannel(BaseSchema): followed_at: datetime -class FollowedChannels(BaseSchema): +class FollowedChannels(PaginatedSchema[FollowedChannel]): data: list[FollowedChannel] - pagination: Pagination | dict[Any, Any] | None = None total: int @@ -278,9 +282,8 @@ class ChannelFollower(BaseSchema): followed_at: datetime -class ChannelFollowers(BaseSchema): +class ChannelFollowers(PaginatedSchema[ChannelFollower]): data: list[ChannelFollower] - pagination: Pagination | dict[Any, Any] | None = None total: int @@ -353,9 +356,8 @@ class CustomRewardRedemption(BaseSchema): reward: CustomRewardRedemptionReward -class CustomRewardRedemptions(BaseSchema): +class CustomRewardRedemptions(PaginatedSchema[CustomRewardRedemption]): data: list[CustomRewardRedemption] - pagination: Pagination | dict[Any, Any] | None = None class CharityCampaignCurrentAmount(BaseSchema): @@ -402,9 +404,8 @@ class CharityDonation(BaseSchema): amount: CharityDonationAmount -class CharityDonations(BaseSchema): +class CharityDonations(PaginatedSchema[CharityDonation]): data: list[CharityDonation] - pagination: Pagination | dict[Any, Any] | None = None class ChattersData(BaseSchema): @@ -413,9 +414,8 @@ class ChattersData(BaseSchema): user_name: str -class Chatters(BaseSchema): +class Chatters(PaginatedSchema[ChattersData]): data: list[ChattersData] - pagination: Pagination | dict[Any, Any] | None = None total: int @@ -522,8 +522,8 @@ class SharedChatSession(BaseSchema): data: list[SharedChatSessionData] -class UserEmotes(ChannelEmotes): - pagination: Pagination | dict[Any, Any] | None = None +class UserEmotes(PaginatedSchema[ChannelEmote], ChannelEmotes): + pass class MessageDropReason(BaseSchema): @@ -572,9 +572,8 @@ class Clip(BaseSchema): is_featured: bool -class Clips(BaseSchema): +class Clips(PaginatedSchema[Clip]): data: list[Clip] - pagination: Pagination | dict[Any, Any] | None = None class ClipDownload(BaseSchema): @@ -626,9 +625,8 @@ class ConduitShard(BaseSchema): transport: ConduitShardTransportWebhook | ConduitShardTransportWebsocket -class ConduitShards(BaseSchema): +class ConduitShards(PaginatedSchema[ConduitShard]): data: list[ConduitShard] - pagination: Pagination | dict[Any, Any] | None = None class UpdateConduitShardTransportWebhook(TypedDict): @@ -688,9 +686,8 @@ class DropEntitlement(BaseSchema): last_updated: datetime -class DropsEntitlements(BaseSchema): +class DropsEntitlements(PaginatedSchema[DropEntitlement]): data: list[DropEntitlement] - pagination: Pagination | dict[Any, Any] | None = None class UpdateDropsEntitlementsData(BaseSchema): @@ -723,9 +720,8 @@ class ExtensionLiveChannel(BaseSchema): title: str -class ExtensionLiveChannels(BaseSchema): +class ExtensionLiveChannels(PaginatedSchema[ExtensionLiveChannel]): data: list[ExtensionLiveChannel] - paginaiton: Pagination | dict[Any, Any] | None = None class ExtensionSecret(BaseSchema): @@ -838,12 +834,11 @@ class ExtensionBitsProducts(BaseSchema): data: list[ExtensionBitsProduct] -class EventsubBaseSubscriptions(BaseSchema): +class EventsubBaseSubscriptions(PaginatedSchema[sub.Any]): data: list[sub.Any] total: int total_cost: int max_total_cost: int - pagination: Pagination | dict[Any, Any] | None = None class Game(BaseSchema): @@ -853,9 +848,8 @@ class Game(BaseSchema): igdb_id: int | str -class Games(BaseSchema): +class Games(PaginatedSchema[Game]): data: list[Game] - pagination: Pagination | dict[Any, Any] | None = None class CreatorGoal(BaseSchema): @@ -971,9 +965,8 @@ class BannedUser(BaseSchema): moderator_name: str -class BannedUsers(BaseSchema): +class BannedUsers(PaginatedSchema[BannedUser]): data: list[BannedUser] - pagination: Pagination | dict[Any, Any] | None = None class BanUserData(BaseSchema): @@ -1008,9 +1001,8 @@ class UnbanRequest(BaseSchema): resolution_text: str | None -class UnbanRequests(BaseSchema): +class UnbanRequests(PaginatedSchema[UnbanRequest]): data: list[UnbanRequest] - pagination: Pagination | dict[Any, Any] | None = None class BlockedTerm(BaseSchema): @@ -1023,9 +1015,8 @@ class BlockedTerm(BaseSchema): expires_at: datetime | None -class BlockedTerms(BaseSchema): +class BlockedTerms(PaginatedSchema[BlockedTerm]): data: list[BlockedTerm] - pagination: Pagination | dict[Any, Any] | None = None class ModeratedChannel(BaseSchema): @@ -1034,9 +1025,8 @@ class ModeratedChannel(BaseSchema): broadcaster_name: str -class ModeratedChannels(BaseSchema): +class ModeratedChannels(PaginatedSchema[ModeratedChannel]): data: list[ModeratedChannel] - pagination: Pagination | dict[Any, Any] | None = None class Moderator(BaseSchema): @@ -1045,9 +1035,8 @@ class Moderator(BaseSchema): user_name: str -class Moderators(BaseSchema): +class Moderators(PaginatedSchema[Moderator]): data: list[Moderator] - pagination: Pagination | dict[Any, Any] | None = None class VIP(BaseSchema): @@ -1056,9 +1045,8 @@ class VIP(BaseSchema): user_name: str -class VIPs(BaseSchema): +class VIPs(PaginatedSchema[VIP]): data: list[VIP] - pagination: Pagination | dict[Any, Any] | None = None class ShieldModeStatusData(BaseSchema): @@ -1108,9 +1096,8 @@ class Poll(BaseSchema): ended_at: datetime | None -class Polls(BaseSchema): +class Polls(PaginatedSchema[Poll]): data: list[Poll] - pagination: Pagination | dict[Any, Any] | None = None class PredictionTopPredictor(BaseSchema): @@ -1145,9 +1132,8 @@ class Prediction(BaseSchema): locked_at: datetime | None -class Predictions(BaseSchema): +class Predictions(PaginatedSchema[Prediction]): data: list[Prediction] - pagination: Pagination | dict[Any, Any] | None = None class Raid(BaseSchema): @@ -1187,9 +1173,8 @@ class Schedule(BaseSchema): segments: list[ScheduleSegment] -class Schedules(BaseSchema): +class Schedules(PaginatedSchema[Schedule]): data: list[Schedule] - pagination: Pagination | dict[Any, Any] | None = None class Category(BaseSchema): @@ -1198,9 +1183,8 @@ class Category(BaseSchema): box_art_url: str -class Categories(BaseSchema): +class Categories(PaginatedSchema[Category]): data: list[Category] - pagination: Pagination | dict[Any, Any] | None = None class Channel(BaseSchema): @@ -1217,9 +1201,8 @@ class Channel(BaseSchema): started_at: datetime | None -class Channels(BaseSchema): +class Channels(PaginatedSchema[Channel]): data: list[Channel] - pagination: Pagination | dict[Any, Any] | None = None class StreamKey(BaseSchema): @@ -1248,9 +1231,8 @@ class Stream(BaseSchema): is_mature: bool -class Streams(BaseSchema): +class Streams(PaginatedSchema[Stream]): data: list[Stream] - pagination: Pagination | dict[Any, Any] | None = None class BaseStreamMarker(BaseSchema): @@ -1280,9 +1262,8 @@ class StreamMarkersData(BaseSchema): videos: list[StreamMarkerVideo] -class StreamMarkers(BaseSchema): +class StreamMarkers(PaginatedSchema[StreamMarkersData]): data: list[StreamMarkersData] - pagination: Pagination | dict[Any, Any] | None = None class Subscription(BaseSchema): @@ -1303,9 +1284,8 @@ class BroadcasterSubscription(Subscription): user_name: str -class BroadcasterSubscriptions(BaseSchema): +class BroadcasterSubscriptions(PaginatedSchema[BroadcasterSubscription]): data: list[BroadcasterSubscription] - pagination: Pagination | dict[Any, Any] | None = None total: int points: int @@ -1391,9 +1371,8 @@ class UserBlock(BaseSchema): display_name: str -class UserBlockList(BaseSchema): +class UserBlockList(PaginatedSchema[UserBlock]): data: list[UserBlock] - pagination: Pagination | dict[Any, Any] | None = None class UserExtension(BaseSchema): @@ -1480,9 +1459,8 @@ class Video(BaseSchema): muted_segments: list[VideoMutedSegment] -class Videos(BaseSchema): +class Videos(PaginatedSchema[Video]): data: list[Video] - pagination: Pagination | dict[Any, Any] | None class DeleteVideos(BaseSchema):