From ab914bf58ca17babc8a43ff63d7d91c848924637 Mon Sep 17 00:00:00 2001 From: Miwory Date: Thu, 26 Feb 2026 15:17:40 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=20=D1=85=D0=B5=D0=BB=D0=BF=D0=B5=D1=80=20=D0=BD=D0=B0=20?= =?UTF-8?q?=D0=BF=D0=B0=D0=B3=D0=B8=D0=BD=D0=B0=D1=86=D0=B8=D1=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/oxidetwitch/api.py | 3095 ++++++++----------------------------- src/oxidetwitch/schema.py | 102 +- 2 files changed, 647 insertions(+), 2550 deletions(-) diff --git a/src/oxidetwitch/api.py b/src/oxidetwitch/api.py index 4272c4f..860ba19 100644 --- a/src/oxidetwitch/api.py +++ b/src/oxidetwitch/api.py @@ -1,10 +1,12 @@ from collections.abc import AsyncGenerator from datetime import datetime -from typing import Literal +from typing import Any, Literal, overload from zoneinfo import ZoneInfo from oxidehttp import status as st from oxidehttp.client import OxideHTTP +from oxidehttp.schema import CachedResponse, Response +from pydantic import BaseModel from . import schema as s from .eventsub import statuses as sub_status @@ -38,6 +40,62 @@ class TwitchAPIClient(OxideHTTP): def _auth(self, access_token: str) -> dict[str, str]: return {'Authorization': f'Bearer {access_token}'} + @overload + async def _process( + self, req: Response | CachedResponse, schema: None + ) -> bool: ... + + @overload + async def _process[T: BaseModel]( + self, req: Response | CachedResponse, schema: type[T] + ) -> T: ... + + async def _process[T: BaseModel]( + self, req: Response | CachedResponse, schema: type[T] | None + ) -> T | bool: + if req.status_code >= 500: + raise s.InternalError(req.status_code, 'Internal Server Error') + + if req.status_code >= 400 and req.status_code < 500: + data = await req.json() + message = data.get('message', 'Twitch API Error') + + raise s.ClientError(req.status_code, message) + + if schema is None: + return True + + data = await req.json() + return schema.model_validate(data) + + async def _paginate[S: s.BasePaginated]( + self, + path: str, + schema: type[S], + headers: dict[str, str] | None, + params: dict[str, Any], + cache_ttl: int | None = None, + ) -> AsyncGenerator[S]: + after = None + + while True: + current_params = {**params, 'after': after} if after else params + + req = await self.get( + path, + headers=headers, + params=current_params, + cache_ttl=cache_ttl, + ) + result = await self._process(req, schema) + yield result + + if isinstance(result.pagination, s.Pagination): + after = result.pagination.cursor + + if after is None: + break + async def start_commercial( self, access_token: str, broadcaster_id: int, length: int ) -> s.StartCommercial: @@ -50,22 +108,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.StartCommercial.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.StartCommercial) async def get_ad_schedule( self, @@ -78,22 +121,10 @@ class TwitchAPIClient(OxideHTTP): '/channels/ads', cache_ttl=cache_ttl, headers=self._auth(access_token), - params={ - 'broadcaster_id': broadcaster_id, - }, + params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.AdSchedule.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AdSchedule) async def snooze_next_ad( self, access_token: str, broadcaster_id: int @@ -106,17 +137,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.SnoozeNextAd.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.SnoozeNextAd) async def get_extension_analytics( self, @@ -129,48 +150,23 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.ExtensionAnalyticsData]: - after = None - - while True: - req = await self.get( - '/analytics/extensions', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'extension_id': extension_id, - 'type': analytics_type, - 'started_at': started_at, - 'ended_at': ended_at, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - analytics = s.ExtensionAnalytics.model_validate(json) - - for data in analytics.data: - yield data - - if isinstance(analytics.pagination, s.Pagination): - after = analytics.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/analytics/extensions', + s.ExtensionAnalytics, + self._auth(access_token), + self.clean_dict( + { + 'extension_id': extension_id, + 'type': analytics_type, + 'started_at': started_at, + 'ended_at': ended_at, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_game_analytics( self, @@ -183,48 +179,23 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.GameAnalyticsData]: - after = None - - while True: - req = await self.get( - '/analytics/games', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'game_id': game_id, - 'type': analytics_type, - 'started_at': started_at, - 'ended_at': ended_at, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - analytics = s.GameAnalytics.model_validate(json) - - for data in analytics.data: - yield data - - if isinstance(analytics.pagination, s.Pagination): - after = analytics.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/analytics/games', + s.GameAnalytics, + self._auth(access_token), + self.clean_dict( + { + 'game_id': game_id, + 'type': analytics_type, + 'started_at': started_at, + 'ended_at': ended_at, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_bits_leaderboard( self, @@ -250,17 +221,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.BitsLeaderboard.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.BitsLeaderboard) async def get_cheermotes( self, @@ -276,17 +237,7 @@ class TwitchAPIClient(OxideHTTP): params=self.clean_dict({'broadcaster_id': broadcaster_id}), ) - match req.status_code: - case st.OK: - return s.Cheermotes.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Cheermotes) async def get_extension_transactions( self, @@ -297,46 +248,21 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.ExtensionTransactionsData]: - after = None - - while True: - req = await self.get( - '/extensions/transactions', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'extension_id': extension_id, - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - transactions = s.ExtensionTransactions.model_validate(json) - - for data in transactions.data: - yield data - - if isinstance(transactions.pagination, s.Pagination): - after = transactions.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/extensions/transactions', + s.ExtensionTransactions, + self._auth(access_token), + self.clean_dict( + { + 'extension_id': extension_id, + 'user_id': user_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_channel_information( self, @@ -352,17 +278,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelsInformation.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelsInformation) async def modify_channel_information( self, @@ -396,22 +312,7 @@ class TwitchAPIClient(OxideHTTP): json=data, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_channel_editors( self, @@ -427,17 +328,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelEditors.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelEditors) async def get_followed_channels( self, @@ -447,45 +338,20 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, s.FollowedChannel]]: - after = None - - while True: - req = await self.get( - '/channels/followed', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - followed_channels = s.FollowedChannels.model_validate(json) - - for data in followed_channels.data: - yield followed_channels.total, data - - if isinstance(followed_channels.pagination, s.Pagination): - after = followed_channels.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channels/followed', + s.FollowedChannels, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, item async def get_channel_followers( self, @@ -496,46 +362,21 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, s.ChannelFollower]]: - after = None - - while True: - req = await self.get( - '/channels/followers', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - channel_followers = s.ChannelFollowers.model_validate(json) - - for data in channel_followers.data: - yield channel_followers.total, data - - if isinstance(channel_followers.pagination, s.Pagination): - after = channel_followers.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channels/followers', + s.ChannelFollowers, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'user_id': user_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, item async def create_custom_rewards( self, @@ -583,22 +424,7 @@ class TwitchAPIClient(OxideHTTP): json=data, ) - match req.status_code: - case st.OK: - return s.CustomRewards.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CustomRewards) async def delete_custom_reward( self, access_token: str, broadcaster_id: int, reward_id: str @@ -649,23 +475,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.CustomRewards.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CustomRewards) async def get_custom_reward_redemption( self, @@ -679,56 +489,24 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.CustomRewardRedemption]: - after = None - - while True: - req = await self.get( - '/channel_points/custom_rewards/redemptions', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'reward_id': reward_id, - 'status': status, - 'redemption_id': redemption_id, - 'sort': sort, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - redemptions = s.CustomRewardRedemptions.model_validate( - json - ) - - for data in redemptions.data: - yield data - - if isinstance(redemptions.pagination, s.Pagination): - after = redemptions.pagination.cursor - - if not after: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channel_points/custom_rewards/redemptions', + s.CustomRewardRedemptions, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'reward_id': reward_id, + 'status': status, + 'redemption_id': redemption_id, + 'sort': sort, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def update_custom_reward( self, @@ -778,23 +556,7 @@ class TwitchAPIClient(OxideHTTP): json=data, ) - match req.status_code: - case st.OK: - return s.CustomRewards.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CustomRewards) async def update_redemption_status( self, @@ -815,23 +577,7 @@ class TwitchAPIClient(OxideHTTP): json={'status': status}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_charity_campaign( self, @@ -847,17 +593,7 @@ class TwitchAPIClient(OxideHTTP): headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.CharityCampaign.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CharityCampaign) async def get_charity_campaign_donations( self, @@ -867,45 +603,17 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.CharityDonation]: - after = None - - while True: - req = await self.get( - '/charity/donations', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - donations = s.CharityDonations.model_validate(json) - - for donation in donations.data: - yield donation - - if isinstance(donations.pagination, s.Pagination): - after = donations.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/charity/donations', + s.CharityDonations, + self._auth(access_token), + self.clean_dict( + {'broadcaster_id': broadcaster_id, 'first': first} + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_chatters( self, @@ -916,46 +624,21 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, s.ChattersData]]: - after = None - - while True: - req = await self.get( - '/chat/chatters', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'moderator_id': moderator_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - chatters = s.Chatters.model_validate(json) - - for data in chatters.data: - yield chatters.total, data - - if isinstance(chatters.pagination, s.Pagination): - after = chatters.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/chat/chatters', + s.Chatters, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'moderator_id': moderator_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, item async def get_channel_emotes( self, @@ -971,17 +654,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelEmotes.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelEmotes) async def get_global_emotes( self, access_token: str, *, cache_ttl: int | None = None @@ -992,17 +665,7 @@ class TwitchAPIClient(OxideHTTP): headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.GlobalEmotes.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.GlobalEmotes) async def get_emote_sets( self, @@ -1018,17 +681,7 @@ class TwitchAPIClient(OxideHTTP): params={'emote_set_id': emote_set_id}, ) - match req.status_code: - case st.OK: - return s.EmoteSets.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.EmoteSets) async def get_channel_chat_badges( self, @@ -1044,17 +697,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelChatBadges.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelChatBadges) async def get_global_chat_badges( self, access_token: str, *, cache_ttl: int | None = None @@ -1065,17 +708,7 @@ class TwitchAPIClient(OxideHTTP): headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.GlobalChatBadges.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.GlobalChatBadges) async def get_chat_settings( self, @@ -1097,17 +730,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ChatSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChatSettings) async def get_shared_chat_session( self, @@ -1123,17 +746,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.SharedChatSession.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.SharedChatSession) async def get_user_emotes( self, @@ -1143,45 +756,17 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[str, s.ChannelEmote]]: - after = None - - while True: - req = await self.get( - '/chat/emotes/global', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'user_id': user_id, - 'after': after, - 'broadcaster_id': broadcaster_id, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - user_emotes = s.UserEmotes.model_validate(json) - - for data in user_emotes.data: - yield user_emotes.template, data - - if isinstance(user_emotes.pagination, s.Pagination): - after = user_emotes.pagination.cursor - - if not after: - break - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/chat/emotes', + s.UserEmotes, + self._auth(access_token), + self.clean_dict( + {'user_id': user_id, 'broadcaster_id': broadcaster_id} + ), + cache_ttl, + ): + for item in data.data: + yield data.template, item async def update_chat_settings( self, @@ -1223,17 +808,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ChatSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChatSettings) async def send_chat_announcement( self, @@ -1255,17 +830,7 @@ class TwitchAPIClient(OxideHTTP): json=self.clean_dict({'message': message, 'color': color}), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def send_shoutout( self, @@ -1284,22 +849,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def send_chat_message( self, @@ -1325,22 +875,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Message.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Message) async def get_user_chat_color( self, @@ -1356,17 +891,7 @@ class TwitchAPIClient(OxideHTTP): params={'user_id': user_id}, ) - match req.status_code: - case st.OK: - return s.UserChatColor.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserChatColor) async def update_user_chat_color( self, @@ -1380,17 +905,7 @@ class TwitchAPIClient(OxideHTTP): params={'user_id': user_id, 'color': color}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def create_clip( self, @@ -1407,19 +922,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.ACCEPTED: - return True - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_clips( self, @@ -1435,51 +938,26 @@ class TwitchAPIClient(OxideHTTP): is_featured: bool | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Clip]: - after = None - - while True: - req = await self.get( - '/clips', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'game_id': game_id, - 'clip_id': clip_id, - 'started_at': started_at, - 'ended_at': ended_at, - 'first': first, - 'before': before, - 'after': after, - 'is_featured': is_featured, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - clips = s.Clips.model_validate(json) - - for clip in clips.data: - yield clip - - if isinstance(clips.pagination, s.Pagination): - after = clips.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/clips', + s.Clips, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'game_id': game_id, + 'clip_id': clip_id, + 'started_at': started_at, + 'ended_at': ended_at, + 'first': first, + 'before': before, + 'is_featured': is_featured, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_clips_downloads( self, @@ -1503,22 +981,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ClipsDownloads.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ClipsDownloads) async def get_conduits( self, access_token: str, *, cache_ttl: int | None = None @@ -1529,17 +992,7 @@ class TwitchAPIClient(OxideHTTP): headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.Conduits.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Conduits) async def create_conduits( self, access_token: str, shard_count: int @@ -1550,17 +1003,7 @@ class TwitchAPIClient(OxideHTTP): json={'shard_count': shard_count}, ) - match req.status_code: - case st.OK: - return s.Conduits.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Conduits) async def update_conduits( self, access_token: str, conduit_id: str, shard_count: int @@ -1571,17 +1014,7 @@ class TwitchAPIClient(OxideHTTP): json={'shard_count': shard_count, 'id': conduit_id}, ) - match req.status_code: - case st.OK: - return s.Conduits.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Conduits) async def delete_conduit(self, access_token: str, conduit_id: str) -> bool: req = await self.delete( @@ -1590,17 +1023,7 @@ class TwitchAPIClient(OxideHTTP): params={'id': conduit_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_conduit_shards( self, @@ -1610,41 +1033,15 @@ class TwitchAPIClient(OxideHTTP): status: str | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.ConduitShard]: - after = None - - while True: - req = await self.get( - '/eventsub/conduits/shards', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - {'id': conduit_id, 'status': status, 'after': after} - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - shards = s.ConduitShards.model_validate(json) - - for shard in shards.data: - yield shard - - if isinstance(shards.pagination, s.Pagination): - after = shards.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/eventsub/conduits/shards', + s.ConduitShards, + self._auth(access_token), + self.clean_dict({'id': conduit_id, 'status': status}), + cache_ttl, + ): + for item in data.data: + yield item async def update_conduit_shards( self, @@ -1658,17 +1055,7 @@ class TwitchAPIClient(OxideHTTP): json={'conduit_id': conduit_id, 'shards': shards}, ) - match req.status_code: - case st.ACCEPTED: - return s.ConduitShards.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ConduitShards) async def get_content_classification_labels( self, @@ -1714,19 +1101,7 @@ class TwitchAPIClient(OxideHTTP): params={'locale': locale}, ) - match req.status_code: - case st.OK: - return s.ContentClassificationLabels.model_validate( - await req.json() - ) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ContentClassificationLabels) async def get_drops_entitlements( self, @@ -1739,53 +1114,22 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.DropEntitlement]: - after = None - - while True: - req = await self.get( - '/entitlements/drops', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'drop_id': drop_id, - 'user_id': user_id, - 'game_id': game_id, - 'fulfillment_status': fulfillment_status, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - entitlements = s.DropsEntitlements.model_validate(json) - - for entitlement in entitlements.data: - yield entitlement - - if isinstance(entitlements.pagination, s.Pagination): - after = entitlements.pagination.cursor - - if after is None: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/entitlements/drops', + s.DropsEntitlements, + self._auth(access_token), + self.clean_dict( + { + 'drop_id': drop_id, + 'user_id': user_id, + 'game_id': game_id, + 'fulfillment_status': fulfillment_status, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def update_drops_entitlements( self, @@ -1805,19 +1149,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.UpdateDropsEntitlements.model_validate( - await req.json() - ) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.INTERNAL_SERVER_ERROR: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UpdateDropsEntitlements) async def get_extension_configuration_segment( self, @@ -1841,19 +1173,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ExtensionConfigurationSegment.model_validate( - await req.json() - ) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionConfigurationSegment) async def set_extension_configuration_segment( self, @@ -1879,17 +1199,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def set_extension_required_configurations( self, @@ -1910,17 +1220,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def send_extension_pubsub_message( self, @@ -1944,17 +1244,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.UNPROCESSABLE_ENTITY: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_extension_live_channels( self, @@ -1964,45 +1254,19 @@ class TwitchAPIClient(OxideHTTP): first: int | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.ExtensionLiveChannel]: - after = None - - while True: - req = await self.get( - '/extensions/live', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'extension_id': extension_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - extensions = s.ExtensionLiveChannels.model_validate(json) - - for extension in extensions.data: - yield extension - - if isinstance(extensions.paginaiton, s.Pagination): - after = extensions.paginaiton.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/extensions/live', + s.ExtensionLiveChannels, + self._auth(access_token), + self.clean_dict( + { + 'extension_id': extension_id, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_extension_secrets( self, @@ -2018,17 +1282,7 @@ class TwitchAPIClient(OxideHTTP): params={'extension_id': extension_id}, ) - match req.status_code: - case st.OK: - return s.ExtensionSecrets.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionSecrets) async def create_extension_secret( self, jwt_token: str, extension_id: str, delay: int = 300 @@ -2039,17 +1293,7 @@ class TwitchAPIClient(OxideHTTP): params={'extension_id': extension_id, 'delay': delay}, ) - match req.status_code: - case st.OK: - return s.ExtensionSecrets.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionSecrets) async def send_extension_chat_message( self, @@ -2070,17 +1314,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_extensions( self, @@ -2102,17 +1336,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Extensions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Extensions) async def get_released_extension( self, @@ -2134,17 +1358,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Extensions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Extensions) async def get_extension_bits_products( self, @@ -2164,17 +1378,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.ExtensionBitsProducts.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionBitsProducts) async def update_extension_bits_product( self, @@ -2205,17 +1409,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return s.ExtensionBitsProducts.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ExtensionBitsProducts) async def create_eventsub_subscription( self, @@ -2249,46 +1443,18 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.ACCEPTED: - return s.EventsubBaseSubscriptions.model_validate( - await req.json() - ) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.EventsubBaseSubscriptions) async def delete_eventsub_subscription( self, access_token: str, sub_id: str - ) -> None: + ) -> bool: req = await self.delete( '/eventsub/subscriptions', headers=self._auth(access_token), params={'id': sub_id}, ) - match req.status_code: - case st.NO_CONTENT: - return None - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_eventsub_subscriptions( self, @@ -2300,97 +1466,37 @@ class TwitchAPIClient(OxideHTTP): subscription_id: str | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, sub.Any]]: - after = None - - while True: - req = await self.get( - '/eventsub/subscriptions', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'status': status, - 'type': sub_type, - 'user_id': user_id, - 'subscription_id': subscription_id, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - subscriptions = s.EventsubBaseSubscriptions.model_validate( - json - ) - - for subscription in subscriptions.data: - yield subscriptions.total, subscription - - if isinstance(subscriptions.pagination, s.Pagination): - after = subscriptions.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/eventsub/subscriptions', + s.EventsubBaseSubscriptions, + self._auth(access_token), + { + 'status': status, + 'type': sub_type, + 'user_id': user_id, + 'subscription_id': subscription_id, + }, + cache_ttl, + ): + for item in data.data: + yield data.total, item async def get_top_games( self, access_token: str, *, first: int = 20, - before: str | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Game]: - after = None - - while True: - req = await self.get( - '/games/top', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'first': first, - 'after': after, - 'before': before, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - games = s.Games.model_validate(json) - - for game in games.data: - yield game - - if isinstance(games.pagination, s.Pagination): - after = games.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/games/top', + s.Games, + self._auth(access_token), + {'first': first}, + cache_ttl, + ): + for item in data.data: + yield item async def get_games( self, @@ -2414,17 +1520,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Games.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Games) async def get_creator_goals( self, @@ -2440,17 +1536,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.CreatorGoals.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CreatorGoals) # TODO: implement guest star endpoints @@ -2468,17 +1554,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.HypeTrainStatus.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.INTERNAL_SERVER_ERROR: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.HypeTrainStatus) async def check_automod_status( self, @@ -2494,22 +1570,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.AutomodStatus.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AutomodStatus) async def manage_held_automod_messages( self, @@ -2524,19 +1585,7 @@ class TwitchAPIClient(OxideHTTP): params={'action': action, 'msg_id': msg_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_automod_settings( self, @@ -2556,17 +1605,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.AutomodSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AutomodSettings) async def update_automod_settings( self, @@ -2604,17 +1643,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return s.AutomodSettings.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AutomodSettings) async def get_banned_users( self, @@ -2625,46 +1654,19 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.BannedUser]: - after = None - - while True: - req = await self.get( - '/moderation/banned', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - banned = s.BannedUsers.model_validate(json) - - for user in banned.data: - yield user - - if isinstance(banned.pagination, s.Pagination): - after = banned.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/banned', + s.BannedUsers, + self._auth(access_token), + { + 'broadcaster_id': broadcaster_id, + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def ban_user( self, @@ -2692,23 +1694,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.BanUser.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.BanUser) async def unban_user( self, @@ -2716,7 +1702,7 @@ class TwitchAPIClient(OxideHTTP): broadcaster_id: int, moderator_id: int, user_id: int, - ) -> None: + ) -> bool: req = await self.delete( '/moderation/bans', headers=self._auth(access_token), @@ -2727,23 +1713,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_unban_requests( self, @@ -2758,48 +1728,21 @@ class TwitchAPIClient(OxideHTTP): first: int | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.UnbanRequest]: - after = None - - while True: - req = await self.get( - '/moderation/unbans', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'moderator_id': moderator_id, - 'status': status, - 'user_id': user_id, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - unban_requests = s.UnbanRequests.model_validate(json) - - for unban_request in unban_requests.data: - yield unban_request - - if isinstance(unban_requests.pagination, s.Pagination): - after = unban_requests.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/unbans', + s.UnbanRequests, + self._auth(access_token), + { + 'broadcaster_id': broadcaster_id, + 'moderator_id': moderator_id, + 'status': status, + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def resolve_unban_request( self, @@ -2825,17 +1768,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.UnbanRequests.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UnbanRequests) async def get_blocked_terms( self, @@ -2846,46 +1779,19 @@ class TwitchAPIClient(OxideHTTP): first: int | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.BlockedTerm]: - after = None - - while True: - req = await self.get( - '/moderation/blocked_terms', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'moderator_id': moderator_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - blocked_terms = s.BlockedTerms.model_validate(json) - - for blocked_term in blocked_terms.data: - yield blocked_term - - if isinstance(blocked_terms.pagination, s.Pagination): - after = blocked_terms.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/blocked_terms', + s.BlockedTerms, + self._auth(access_token), + { + 'broadcaster_id': broadcaster_id, + 'moderator_id': moderator_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def add_blocked_term( self, @@ -2904,17 +1810,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.BlockedTerms.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.BlockedTerms) async def remove_blocked_term( self, @@ -2933,17 +1829,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def delete_chat_message( self, @@ -2965,19 +1851,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_moderated_channels( self, @@ -2987,47 +1861,18 @@ class TwitchAPIClient(OxideHTTP): first: int | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.ModeratedChannel]: - after = None - - while True: - req = await self.get( - '/moderation/channels', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'user_id': user_id, - 'after': after, - 'first': first, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - moderated_channels = s.ModeratedChannels.model_validate( - json - ) - - for moderated_channel in moderated_channels.data: - yield moderated_channel - - if isinstance(moderated_channels.pagination, s.Pagination): - after = moderated_channels.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/channels', + s.ModeratedChannels, + self._auth(access_token), + { + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def get_moderators( self, @@ -3038,46 +1883,21 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Moderator]: - after = None - - while True: - req = await self.get( - '/moderation/moderators', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - moderators = s.Moderators.model_validate(json) - - for moderator in moderators.data: - yield moderator - - if isinstance(moderators.pagination, s.Pagination): - after = moderators.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/moderation/moderators', + s.Moderators, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'user_id': user_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def add_channel_moderator( self, access_token: str, broadcaster_id: int, user_id: int @@ -3088,22 +1908,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.UNPROCESSABLE_ENTITY - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def remove_channel_moderator( self, access_token: str, broadcaster_id: int, user_id: int @@ -3114,17 +1919,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_vips( self, @@ -3135,46 +1930,19 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.VIP]: - after = None - - while True: - req = await self.get( - '/channels/vips', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'user_id': user_id, - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - vips = s.VIPs.model_validate(json) - - for vip in vips.data: - yield vip - - if isinstance(vips.pagination, s.Pagination): - after = vips.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/channels/vips', + s.VIPs, + self._auth(access_token), + { + 'user_id': user_id, + 'broadcaster_id': broadcaster_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def add_channel_vip( self, access_token: str, broadcaster_id: int, user_id: int @@ -3185,25 +1953,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.CONFLICT - | st.UNPROCESSABLE_ENTITY - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def remove_channel_vip( self, access_token: str, broadcaster_id: int, user_id: int @@ -3214,24 +1964,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id, 'user_id': user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.UNPROCESSABLE_ENTITY - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def update_shield_mode_status( self, @@ -3251,17 +1984,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.ShieldModeStatus.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ShieldModeStatus) async def get_shield_mode_status( self, @@ -3281,17 +2004,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.ShieldModeStatus.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ShieldModeStatus) async def warn_chat_user( self, @@ -3316,24 +2029,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.UserWarns.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.CONFLICT - | st.TOO_MANY_REQUESTS - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserWarns) async def get_polls( self, @@ -3344,46 +2040,21 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Poll]: - after = None - - while True: - req = await self.get( - '/polls', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'id': poll_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - polls = s.Polls.model_validate(json) - - for poll in polls.data: - yield poll - - if isinstance(polls.pagination, s.Pagination): - after = polls.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/polls', + s.Polls, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'id': poll_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def create_poll( self, @@ -3414,17 +2085,7 @@ class TwitchAPIClient(OxideHTTP): json=body, ) - match req.status_code: - case st.OK: - return s.Polls.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Polls) async def end_poll( self, @@ -3443,17 +2104,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.Polls.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Polls) async def get_predictions( self, @@ -3464,46 +2115,21 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Prediction]: - after = None - - while True: - req = await self.get( - '/predictions', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'id': prediction_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - predictions = s.Predictions.model_validate(json) - - for prediction in predictions.data: - yield prediction - - if isinstance(predictions.pagination, s.Pagination): - after = predictions.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/predictions', + s.Predictions, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'id': prediction_id, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def create_prediction( self, @@ -3527,17 +2153,7 @@ class TwitchAPIClient(OxideHTTP): json=body, ) - match req.status_code: - case st.OK: - return s.Predictions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Predictions) async def end_prediction( self, @@ -3561,17 +2177,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Predictions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Predictions) async def start_a_raid( self, @@ -3588,23 +2194,7 @@ class TwitchAPIClient(OxideHTTP): }, ) - match req.status_code: - case st.OK: - return s.Raids.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.CONFLICT - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Raids) async def cancel_a_raid( self, access_token: str, broadcaster_id: int @@ -3615,22 +2205,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_channel_stream_schedule( self, @@ -3642,52 +2217,22 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Schedule]: - after = None - - while True: - req = await self.get( - '/schedule', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'id': segment_id, - 'start_time': start_time, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - schedules = s.Schedules.model_validate(json) - - for schedule in schedules.data: - yield schedule - - if isinstance(schedules.pagination, s.Pagination): - after = schedules.pagination.cursor - - if after is None: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/schedule', + s.Schedules, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'id': segment_id, + 'start_time': start_time, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_channel_icalendar( self, @@ -3740,17 +2285,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def create_channel_stream_schedule_segment( self, @@ -3780,17 +2315,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Schedules.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Schedules) async def update_channel_stream_schedule_segment( self, @@ -3826,17 +2351,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.Schedules.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Schedules) async def delete_channel_stream_schedule_segment( self, access_token: str, broadcaster_id: int, segment_id: str @@ -3852,17 +2367,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def search_categories( self, @@ -3872,41 +2377,15 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Category]: - after = None - - while True: - req = await self.get( - '/search/categories', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - {'query': query, 'first': first, 'after': after} - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - categories = s.Categories.model_validate(json) - - for category in categories.data: - yield category - - if isinstance(categories.pagination, s.Pagination): - after = categories.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/search/categories', + s.Categories, + self._auth(access_token), + {'query': query, 'first': first}, + cache_ttl, + ): + for item in data.data: + yield item async def search_channels( self, @@ -3917,46 +2396,17 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Channel]: - after = None - - while True: - req = await self.get( - '/search/channels', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'query': query, - 'live_only': live_only, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - channels = s.Channels.model_validate(json) - - for channel in channels.data: - yield channel - - if isinstance(channels.pagination, s.Pagination): - after = channels.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/search/channels', + s.Channels, + self._auth(access_token), + self.clean_dict( + {'query': query, 'live_only': live_only, 'first': first} + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_stream_key( self, @@ -3972,17 +2422,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.StreamKeys.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.StreamKeys) async def get_streams( self, @@ -3997,50 +2437,25 @@ class TwitchAPIClient(OxideHTTP): before: str | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Stream]: - after = None - - while True: - req = await self.get( - '/streams', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'user_id': user_id, - 'user_login': user_login, - 'game_id': game_id, - 'type': stream_type, - 'language': language, - 'first': first, - 'before': before, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - streams = s.Streams.model_validate(json) - - for stream in streams.data: - yield stream - - if isinstance(streams.pagination, s.Pagination): - after = streams.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/streams', + s.Streams, + self._auth(access_token), + self.clean_dict( + { + 'user_id': user_id, + 'user_login': user_login, + 'game_id': game_id, + 'type': stream_type, + 'language': language, + 'first': first, + 'before': before, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_followed_streams( self, @@ -4050,45 +2465,18 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Stream]: - after = None - - while True: - req = await self.get( - '/streams/followed', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'user_id': user_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - streams = s.Streams.model_validate(json) - - for stream in streams.data: - yield stream - - if isinstance(streams.pagination, s.Pagination): - after = streams.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/streams/followed', + s.Streams, + self._auth(access_token), + { + 'user_id': user_id, + 'first': first, + }, + cache_ttl, + ): + for item in data.data: + yield item async def create_stream_marker( self, @@ -4108,19 +2496,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.CreateStreamMarkers.model_validate(await req.json()) - - case ( - st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.CreateStreamMarkers) async def get_stream_markers( self, @@ -4132,52 +2508,22 @@ class TwitchAPIClient(OxideHTTP): before: str | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[s.StreamMarkersData]: - after = None - - while True: - req = await self.get( - '/streams/markers', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'user_id': user_id, - 'video_id': video_id, - 'first': first, - 'before': before, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - markers = s.StreamMarkers.model_validate(json) - - for marker in markers.data: - yield marker - - if isinstance(markers.pagination, s.Pagination): - after = markers.pagination.cursor - - if not after: - break - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/streams/markers', + s.StreamMarkers, + self._auth(access_token), + self.clean_dict( + { + 'user_id': user_id, + 'video_id': video_id, + 'first': first, + 'before': before, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def get_broadcaster_subscriptions( self, @@ -4188,52 +2534,21 @@ class TwitchAPIClient(OxideHTTP): before: str | None = None, cache_ttl: int | None = None, ) -> AsyncGenerator[tuple[int, int, s.BroadcasterSubscription]]: - after = None - - while True: - req = await self.get( - '/subscriptions', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - 'before': before, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - subscriptions = s.BroadcasterSubscriptions.model_validate( - json - ) - - for subscription in subscriptions.data: - yield ( - subscriptions.total, - subscriptions.points, - subscription, - ) - - if isinstance(subscriptions.pagination, s.Pagination): - after = subscriptions.pagination.cursor - - if not after: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/subscriptions', + s.BroadcasterSubscriptions, + self._auth(access_token), + self.clean_dict( + { + 'broadcaster_id': broadcaster_id, + 'first': first, + 'before': before, + } + ), + cache_ttl, + ): + for item in data.data: + yield data.total, data.points, item async def check_user_subscription( self, @@ -4255,17 +2570,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.OK: - return s.UserSubscription.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserSubscription) async def get_channel_teams( self, @@ -4281,17 +2586,7 @@ class TwitchAPIClient(OxideHTTP): params={'broadcaster_id': broadcaster_id}, ) - match req.status_code: - case st.OK: - return s.ChannelTeams.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.ChannelTeams) async def get_teams( self, @@ -4308,17 +2603,7 @@ class TwitchAPIClient(OxideHTTP): params=self.clean_dict({'name': name, 'id': team_id}), ) - match req.status_code: - case st.OK: - return s.Teams.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Teams) async def get_users( self, @@ -4335,17 +2620,7 @@ class TwitchAPIClient(OxideHTTP): params=self.clean_dict({'id': user_id, 'login': login}), ) - match req.status_code: - case st.OK: - return s.Users.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Users) async def update_user( self, access_token: str, *, description: str = '' @@ -4356,17 +2631,7 @@ class TwitchAPIClient(OxideHTTP): json={'description': description}, ) - match req.status_code: - case st.OK: - return s.Users.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.Users) async def get_authorization_by_user( self, @@ -4382,22 +2647,7 @@ class TwitchAPIClient(OxideHTTP): params={'user_id': user_id}, ) - match req.status_code: - case st.OK: - return s.AuthorizationsByUser.model_validate(await req.json()) - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.INTERNAL_SERVER_ERROR - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.AuthorizationsByUser) async def get_user_block_list( self, @@ -4407,45 +2657,15 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.UserBlock]: - after = None - - while True: - req = await self.get( - '/users/blocks', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'broadcaster_id': broadcaster_id, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - user_block_list = s.UserBlockList.model_validate(json) - - for user in user_block_list.data: - yield user - - if isinstance(user_block_list.pagination, s.Pagination): - after = user_block_list.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/users/blocks', + s.UserBlockList, + self._auth(access_token), + {'broadcaster_id': broadcaster_id, 'first': first}, + cache_ttl, + ): + for item in data.data: + yield item async def block_user( self, @@ -4467,17 +2687,7 @@ class TwitchAPIClient(OxideHTTP): ), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def unblock_user( self, access_token: str, target_user_id: int @@ -4488,17 +2698,7 @@ class TwitchAPIClient(OxideHTTP): params={'target_user_id': target_user_id}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_user_extensions( self, access_token: str, *, cache_ttl: int | None = None @@ -4509,17 +2709,7 @@ class TwitchAPIClient(OxideHTTP): headers=self._auth(access_token), ) - match req.status_code: - case st.OK: - return s.UserExtensions.model_validate(await req.json()) - - case st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserExtensions) async def get_user_active_extensions( self, @@ -4535,17 +2725,7 @@ class TwitchAPIClient(OxideHTTP): params={'user_id': user_id}, ) - match req.status_code: - case st.OK: - return s.UserActiveExtensions.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.UserActiveExtensions) async def update_user_extensions( self, access_token: str, extensions: s.UserActiveExtensions @@ -4556,17 +2736,7 @@ class TwitchAPIClient(OxideHTTP): json=extensions.model_dump(), ) - match req.status_code: - case st.NO_CONTENT: - return True - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) async def get_videos( self, @@ -4582,51 +2752,26 @@ class TwitchAPIClient(OxideHTTP): first: int = 20, cache_ttl: int | None = None, ) -> AsyncGenerator[s.Video]: - after = None - - while True: - req = await self.get( - '/videos', - cache_ttl=cache_ttl, - headers=self._auth(access_token), - params=self.clean_dict( - { - 'video_id': video_id, - 'user_id': user_id, - 'game_id': game_id, - 'language': language, - 'period': period, - 'sort': sort, - 'video_type': video_type, - 'first': first, - 'after': after, - } - ), - ) - - match req.status_code: - case st.OK: - json = await req.json() - videos = s.Videos.model_validate(json) - - for video in videos.data: - yield video - - if isinstance(videos.pagination, s.Pagination): - after = videos.pagination.cursor - - if after is None: - break - - case st.BAD_REQUEST | st.UNAUTHORIZED | st.NOT_FOUND: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError( - req.status_code, 'Internal Server Error' - ) + async for data in self._paginate( + '/videos', + s.Videos, + self._auth(access_token), + self.clean_dict( + { + 'video_id': video_id, + 'user_id': user_id, + 'game_id': game_id, + 'language': language, + 'period': period, + 'sort': sort, + 'video_type': video_type, + 'first': first, + } + ), + cache_ttl, + ): + for item in data.data: + yield item async def delete_videos( self, access_token: str, video_id: int | list[int] @@ -4637,17 +2782,7 @@ class TwitchAPIClient(OxideHTTP): params={'id': video_id}, ) - match req.status_code: - case st.OK: - return s.DeleteVideos.model_validate(await req.json()) - - case st.BAD_REQUEST | st.UNAUTHORIZED: - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, s.DeleteVideos) async def send_whisper( self, @@ -4663,20 +2798,4 @@ class TwitchAPIClient(OxideHTTP): json={'message': message}, ) - match req.status_code: - case st.NO_CONTENT: - return True - - case ( - st.BAD_REQUEST - | st.UNAUTHORIZED - | st.FORBIDDEN - | st.NOT_FOUND - | st.TOO_MANY_REQUESTS - ): - raise s.ClientError( - req.status_code, (await req.json())['message'] - ) - - case _: - raise s.InternalError(req.status_code, 'Internal Server Error') + return await self._process(req, None) diff --git a/src/oxidetwitch/schema.py b/src/oxidetwitch/schema.py index 6d627e1..7a9c1f2 100644 --- a/src/oxidetwitch/schema.py +++ b/src/oxidetwitch/schema.py @@ -33,6 +33,14 @@ class Pagination(BaseSchema): cursor: str +class BasePaginated(BaseSchema): + pagination: Pagination | dict[Any, Any] | None = None + + +class PaginatedSchema[T](BasePaginated): + data: list[T] + + class AppAccessToken(BaseSchema): access_token: str expires_in: int @@ -100,9 +108,8 @@ class ExtensionAnalyticsData(BaseSchema): date_range: DateRange -class ExtensionAnalytics(BaseSchema): +class ExtensionAnalytics(PaginatedSchema[ExtensionAnalyticsData]): data: list[ExtensionAnalyticsData] - pagination: Pagination | dict[Any, Any] | None = None class GameAnalyticsData(BaseSchema): @@ -112,9 +119,8 @@ class GameAnalyticsData(BaseSchema): date_range: DateRange -class GameAnalytics(BaseSchema): +class GameAnalytics(PaginatedSchema[GameAnalyticsData]): data: list[GameAnalyticsData] - pagination: Pagination | dict[Any, Any] | None = None class BitsLeaderboardData(BaseSchema): @@ -213,9 +219,8 @@ class ExtensionTransactionsData(BaseSchema): product_data: ExtensionProductData -class ExtensionTransactions(BaseSchema): +class ExtensionTransactions(PaginatedSchema[ExtensionTransactionsData]): data: list[ExtensionTransactionsData] - pagination: Pagination | dict[Any, Any] | None = None class ContentClassificationLabel(TypedDict): @@ -265,9 +270,8 @@ class FollowedChannel(BaseSchema): followed_at: datetime -class FollowedChannels(BaseSchema): +class FollowedChannels(PaginatedSchema[FollowedChannel]): data: list[FollowedChannel] - pagination: Pagination | dict[Any, Any] | None = None total: int @@ -278,9 +282,8 @@ class ChannelFollower(BaseSchema): followed_at: datetime -class ChannelFollowers(BaseSchema): +class ChannelFollowers(PaginatedSchema[ChannelFollower]): data: list[ChannelFollower] - pagination: Pagination | dict[Any, Any] | None = None total: int @@ -353,9 +356,8 @@ class CustomRewardRedemption(BaseSchema): reward: CustomRewardRedemptionReward -class CustomRewardRedemptions(BaseSchema): +class CustomRewardRedemptions(PaginatedSchema[CustomRewardRedemption]): data: list[CustomRewardRedemption] - pagination: Pagination | dict[Any, Any] | None = None class CharityCampaignCurrentAmount(BaseSchema): @@ -402,9 +404,8 @@ class CharityDonation(BaseSchema): amount: CharityDonationAmount -class CharityDonations(BaseSchema): +class CharityDonations(PaginatedSchema[CharityDonation]): data: list[CharityDonation] - pagination: Pagination | dict[Any, Any] | None = None class ChattersData(BaseSchema): @@ -413,9 +414,8 @@ class ChattersData(BaseSchema): user_name: str -class Chatters(BaseSchema): +class Chatters(PaginatedSchema[ChattersData]): data: list[ChattersData] - pagination: Pagination | dict[Any, Any] | None = None total: int @@ -522,8 +522,8 @@ class SharedChatSession(BaseSchema): data: list[SharedChatSessionData] -class UserEmotes(ChannelEmotes): - pagination: Pagination | dict[Any, Any] | None = None +class UserEmotes(PaginatedSchema[ChannelEmote], ChannelEmotes): + pass class MessageDropReason(BaseSchema): @@ -572,9 +572,8 @@ class Clip(BaseSchema): is_featured: bool -class Clips(BaseSchema): +class Clips(PaginatedSchema[Clip]): data: list[Clip] - pagination: Pagination | dict[Any, Any] | None = None class ClipDownload(BaseSchema): @@ -626,9 +625,8 @@ class ConduitShard(BaseSchema): transport: ConduitShardTransportWebhook | ConduitShardTransportWebsocket -class ConduitShards(BaseSchema): +class ConduitShards(PaginatedSchema[ConduitShard]): data: list[ConduitShard] - pagination: Pagination | dict[Any, Any] | None = None class UpdateConduitShardTransportWebhook(TypedDict): @@ -688,9 +686,8 @@ class DropEntitlement(BaseSchema): last_updated: datetime -class DropsEntitlements(BaseSchema): +class DropsEntitlements(PaginatedSchema[DropEntitlement]): data: list[DropEntitlement] - pagination: Pagination | dict[Any, Any] | None = None class UpdateDropsEntitlementsData(BaseSchema): @@ -723,9 +720,8 @@ class ExtensionLiveChannel(BaseSchema): title: str -class ExtensionLiveChannels(BaseSchema): +class ExtensionLiveChannels(PaginatedSchema[ExtensionLiveChannel]): data: list[ExtensionLiveChannel] - paginaiton: Pagination | dict[Any, Any] | None = None class ExtensionSecret(BaseSchema): @@ -838,12 +834,11 @@ class ExtensionBitsProducts(BaseSchema): data: list[ExtensionBitsProduct] -class EventsubBaseSubscriptions(BaseSchema): +class EventsubBaseSubscriptions(PaginatedSchema[sub.Any]): data: list[sub.Any] total: int total_cost: int max_total_cost: int - pagination: Pagination | dict[Any, Any] | None = None class Game(BaseSchema): @@ -853,9 +848,8 @@ class Game(BaseSchema): igdb_id: int | str -class Games(BaseSchema): +class Games(PaginatedSchema[Game]): data: list[Game] - pagination: Pagination | dict[Any, Any] | None = None class CreatorGoal(BaseSchema): @@ -971,9 +965,8 @@ class BannedUser(BaseSchema): moderator_name: str -class BannedUsers(BaseSchema): +class BannedUsers(PaginatedSchema[BannedUser]): data: list[BannedUser] - pagination: Pagination | dict[Any, Any] | None = None class BanUserData(BaseSchema): @@ -1008,9 +1001,8 @@ class UnbanRequest(BaseSchema): resolution_text: str | None -class UnbanRequests(BaseSchema): +class UnbanRequests(PaginatedSchema[UnbanRequest]): data: list[UnbanRequest] - pagination: Pagination | dict[Any, Any] | None = None class BlockedTerm(BaseSchema): @@ -1023,9 +1015,8 @@ class BlockedTerm(BaseSchema): expires_at: datetime | None -class BlockedTerms(BaseSchema): +class BlockedTerms(PaginatedSchema[BlockedTerm]): data: list[BlockedTerm] - pagination: Pagination | dict[Any, Any] | None = None class ModeratedChannel(BaseSchema): @@ -1034,9 +1025,8 @@ class ModeratedChannel(BaseSchema): broadcaster_name: str -class ModeratedChannels(BaseSchema): +class ModeratedChannels(PaginatedSchema[ModeratedChannel]): data: list[ModeratedChannel] - pagination: Pagination | dict[Any, Any] | None = None class Moderator(BaseSchema): @@ -1045,9 +1035,8 @@ class Moderator(BaseSchema): user_name: str -class Moderators(BaseSchema): +class Moderators(PaginatedSchema[Moderator]): data: list[Moderator] - pagination: Pagination | dict[Any, Any] | None = None class VIP(BaseSchema): @@ -1056,9 +1045,8 @@ class VIP(BaseSchema): user_name: str -class VIPs(BaseSchema): +class VIPs(PaginatedSchema[VIP]): data: list[VIP] - pagination: Pagination | dict[Any, Any] | None = None class ShieldModeStatusData(BaseSchema): @@ -1108,9 +1096,8 @@ class Poll(BaseSchema): ended_at: datetime | None -class Polls(BaseSchema): +class Polls(PaginatedSchema[Poll]): data: list[Poll] - pagination: Pagination | dict[Any, Any] | None = None class PredictionTopPredictor(BaseSchema): @@ -1145,9 +1132,8 @@ class Prediction(BaseSchema): locked_at: datetime | None -class Predictions(BaseSchema): +class Predictions(PaginatedSchema[Prediction]): data: list[Prediction] - pagination: Pagination | dict[Any, Any] | None = None class Raid(BaseSchema): @@ -1187,9 +1173,8 @@ class Schedule(BaseSchema): segments: list[ScheduleSegment] -class Schedules(BaseSchema): +class Schedules(PaginatedSchema[Schedule]): data: list[Schedule] - pagination: Pagination | dict[Any, Any] | None = None class Category(BaseSchema): @@ -1198,9 +1183,8 @@ class Category(BaseSchema): box_art_url: str -class Categories(BaseSchema): +class Categories(PaginatedSchema[Category]): data: list[Category] - pagination: Pagination | dict[Any, Any] | None = None class Channel(BaseSchema): @@ -1217,9 +1201,8 @@ class Channel(BaseSchema): started_at: datetime | None -class Channels(BaseSchema): +class Channels(PaginatedSchema[Channel]): data: list[Channel] - pagination: Pagination | dict[Any, Any] | None = None class StreamKey(BaseSchema): @@ -1248,9 +1231,8 @@ class Stream(BaseSchema): is_mature: bool -class Streams(BaseSchema): +class Streams(PaginatedSchema[Stream]): data: list[Stream] - pagination: Pagination | dict[Any, Any] | None = None class BaseStreamMarker(BaseSchema): @@ -1280,9 +1262,8 @@ class StreamMarkersData(BaseSchema): videos: list[StreamMarkerVideo] -class StreamMarkers(BaseSchema): +class StreamMarkers(PaginatedSchema[StreamMarkersData]): data: list[StreamMarkersData] - pagination: Pagination | dict[Any, Any] | None = None class Subscription(BaseSchema): @@ -1303,9 +1284,8 @@ class BroadcasterSubscription(Subscription): user_name: str -class BroadcasterSubscriptions(BaseSchema): +class BroadcasterSubscriptions(PaginatedSchema[BroadcasterSubscription]): data: list[BroadcasterSubscription] - pagination: Pagination | dict[Any, Any] | None = None total: int points: int @@ -1391,9 +1371,8 @@ class UserBlock(BaseSchema): display_name: str -class UserBlockList(BaseSchema): +class UserBlockList(PaginatedSchema[UserBlock]): data: list[UserBlock] - pagination: Pagination | dict[Any, Any] | None = None class UserExtension(BaseSchema): @@ -1480,9 +1459,8 @@ class Video(BaseSchema): muted_segments: list[VideoMutedSegment] -class Videos(BaseSchema): +class Videos(PaginatedSchema[Video]): data: list[Video] - pagination: Pagination | dict[Any, Any] | None class DeleteVideos(BaseSchema):