Files
TwitchClient/src/twitchclient/api.py

916 lines
29 KiB
Python

from datetime import datetime
from typing import Literal
from aiohttpx import status as st
from aiohttpx.client import AioHTTPXClient
from . import schema as s
class TwitchAPIClient(AioHTTPXClient):
def __init__(
    self,
    redis_url: str,
    client_id: str,
    client_secret: str,
    redirect_uri: str,
):
    """Store the application credentials and configure the parent client.

    Args:
        redis_url: Forwarded unchanged to the parent initializer.
        client_id: Application client ID; stored on the instance and sent
            as the default ``Client-Id`` header.
        client_secret: Application client secret; stored on the instance.
        redirect_uri: Redirect URI; stored on the instance.
    """
    helix_root = 'https://api.twitch.tv/helix'
    self.base_uri = helix_root
    self.client_id = client_id
    self.client_secret = client_secret
    self.redirect_uri = redirect_uri
    # NOTE(review): key/limit/logger are interpreted by AioHTTPXClient;
    # presumably cache/rate-limit/logging settings -- confirm in the parent.
    parent_options = {
        'base_url': helix_root,
        'headers': {'Client-Id': client_id},
        'redis_url': redis_url,
        'key': 'twitch',
        'limit': 10,
        'logger': 'Twitch API',
    }
    super().__init__(**parent_options)
async def start_commercial(
self, access_token: str, broadcaster_id: int | str
):
req = await self.get(
'/channels/commercial',
headers={'Authorization': f'Bearer {access_token}'},
params={
'broadcaster_id': broadcaster_id,
},
)
match req.status_code:
case st.OK:
return s.StartCommercial.model_validate(req.json()).data
case (
st.BAD_REQUEST
| st.UNAUTHORIZED
| st.NOT_FOUND
| st.TOO_MANY_REQUESTS
):
raise s.Error(req.status_code, req.json()['message'])
case _:
raise s.Error(req.status_code, 'Internal Server Error')
async def get_ad_schedule(
    self, access_token: str, broadcaster_id: int | str
):
    """Fetch the ad schedule for a broadcaster via ``GET /channels/ads``.

    Returns:
        The ``data`` attribute of the validated ``s.AdSchedule`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST or
            UNAUTHORIZED; with 'Internal Server Error' otherwise.
    """
    bearer = {'Authorization': f'Bearer {access_token}'}
    query = {'broadcaster_id': broadcaster_id}
    resp = await self.get('/channels/ads', headers=bearer, params=query)
    code = resp.status_code
    if code == st.OK:
        return s.AdSchedule.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def snooze_next_ad(
    self, access_token: str, broadcaster_id: int | str
):
    """Snooze the next scheduled ad via ``POST /channels/ads/schedule/snooze``.

    Returns:
        The ``data`` attribute of the validated ``s.SnoozeNextAd`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    bearer = {'Authorization': f'Bearer {access_token}'}
    query = {'broadcaster_id': broadcaster_id}
    resp = await self.post(
        '/channels/ads/schedule/snooze', headers=bearer, params=query
    )
    code = resp.status_code
    if code == st.OK:
        return s.SnoozeNextAd.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.TOO_MANY_REQUESTS):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_extension_analytics(
    self,
    access_token: str,
    extension_id: str | None = None,
    analytics_type: Literal['overview_v2'] | None = None,
    started_at: datetime | None = None,
    ended_at: datetime | None = None,
    first: int = 20,
    after: str | None = None,
):
    """Query ``GET /analytics/extensions`` with the given filters.

    The filters are passed through ``self.clean_dict`` before being sent
    as query parameters; ``analytics_type`` is sent under the key ``type``.

    Returns:
        The ``data`` attribute of the validated ``s.ExtensionAnalytics``
        model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or NOT_FOUND; with 'Internal Server Error'
            otherwise.
    """
    # NOTE(review): datetimes are handed over as-is; confirm the client
    # serializes them in the RFC 3339 form the API expects.
    filters = {
        'extension_id': extension_id,
        'type': analytics_type,
        'started_at': started_at,
        'ended_at': ended_at,
        'first': first,
        'after': after,
    }
    resp = await self.get(
        '/analytics/extensions',
        headers={'Authorization': f'Bearer {access_token}'},
        params=self.clean_dict(filters),
    )
    code = resp.status_code
    if code == st.OK:
        return s.ExtensionAnalytics.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.NOT_FOUND):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_game_analytics(
    self,
    access_token: str,
    game_id: int | None = None,
    analytics_type: Literal['overview_v2'] | None = None,
    started_at: datetime | None = None,
    ended_at: datetime | None = None,
    first: int = 20,
    after: str | None = None,
):
    """Query ``GET /analytics/games`` with the given filters.

    The filters are passed through ``self.clean_dict`` before being sent
    as query parameters; ``analytics_type`` is sent under the key ``type``.

    Returns:
        The ``data`` attribute of the validated ``s.GameAnalytics`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or NOT_FOUND; with 'Internal Server Error'
            otherwise.
    """
    filters = {
        'game_id': game_id,
        'type': analytics_type,
        'started_at': started_at,
        'ended_at': ended_at,
        'first': first,
        'after': after,
    }
    resp = await self.get(
        '/analytics/games',
        headers={'Authorization': f'Bearer {access_token}'},
        params=self.clean_dict(filters),
    )
    code = resp.status_code
    if code == st.OK:
        return s.GameAnalytics.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.NOT_FOUND):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_bits_leaderboard(
    self,
    access_token: str,
    count: int = 10,
    period: Literal['day', 'week', 'month', 'year', 'all'] = 'all',
    started_at: datetime | None = None,
    user_id: int | None = None,
):
    """Query ``GET /bits/leaderboard``.

    The arguments are passed through ``self.clean_dict`` and sent as query
    parameters.

    Returns:
        The ``data`` attribute of the validated ``s.BitsLeaderboard`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or FORBIDDEN; with 'Internal Server Error'
            otherwise.
    """
    query = self.clean_dict(
        {
            'count': count,
            'period': period,
            'started_at': started_at,
            'user_id': user_id,
        }
    )
    resp = await self.get(
        '/bits/leaderboard',
        headers={'Authorization': f'Bearer {access_token}'},
        params=query,
    )
    code = resp.status_code
    if code == st.OK:
        return s.BitsLeaderboard.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.FORBIDDEN):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_cheermotes(
    self, access_token: str, broadcaster_id: int | str | None = None
):
    """Query ``GET /bits/cheermotes``, optionally for one broadcaster.

    Returns:
        The ``data`` attribute of the validated ``s.Cheermotes`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST or
            UNAUTHORIZED; with 'Internal Server Error' otherwise.
    """
    bearer = {'Authorization': f'Bearer {access_token}'}
    query = self.clean_dict({'broadcaster_id': broadcaster_id})
    resp = await self.get('/bits/cheermotes', headers=bearer, params=query)
    code = resp.status_code
    if code == st.OK:
        return s.Cheermotes.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_extension_transactions(
    self,
    access_token: str,
    extension_id: str,
    user_id: int | list[int] | None = None,
    first: int = 20,
    after: str | None = None,
):
    """Query ``GET /extensions/transactions`` for an extension.

    The arguments are passed through ``self.clean_dict`` and sent as query
    parameters.

    Returns:
        The ``data`` attribute of the validated ``s.ExtensionTransactions``
        model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or NOT_FOUND; with 'Internal Server Error'
            otherwise.
    """
    # NOTE(review): the filter is sent under the key 'user_id'; verify
    # against the Helix reference whether the endpoint expects 'id'.
    query = self.clean_dict(
        {
            'extension_id': extension_id,
            'user_id': user_id,
            'first': first,
            'after': after,
        }
    )
    resp = await self.get(
        '/extensions/transactions',
        headers={'Authorization': f'Bearer {access_token}'},
        params=query,
    )
    code = resp.status_code
    if code == st.OK:
        return s.ExtensionTransactions.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.NOT_FOUND):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_channel_information(
    self,
    access_token: str,
    broadcaster_id: int | list[int] | str | list[str],
):
    """Query ``GET /channels`` for one or more broadcasters.

    Returns:
        The ``data`` attribute of the validated ``s.ChannelsInformation``
        model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    bearer = {'Authorization': f'Bearer {access_token}'}
    query = {'broadcaster_id': broadcaster_id}
    resp = await self.get('/channels', headers=bearer, params=query)
    code = resp.status_code
    if code == st.OK:
        return s.ChannelsInformation.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.TOO_MANY_REQUESTS):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def modify_channel_information(
    self,
    access_token: str,
    broadcaster_id: int | str,
    *,
    game_id: int | str | None = None,
    broadcaster_language: str | None = None,
    title: str | None = None,
    delay: int | None = None,
    tags: list[str] | None = None,
    content_classification_labels: list[s.ContentClassificationLabel]
    | None = None,
    is_branded_content: bool | None = None,
):
    """Update channel properties via ``PATCH /channels``.

    ``broadcaster_id`` is sent as a query parameter; the keyword-only
    fields are passed through ``self.clean_dict`` and sent as the JSON body.

    Returns:
        ``True`` when the API answers NO_CONTENT.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED, FORBIDDEN or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    changes = {
        'game_id': game_id,
        'broadcaster_language': broadcaster_language,
        'title': title,
        'delay': delay,
        'tags': tags,
        'content_classification_labels': content_classification_labels,
        'is_branded_content': is_branded_content,
    }
    payload = self.clean_dict(changes)
    resp = await self.patch(
        '/channels',
        headers={'Authorization': f'Bearer {access_token}'},
        params={'broadcaster_id': broadcaster_id},
        json=payload,
    )
    code = resp.status_code
    if code == st.NO_CONTENT:
        return True
    if code in (
        st.BAD_REQUEST,
        st.UNAUTHORIZED,
        st.FORBIDDEN,
        st.TOO_MANY_REQUESTS,
    ):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_channel_editors(
    self, access_token: str, broadcaster_id: int | str
):
    """Query ``GET /channels/editors`` for a broadcaster.

    Returns:
        The ``data`` attribute of the validated ``s.ChannelEditors`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    bearer = {'Authorization': f'Bearer {access_token}'}
    query = {'broadcaster_id': broadcaster_id}
    resp = await self.get('/channels/editors', headers=bearer, params=query)
    code = resp.status_code
    if code == st.OK:
        return s.ChannelEditors.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.TOO_MANY_REQUESTS):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_followed_channels(
self,
access_token: str,
broadcaster_id: int | str,
*,
first: int = 20,
after: str | None = None,
):
req = await self.get(
'/channels/followed',
headers={'Authorization': f'Bearer {access_token}'},
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'first': first,
'after': after,
}
),
)
match req.status_code:
case st.OK:
return s.FollowedChannels.model_validate(req.json()).data
case st.BAD_REQUEST | st.UNAUTHORIZED | st.TOO_MANY_REQUESTS:
raise s.Error(req.status_code, req.json()['message'])
case _:
raise s.Error(req.status_code, 'Internal Server Error')
async def get_channel_followers(
    self,
    access_token: str,
    broadcaster_id: int | str,
    *,
    user_id: int | None = None,
    first: int = 20,
    after: str | None = None,
):
    """Query ``GET /channels/followers`` for a broadcaster.

    The arguments are passed through ``self.clean_dict`` and sent as query
    parameters.

    Returns:
        The ``data`` attribute of the validated ``s.ChannelFollowers``
        model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    query = self.clean_dict(
        {
            'broadcaster_id': broadcaster_id,
            'user_id': user_id,
            'first': first,
            'after': after,
        }
    )
    resp = await self.get(
        '/channels/followers',
        headers={'Authorization': f'Bearer {access_token}'},
        params=query,
    )
    code = resp.status_code
    if code == st.OK:
        return s.ChannelFollowers.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.TOO_MANY_REQUESTS):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def create_custom_rewards(
self,
access_token: str,
broadcaster_id: int | str,
title: str,
cost: int,
*,
prompt: str | None = None,
is_enabled: bool = True,
background_color: str | None = None,
is_user_input_required: bool = False,
is_max_per_stream_enabled: bool = False,
max_per_stream: int | None = None,
is_max_per_user_per_stream_enabled: bool = False,
max_per_user_per_stream: int | None = None,
is_global_cooldown_enabled: bool = False,
global_cooldown_seconds: int | None = None,
should_redemptions_skip_request_queue: bool = False,
):
data = self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'title': title,
'cost': cost,
'prompt': prompt,
'is_enabled': is_enabled,
'background_color': background_color,
'is_user_input_required': is_user_input_required,
'is_max_per_stream_enabled': is_max_per_stream_enabled,
'max_per_stream': max_per_stream,
'is_max_per_user_per_'
'stream_enabled': is_max_per_user_per_stream_enabled,
'max_per_user_per_stream': max_per_user_per_stream,
'is_global_cooldown_enabled': is_global_cooldown_enabled,
'global_cooldown_seconds': global_cooldown_seconds,
'should_redemptions_skip_'
'request_queue': should_redemptions_skip_request_queue,
}
)
req = await self.post(
'/channel_points/custom_rewards',
headers={'Authorization': f'Bearer {access_token}'},
json=data,
)
match req.status_code:
case st.OK:
return s.CustomRewards.model_validate(req.json()).data
case (
st.BAD_REQUEST
| st.UNAUTHORIZED
| st.FORBIDDEN
| st.TOO_MANY_REQUESTS
):
raise s.Error(req.status_code, req.json()['message'])
case _:
raise s.Error(req.status_code, 'Internal Server Error')
async def delete_custom_reward(
    self, access_token: str, broadcaster_id: int | str, reward_id: str
):
    """Delete a reward via ``DELETE /channel_points/custom_rewards``.

    ``reward_id`` is sent under the query key ``id``.

    Returns:
        ``True`` when the API answers NO_CONTENT.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED, FORBIDDEN, NOT_FOUND or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    bearer = {'Authorization': f'Bearer {access_token}'}
    target = {'broadcaster_id': broadcaster_id, 'id': reward_id}
    resp = await self.delete(
        '/channel_points/custom_rewards', headers=bearer, params=target
    )
    code = resp.status_code
    if code == st.NO_CONTENT:
        return True
    if code in (
        st.BAD_REQUEST,
        st.UNAUTHORIZED,
        st.FORBIDDEN,
        st.NOT_FOUND,
        st.TOO_MANY_REQUESTS,
    ):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_custom_rewards(
    self,
    access_token: str,
    broadcaster_id: int | str,
    *,
    reward_id: str | None = None,
    only_manageable_rewards: bool = False,
):
    """Query ``GET /channel_points/custom_rewards`` for a broadcaster.

    ``reward_id`` is sent under the query key ``id``; all arguments are
    passed through ``self.clean_dict`` first.

    Returns:
        The ``data`` attribute of the validated ``s.CustomRewards`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED, FORBIDDEN, NOT_FOUND or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    query = self.clean_dict(
        {
            'broadcaster_id': broadcaster_id,
            'id': reward_id,
            'only_manageable_rewards': only_manageable_rewards,
        }
    )
    resp = await self.get(
        '/channel_points/custom_rewards',
        headers={'Authorization': f'Bearer {access_token}'},
        params=query,
    )
    code = resp.status_code
    if code == st.OK:
        return s.CustomRewards.model_validate(resp.json()).data
    if code in (
        st.BAD_REQUEST,
        st.UNAUTHORIZED,
        st.FORBIDDEN,
        st.NOT_FOUND,
        st.TOO_MANY_REQUESTS,
    ):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_custom_reward_redemption(
self,
access_token: str,
broadcaster_id: int | str,
reward_id: str,
status: Literal['CANCELED', 'FULFILLED', 'UNFULFILLED'],
*,
redemption_id: int | list[int] | None = None,
sort: Literal['OLDEST', 'NEWEST'] = 'OLDEST',
after: str | None = None,
first: int = 20,
):
req = await self.get(
'/channel_points/custom_rewards/redemptions',
headers={'Authorization': f'Bearer {access_token}'},
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'reward_id': reward_id,
'status': status,
'redemption_id': redemption_id,
'sort': sort,
'after': after,
'first': first,
}
),
)
match req.status_code:
case st.OK:
return s.CustomRewardRedemptions.model_validate(
req.json()
).data
case (
st.BAD_REQUEST
| st.UNAUTHORIZED
| st.NOT_FOUND
| st.TOO_MANY_REQUESTS
):
raise s.Error(req.status_code, req.json()['message'])
case _:
raise s.Error(req.status_code, 'Internal Server Error')
async def update_custom_reward(
    self,
    access_token: str,
    broadcaster_id: int | str,
    reward_id: str,
    *,
    title: str | None = None,
    prompt: str | None = None,
    cost: int | None = None,
    background_color: str | None = None,
    is_enabled: bool | None = None,
    is_user_input_required: bool | None = None,
    is_max_per_stream_enabled: bool | None = None,
    max_per_stream: int | None = None,
    is_max_per_user_per_stream_enabled: bool | None = None,
    max_per_user_per_stream: int | None = None,
    is_global_cooldown_enabled: bool | None = None,
    global_cooldown_seconds: int | None = None,
    is_paused: bool | None = None,
    should_redemptions_skip_request_queue: bool | None = None,
):
    """Update a reward via ``PATCH /channel_points/custom_rewards``.

    ``broadcaster_id`` and ``reward_id`` (as ``id``) are sent as query
    parameters; the keyword-only fields are passed through
    ``self.clean_dict`` and sent as the JSON body under the same names.

    Returns:
        The ``data`` attribute of the validated ``s.CustomRewards`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED, FORBIDDEN, NOT_FOUND or TOO_MANY_REQUESTS; with
            'Internal Server Error' otherwise.
    """
    changes = {
        'title': title,
        'prompt': prompt,
        'cost': cost,
        'background_color': background_color,
        'is_enabled': is_enabled,
        'is_user_input_required': is_user_input_required,
        'is_max_per_stream_enabled': is_max_per_stream_enabled,
        'max_per_stream': max_per_stream,
        'is_max_per_user_per_stream_enabled': (
            is_max_per_user_per_stream_enabled
        ),
        'max_per_user_per_stream': max_per_user_per_stream,
        'is_global_cooldown_enabled': is_global_cooldown_enabled,
        'global_cooldown_seconds': global_cooldown_seconds,
        'is_paused': is_paused,
        'should_redemptions_skip_request_queue': (
            should_redemptions_skip_request_queue
        ),
    }
    payload = self.clean_dict(changes)
    resp = await self.patch(
        '/channel_points/custom_rewards',
        headers={'Authorization': f'Bearer {access_token}'},
        params={'broadcaster_id': broadcaster_id, 'id': reward_id},
        json=payload,
    )
    code = resp.status_code
    if code == st.OK:
        return s.CustomRewards.model_validate(resp.json()).data
    if code in (
        st.BAD_REQUEST,
        st.UNAUTHORIZED,
        st.FORBIDDEN,
        st.NOT_FOUND,
        st.TOO_MANY_REQUESTS,
    ):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def update_redemption_status(
self,
access_token: str,
broadcaster_id: int | str,
reward_id: int,
redemption_id: int | list[int],
status: Literal['CANCELED', 'FULFILLED'],
):
req = await self.post(
'/channel_points/custom_rewards/redemptions',
headers={'Authorization': f'Bearer {access_token}'},
params={
'broadcaster_id': broadcaster_id,
'reward_id': reward_id,
'id': redemption_id,
},
json={'status': status},
)
match req.status_code:
case st.NO_CONTENT:
return True
case (
st.BAD_REQUEST
| st.UNAUTHORIZED
| st.FORBIDDEN
| st.NOT_FOUND
| st.TOO_MANY_REQUESTS
):
raise s.Error(req.status_code, req.json()['message'])
case _:
raise s.Error(req.status_code, 'Internal Server Error')
async def get_charity_campaign(
    self, access_token: str, broadcaster_id: int | str
):
    """Query ``GET /charity/campaigns`` for a broadcaster.

    Returns:
        The ``data`` attribute of the validated ``s.CharityCampaign`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or FORBIDDEN; with 'Internal Server Error'
            otherwise.
    """
    query = {'broadcaster_id': broadcaster_id}
    bearer = {'Authorization': f'Bearer {access_token}'}
    resp = await self.get('/charity/campaigns', params=query, headers=bearer)
    code = resp.status_code
    if code == st.OK:
        return s.CharityCampaign.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.FORBIDDEN):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_charity_campaign_donations(
    self,
    access_token: str,
    broadcaster_id: int | str,
    first: int = 20,
    after: str | None = None,
    cache_time: int | None = None,
):
    """Query ``GET /charity/donations`` for a broadcaster.

    ``cache_time`` is sent as the ``X-Cache-TTL`` request header; both the
    query and the headers are passed through ``self.clean_dict`` first.

    Returns:
        The ``data`` attribute of the validated ``s.CharityDonations``
        model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST,
            UNAUTHORIZED or FORBIDDEN; with 'Internal Server Error'
            otherwise.
    """
    query = self.clean_dict(
        {
            'broadcaster_id': broadcaster_id,
            'first': first,
            'after': after,
        }
    )
    request_headers = self.clean_dict(
        {
            'Authorization': f'Bearer {access_token}',
            'X-Cache-TTL': cache_time,
        }
    )
    resp = await self.get(
        '/charity/donations', params=query, headers=request_headers
    )
    code = resp.status_code
    if code == st.OK:
        return s.CharityDonations.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED, st.FORBIDDEN):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_chatters(
self,
access_token: str,
broadcaster_id: int | str,
moderator_id: int,
first: int = 20,
after: str | None = None,
cache_time: int | None = None,
):
req = await self.get(
'/chatters',
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'first': first,
'after': after,
}
),
headers=self.clean_dict(
{
'Authorization': f'Bearer {access_token}',
'X-Cache-TTL': cache_time,
}
),
)
match req.status_code:
case st.OK:
return s.Chatters.model_validate(req.json()).data
case st.BAD_REQUEST | st.UNAUTHORIZED | st.FORBIDDEN:
raise s.Error(req.status_code, req.json()['message'])
case _:
raise s.Error(req.status_code, 'Internal Server Error')
async def get_channel_emotes(
    self,
    access_token: str,
    broadcaster_id: int | str,
    cache_time: int | None = None,
):
    """Query ``GET /chat/emotes`` for a broadcaster.

    ``cache_time`` is sent as the ``X-Cache-TTL`` request header after the
    headers pass through ``self.clean_dict``.

    Returns:
        The validated ``s.ChannelEmotes`` model itself (not its ``data``).

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST or
            UNAUTHORIZED; with 'Internal Server Error' otherwise.
    """
    request_headers = self.clean_dict(
        {
            'Authorization': f'Bearer {access_token}',
            'X-Cache-TTL': cache_time,
        }
    )
    resp = await self.get(
        '/chat/emotes',
        headers=request_headers,
        params={'broadcaster_id': broadcaster_id},
    )
    code = resp.status_code
    if code == st.OK:
        return s.ChannelEmotes.model_validate(resp.json())
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_global_emotes(
    self, access_token: str, cache_time: int | None = None
):
    """Query ``GET /chat/emotes/global``.

    ``cache_time`` is sent as the ``X-Cache-TTL`` request header after the
    headers pass through ``self.clean_dict``.

    Returns:
        The validated ``s.GlobalEmotes`` model itself (not its ``data``).

    Raises:
        s.Error: With the response ``message`` on UNAUTHORIZED; with
            'Internal Server Error' otherwise.
    """
    request_headers = self.clean_dict(
        {
            'Authorization': f'Bearer {access_token}',
            'X-Cache-TTL': cache_time,
        }
    )
    resp = await self.get('/chat/emotes/global', headers=request_headers)
    code = resp.status_code
    if code == st.OK:
        return s.GlobalEmotes.model_validate(resp.json())
    if code == st.UNAUTHORIZED:
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_emote_sets(
    self,
    access_token: str,
    emote_set_id: int,
    cache_time: int | None = None,
):
    """Query ``GET /chat/emotes/set`` for an emote set.

    ``cache_time`` is sent as the ``X-Cache-TTL`` request header after the
    headers pass through ``self.clean_dict``.

    Returns:
        The validated ``s.EmoteSets`` model itself (not its ``data``).

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST or
            UNAUTHORIZED; with 'Internal Server Error' otherwise.
    """
    request_headers = self.clean_dict(
        {
            'Authorization': f'Bearer {access_token}',
            'X-Cache-TTL': cache_time,
        }
    )
    resp = await self.get(
        '/chat/emotes/set',
        headers=request_headers,
        params={'emote_set_id': emote_set_id},
    )
    code = resp.status_code
    if code == st.OK:
        return s.EmoteSets.model_validate(resp.json())
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_channel_chat_badges(
    self,
    access_token: str,
    broadcaster_id: int | str,
    cache_time: int | None = None,
):
    """Query ``GET /chat/badges`` for a broadcaster.

    ``cache_time`` is sent as the ``X-Cache-TTL`` request header after the
    headers pass through ``self.clean_dict``.

    Returns:
        The validated ``s.ChannelChatBadges`` model itself (not its
        ``data``).

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST or
            UNAUTHORIZED; with 'Internal Server Error' otherwise.
    """
    request_headers = self.clean_dict(
        {
            'Authorization': f'Bearer {access_token}',
            'X-Cache-TTL': cache_time,
        }
    )
    resp = await self.get(
        '/chat/badges',
        headers=request_headers,
        params={'broadcaster_id': broadcaster_id},
    )
    code = resp.status_code
    if code == st.OK:
        return s.ChannelChatBadges.model_validate(resp.json())
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_global_chat_badges(
    self, access_token: str, cache_time: int | None = None
):
    """Query ``GET /chat/badges/global``.

    ``cache_time`` is sent as the ``X-Cache-TTL`` request header after the
    headers pass through ``self.clean_dict``.

    Returns:
        The validated ``s.GlobalChatBadges`` model itself (not its
        ``data``).

    Raises:
        s.Error: With the response ``message`` on UNAUTHORIZED; with
            'Internal Server Error' otherwise.
    """
    request_headers = self.clean_dict(
        {
            'Authorization': f'Bearer {access_token}',
            'X-Cache-TTL': cache_time,
        }
    )
    resp = await self.get('/chat/badges/global', headers=request_headers)
    code = resp.status_code
    if code == st.OK:
        return s.GlobalChatBadges.model_validate(resp.json())
    if code == st.UNAUTHORIZED:
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')
async def get_chat_settings(
    self,
    access_token: str,
    broadcaster_id: int | str,
    moderator_id: int | str | None = None,
    cache_time: int | None = None,
):
    """Query ``GET /chat/settings`` for a broadcaster.

    ``cache_time`` is sent as the ``X-Cache-TTL`` request header; both the
    headers and the query pass through ``self.clean_dict`` first.

    Returns:
        The ``data`` attribute of the validated ``s.ChatSettings`` model.

    Raises:
        s.Error: With the response ``message`` on BAD_REQUEST or
            UNAUTHORIZED; with 'Internal Server Error' otherwise.
    """
    request_headers = self.clean_dict(
        {
            'Authorization': f'Bearer {access_token}',
            'X-Cache-TTL': cache_time,
        }
    )
    query = self.clean_dict(
        {
            'broadcaster_id': broadcaster_id,
            'moderator_id': moderator_id,
        }
    )
    resp = await self.get(
        '/chat/settings', headers=request_headers, params=query
    )
    code = resp.status_code
    if code == st.OK:
        return s.ChatSettings.model_validate(resp.json()).data
    if code in (st.BAD_REQUEST, st.UNAUTHORIZED):
        raise s.Error(code, resp.json()['message'])
    raise s.Error(code, 'Internal Server Error')