Files
OxideTwitch/src/oxidetwitch/api.py
Miwory 645f7029ad
All checks were successful
Verify Dev Build / publish (push) Successful in 38s
Фикс base_url
2026-03-08 00:25:32 +03:00

2802 lines
82 KiB
Python

from collections.abc import AsyncGenerator
from datetime import datetime
from typing import Any, Literal, overload
from zoneinfo import ZoneInfo
from oxidehttp import status as st
from oxidehttp.client import OxideHTTP
from oxidehttp.schema import CachedResponse, Response
from pydantic import BaseModel
from . import schema as s
from .eventsub import statuses as sub_status
from .eventsub import subscriptions as sub
from .eventsub import types as sub_type
class TwitchAPIClient(OxideHTTP):
def __init__(
self,
client_id: str,
client_secret: str,
redirect_uri: str,
redis_url: str | None = None,
proxy_url: str | None = None,
) -> None:
self.base_uri = 'https://api.twitch.tv/helix/'
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
super().__init__(
base_url=self.base_uri,
headers={'Client-Id': self.client_id},
redis_url=redis_url,
ratelimit_key='twitch' if redis_url else None,
ratelimit_limit=700 if redis_url else None,
proxy_url=proxy_url,
)
def _auth(self, access_token: str) -> dict[str, str]:
return {'Authorization': f'Bearer {access_token}'}
@overload
async def _process(
self, req: Response | CachedResponse, schema: None
) -> bool: ...
@overload
async def _process[T: BaseModel](
self, req: Response | CachedResponse, schema: type[T]
) -> T: ...
async def _process[T: BaseModel](
self, req: Response | CachedResponse, schema: type[T] | None
) -> T | bool:
if req.status_code >= 500:
raise s.InternalError(req.status_code, 'Internal Server Error')
if req.status_code >= 400 and req.status_code < 500:
data = await req.json()
message = data.get('message', 'Twitch API Error')
raise s.ClientError(req.status_code, message)
if schema is None:
return True
data = await req.json()
return schema.model_validate(data)
async def _paginate[S: s.BasePaginated](
self,
path: str,
schema: type[S],
headers: dict[str, str] | None,
params: dict[str, Any],
cache_ttl: int | None = None,
) -> AsyncGenerator[S]:
after = None
while True:
current_params = {**params, 'after': after} if after else params
req = await self.get(
path,
headers=headers,
params=current_params,
cache_ttl=cache_ttl,
)
result = await self._process(req, schema)
yield result
if isinstance(result.pagination, s.Pagination):
after = result.pagination.cursor
if after is None:
break
async def start_commercial(
self, access_token: str, broadcaster_id: int, length: int
) -> s.StartCommercial:
req = await self.post(
'/channels/commercial',
headers=self._auth(access_token),
json={
'broadcaster_id': broadcaster_id,
'length': length,
},
)
return await self._process(req, s.StartCommercial)
async def get_ad_schedule(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.AdSchedule:
req = await self.get(
'/channels/ads',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.AdSchedule)
async def snooze_next_ad(
self, access_token: str, broadcaster_id: int
) -> s.SnoozeNextAd:
req = await self.post(
'/channels/ads/schedule/snooze',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
},
)
return await self._process(req, s.SnoozeNextAd)
async def get_extension_analytics(
self,
access_token: str,
*,
extension_id: str | None = None,
analytics_type: Literal['overview_v2'] | None = None,
started_at: datetime | None = None,
ended_at: datetime | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.ExtensionAnalyticsData]:
async for data in self._paginate(
'/analytics/extensions',
s.ExtensionAnalytics,
self._auth(access_token),
self.clean_dict(
{
'extension_id': extension_id,
'type': analytics_type,
'started_at': started_at,
'ended_at': ended_at,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_game_analytics(
self,
access_token: str,
*,
game_id: int | None = None,
analytics_type: Literal['overview_v2'] | None = None,
started_at: datetime | None = None,
ended_at: datetime | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.GameAnalyticsData]:
async for data in self._paginate(
'/analytics/games',
s.GameAnalytics,
self._auth(access_token),
self.clean_dict(
{
'game_id': game_id,
'type': analytics_type,
'started_at': started_at,
'ended_at': ended_at,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_bits_leaderboard(
self,
access_token: str,
count: int = 10,
period: Literal['day', 'week', 'month', 'year', 'all'] = 'all',
*,
started_at: datetime | None = None,
user_id: int | None = None,
cache_ttl: int | None = None,
) -> s.BitsLeaderboard:
req = await self.get(
'/bits/leaderboard',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'count': count,
'period': period,
'started_at': started_at,
'user_id': user_id,
}
),
)
return await self._process(req, s.BitsLeaderboard)
async def get_cheermotes(
self,
access_token: str,
*,
broadcaster_id: int | None = None,
cache_ttl: int | None = None,
) -> s.Cheermotes:
req = await self.get(
'/bits/cheermotes',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict({'broadcaster_id': broadcaster_id}),
)
return await self._process(req, s.Cheermotes)
async def get_extension_transactions(
self,
access_token: str,
extension_id: str,
*,
user_id: int | list[int] | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.ExtensionTransactionsData]:
async for data in self._paginate(
'/extensions/transactions',
s.ExtensionTransactions,
self._auth(access_token),
self.clean_dict(
{
'extension_id': extension_id,
'user_id': user_id,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_channel_information(
self,
access_token: str,
broadcaster_id: int | list[int],
*,
cache_ttl: int | None = None,
) -> s.ChannelsInformation:
req = await self.get(
'/channels',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.ChannelsInformation)
async def modify_channel_information(
self,
access_token: str,
broadcaster_id: int,
*,
game_id: int | None = None,
broadcaster_language: str | None = None,
title: str | None = None,
delay: int | None = None,
tags: list[str] | None = None,
content_classification_labels: list[s.ContentClassificationLabel]
| None = None,
is_branded_content: bool | None = None,
) -> bool:
data = self.clean_dict(
{
'game_id': game_id,
'broadcaster_language': broadcaster_language,
'title': title,
'delay': delay,
'tags': tags,
'content_classification_labels': content_classification_labels,
'is_branded_content': is_branded_content,
}
)
req = await self.patch(
'/channels',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
json=data,
)
return await self._process(req, None)
async def get_channel_editors(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.ChannelEditors:
req = await self.get(
'/channels/editors',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.ChannelEditors)
async def get_followed_channels(
self,
access_token: str,
broadcaster_id: int,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[tuple[int, s.FollowedChannel]]:
async for data in self._paginate(
'/channels/followed',
s.FollowedChannels,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield data.total, item
async def get_channel_followers(
self,
access_token: str,
broadcaster_id: int,
*,
user_id: int | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[tuple[int, s.ChannelFollower]]:
async for data in self._paginate(
'/channels/followers',
s.ChannelFollowers,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'user_id': user_id,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield data.total, item
async def create_custom_rewards(
self,
access_token: str,
broadcaster_id: int,
title: str,
cost: int,
*,
prompt: str | None = None,
is_enabled: bool = True,
background_color: str | None = None,
is_user_input_required: bool = False,
is_max_per_stream_enabled: bool = False,
max_per_stream: int | None = None,
is_max_per_user_per_stream_enabled: bool = False,
max_per_user_per_stream: int | None = None,
is_global_cooldown_enabled: bool = False,
global_cooldown_seconds: int | None = None,
should_redemptions_skip_request_queue: bool = False,
) -> s.CustomRewards:
data = self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'title': title,
'cost': cost,
'prompt': prompt,
'is_enabled': is_enabled,
'background_color': background_color,
'is_user_input_required': is_user_input_required,
'is_max_per_stream_enabled': is_max_per_stream_enabled,
'max_per_stream': max_per_stream,
'is_max_per_user_per_'
'stream_enabled': is_max_per_user_per_stream_enabled,
'max_per_user_per_stream': max_per_user_per_stream,
'is_global_cooldown_enabled': is_global_cooldown_enabled,
'global_cooldown_seconds': global_cooldown_seconds,
'should_redemptions_skip_'
'request_queue': should_redemptions_skip_request_queue,
}
)
req = await self.post(
'/channel_points/custom_rewards',
headers=self._auth(access_token),
json=data,
)
return await self._process(req, s.CustomRewards)
async def delete_custom_reward(
self, access_token: str, broadcaster_id: int, reward_id: str
) -> bool:
req = await self.delete(
'/channel_points/custom_rewards',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id, 'id': reward_id},
)
match req.status_code:
case st.NO_CONTENT:
return True
case (
st.BAD_REQUEST
| st.UNAUTHORIZED
| st.FORBIDDEN
| st.NOT_FOUND
| st.TOO_MANY_REQUESTS
):
raise s.ClientError(
req.status_code, (await req.json())['message']
)
case _:
raise s.InternalError(req.status_code, 'Internal Server Error')
async def get_custom_rewards(
self,
access_token: str,
broadcaster_id: int,
*,
reward_id: str | None = None,
only_manageable_rewards: bool = False,
cache_ttl: int | None = None,
) -> s.CustomRewards:
req = await self.get(
'/channel_points/custom_rewards',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'id': reward_id,
'only_manageable_rewards': only_manageable_rewards,
}
),
)
return await self._process(req, s.CustomRewards)
async def get_custom_reward_redemption(
self,
access_token: str,
broadcaster_id: int,
reward_id: str,
status: Literal['CANCELED', 'FULFILLED', 'UNFULFILLED'],
*,
redemption_id: int | list[int] | None = None,
sort: Literal['OLDEST', 'NEWEST'] = 'OLDEST',
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.CustomRewardRedemption]:
async for data in self._paginate(
'/channel_points/custom_rewards/redemptions',
s.CustomRewardRedemptions,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'reward_id': reward_id,
'status': status,
'redemption_id': redemption_id,
'sort': sort,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def update_custom_reward(
self,
access_token: str,
broadcaster_id: int,
reward_id: str,
*,
title: str | None = None,
prompt: str | None = None,
cost: int | None = None,
background_color: str | None = None,
is_enabled: bool | None = None,
is_user_input_required: bool | None = None,
is_max_per_stream_enabled: bool | None = None,
max_per_stream: int | None = None,
is_max_per_user_per_stream_enabled: bool | None = None,
max_per_user_per_stream: int | None = None,
is_global_cooldown_enabled: bool | None = None,
global_cooldown_seconds: int | None = None,
is_paused: bool | None = None,
should_redemptions_skip_request_queue: bool | None = None,
) -> s.CustomRewards:
data = self.clean_dict(
{
'title': title,
'prompt': prompt,
'cost': cost,
'background_color': background_color,
'is_enabled': is_enabled,
'is_user_input_required': is_user_input_required,
'is_max_per_stream_enabled': is_max_per_stream_enabled,
'max_per_stream': max_per_stream,
'is_max_per_user_per_'
'stream_enabled': is_max_per_user_per_stream_enabled,
'max_per_user_per_stream': max_per_user_per_stream,
'is_global_cooldown_enabled': is_global_cooldown_enabled,
'global_cooldown_seconds': global_cooldown_seconds,
'is_paused': is_paused,
'should_redemptions_skip_'
'request_queue': should_redemptions_skip_request_queue,
}
)
req = await self.patch(
'/channel_points/custom_rewards',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id, 'id': reward_id},
json=data,
)
return await self._process(req, s.CustomRewards)
async def update_redemption_status(
self,
access_token: str,
broadcaster_id: int,
reward_id: str,
redemption_id: str | list[str],
status: Literal['CANCELED', 'FULFILLED'],
) -> bool:
req = await self.post(
'/channel_points/custom_rewards/redemptions',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'reward_id': reward_id,
'id': redemption_id,
},
json={'status': status},
)
return await self._process(req, None)
async def get_charity_campaign(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.CharityCampaign:
req = await self.get(
'/charity/campaigns',
cache_ttl=cache_ttl,
params={'broadcaster_id': broadcaster_id},
headers=self._auth(access_token),
)
return await self._process(req, s.CharityCampaign)
async def get_charity_campaign_donations(
self,
access_token: str,
broadcaster_id: int,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.CharityDonation]:
async for data in self._paginate(
'/charity/donations',
s.CharityDonations,
self._auth(access_token),
self.clean_dict(
{'broadcaster_id': broadcaster_id, 'first': first}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_chatters(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[tuple[int, s.ChattersData]]:
async for data in self._paginate(
'/chat/chatters',
s.Chatters,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield data.total, item
async def get_channel_emotes(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.ChannelEmotes:
req = await self.get(
'/chat/emotes',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.ChannelEmotes)
async def get_global_emotes(
self, access_token: str, *, cache_ttl: int | None = None
) -> s.GlobalEmotes:
req = await self.get(
'/chat/emotes/global',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
)
return await self._process(req, s.GlobalEmotes)
async def get_emote_sets(
self,
access_token: str,
emote_set_id: int,
*,
cache_ttl: int | None = None,
) -> s.EmoteSets:
req = await self.get(
'/chat/emotes/set',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'emote_set_id': emote_set_id},
)
return await self._process(req, s.EmoteSets)
async def get_channel_chat_badges(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.ChannelChatBadges:
req = await self.get(
'/chat/badges',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.ChannelChatBadges)
async def get_global_chat_badges(
self, access_token: str, *, cache_ttl: int | None = None
) -> s.GlobalChatBadges:
req = await self.get(
'/chat/badges/global',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
)
return await self._process(req, s.GlobalChatBadges)
async def get_chat_settings(
self,
access_token: str,
broadcaster_id: int,
*,
moderator_id: int | None = None,
cache_ttl: int | None = None,
) -> s.ChatSettings:
req = await self.get(
'/chat/settings',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
}
),
)
return await self._process(req, s.ChatSettings)
async def get_shared_chat_session(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.SharedChatSession:
req = await self.get(
'/shared_chat/session',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.SharedChatSession)
async def get_user_emotes(
self,
access_token: str,
user_id: int,
*,
broadcaster_id: int | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[tuple[str, s.ChannelEmote]]:
async for data in self._paginate(
'/chat/emotes',
s.UserEmotes,
self._auth(access_token),
self.clean_dict(
{'user_id': user_id, 'broadcaster_id': broadcaster_id}
),
cache_ttl,
):
for item in data.data:
yield data.template, item
async def update_chat_settings(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
emote_mode: bool | None = None,
follower_mode: bool | None = None,
follower_mode_duration: int | None = None,
non_moderator_chat_delay: bool | None = None,
non_moderator_chat_delay_duration: Literal[2, 4, 6] | None = None,
slow_mode: bool | None = None,
slow_mode_wait_time: int | None = None,
subscriber_mode: bool | None = None,
unique_chat_mode: bool | None = None,
) -> s.ChatSettings:
req = await self.put(
'/chat/settings',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
},
json=self.clean_dict(
{
'emote_mode': emote_mode,
'follower_mode': follower_mode,
'follower_mode_duration': follower_mode_duration,
'non_moderator_chat_delay': non_moderator_chat_delay,
'non_moderator_chat_delay_duration': (
non_moderator_chat_delay_duration
),
'slow_mode': slow_mode,
'slow_mode_wait_time': slow_mode_wait_time,
'subscriber_mode': subscriber_mode,
'unique_chat_mode': unique_chat_mode,
}
),
)
return await self._process(req, s.ChatSettings)
async def send_chat_announcement(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
message: str,
color: Literal[
'blue', 'green', 'orange', 'purple', 'primary'
] = 'primary',
) -> bool:
req = await self.post(
'/chat/announcements',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
},
json=self.clean_dict({'message': message, 'color': color}),
)
return await self._process(req, None)
async def send_shoutout(
self,
access_token: str,
from_broadcaster_id: int,
to_broadcaster_id: int,
moderator_id: int,
) -> bool:
req = await self.post(
'/chat/shoutouts',
headers=self._auth(access_token),
params={
'from_broadcaster_id': from_broadcaster_id,
'to_broadcaster_id': to_broadcaster_id,
'moderator_id': moderator_id,
},
)
return await self._process(req, None)
async def send_chat_message(
self,
access_token: str,
broadcaster_id: int,
sender_id: int,
message: str,
*,
reply_parent_message_id: str | None = None,
for_source_only: bool | None = None,
) -> s.Message:
req = await self.post(
'/chat/messages',
headers=self._auth(access_token),
json=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'sender_id': sender_id,
'reply_parent_message_id': reply_parent_message_id,
'for_source_only': for_source_only,
'message': message,
}
),
)
return await self._process(req, s.Message)
async def get_user_chat_color(
self,
access_token: str,
user_id: int | list[int],
*,
cache_ttl: int | None = None,
) -> s.UserChatColor:
req = await self.get(
'/chat/colors',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'user_id': user_id},
)
return await self._process(req, s.UserChatColor)
async def update_user_chat_color(
self,
access_token: str,
user_id: int,
color: str,
) -> bool:
req = await self.put(
'/chat/color',
headers=self._auth(access_token),
params={'user_id': user_id, 'color': color},
)
return await self._process(req, None)
async def create_clip(
self,
access_token: str,
broadcaster_id: int,
*,
has_delay: bool | None = None,
) -> bool:
req = await self.post(
'/clips',
headers=self._auth(access_token),
params=self.clean_dict(
{'broadcaster_id': broadcaster_id, 'has_delay': has_delay}
),
)
return await self._process(req, None)
async def get_clips(
self,
access_token: str,
*,
broadcaster_id: int | None = None,
game_id: int | None = None,
clip_id: str | list[str] | None = None,
started_at: datetime | None = None,
ended_at: datetime | None = None,
first: int = 20,
before: str | None = None,
is_featured: bool | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Clip]:
async for data in self._paginate(
'/clips',
s.Clips,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'game_id': game_id,
'clip_id': clip_id,
'started_at': started_at,
'ended_at': ended_at,
'first': first,
'before': before,
'is_featured': is_featured,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_clips_downloads(
self,
access_token: str,
editor_id: int,
broadcaster_id: int,
clip_id: str,
*,
cache_ttl: int | None = None,
) -> s.ClipsDownloads:
req = await self.get(
'/clips/downloads',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'editor_id': editor_id,
'broadcaster_id': broadcaster_id,
'clip_id': clip_id,
}
),
)
return await self._process(req, s.ClipsDownloads)
async def get_conduits(
self, access_token: str, *, cache_ttl: int | None = None
) -> s.Conduits:
req = await self.get(
'/eventsub/conduits',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
)
return await self._process(req, s.Conduits)
async def create_conduits(
self, access_token: str, shard_count: int
) -> s.Conduits:
req = await self.post(
'/eventsub/conduits',
headers=self._auth(access_token),
json={'shard_count': shard_count},
)
return await self._process(req, s.Conduits)
async def update_conduits(
self, access_token: str, conduit_id: str, shard_count: int
) -> s.Conduits:
req = await self.patch(
'/eventsub/conduits',
headers=self._auth(access_token),
json={'shard_count': shard_count, 'id': conduit_id},
)
return await self._process(req, s.Conduits)
async def delete_conduit(self, access_token: str, conduit_id: str) -> bool:
req = await self.delete(
'/eventsub/conduits',
headers=self._auth(access_token),
params={'id': conduit_id},
)
return await self._process(req, None)
async def get_conduit_shards(
self,
access_token: str,
conduit_id: str,
*,
status: str | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.ConduitShard]:
async for data in self._paginate(
'/eventsub/conduits/shards',
s.ConduitShards,
self._auth(access_token),
self.clean_dict({'id': conduit_id, 'status': status}),
cache_ttl,
):
for item in data.data:
yield item
async def update_conduit_shards(
self,
access_token: str,
conduit_id: str,
shards: list[s.UpdateConduitShard],
) -> s.ConduitShards:
req = await self.patch(
'/eventsub/conduits/shards',
headers=self._auth(access_token),
json={'conduit_id': conduit_id, 'shards': shards},
)
return await self._process(req, s.ConduitShards)
async def get_content_classification_labels(
self,
access_token: str,
locale: Literal[
'bg-BG',
'cs-CZ',
'da-DK',
'de-DE',
'el-GR',
'en-GB',
'en-US',
'es-ES',
'es-MX',
'fi-FI',
'fr-FR',
'hu-HU',
'it-IT',
'ja-JP',
'ko-KR',
'nl-NL',
'no-NO',
'pl-PL',
'pt-BR',
'pt-PT',
'ro-RO',
'ru-RU',
'sk-SK',
'sv-SE',
'th-TH',
'tr-TR',
'vi-VN',
'zh-CN',
'zh-TW',
] = 'en-US',
*,
cache_ttl: int | None = None,
) -> s.ContentClassificationLabels:
req = await self.get(
'/content_classification_labels',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'locale': locale},
)
return await self._process(req, s.ContentClassificationLabels)
async def get_drops_entitlements(
self,
access_token: str,
*,
drop_id: str | list[str] | None = None,
user_id: int | None = None,
game_id: int | None = None,
fulfillment_status: Literal['CLAIMED', 'FULFILLED'] | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.DropEntitlement]:
async for data in self._paginate(
'/entitlements/drops',
s.DropsEntitlements,
self._auth(access_token),
self.clean_dict(
{
'drop_id': drop_id,
'user_id': user_id,
'game_id': game_id,
'fulfillment_status': fulfillment_status,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def update_drops_entitlements(
self,
access_token: str,
*,
entitlement_ids: list[str] | None = None,
fulfillment_status: Literal['CLAIMED', 'FULFILLED'] | None = None,
) -> s.UpdateDropsEntitlements:
req = await self.patch(
'/entitlements/drops',
headers=self._auth(access_token),
json=self.clean_dict(
{
'entitlement_ids': entitlement_ids,
'fulfillment_status': fulfillment_status,
}
),
)
return await self._process(req, s.UpdateDropsEntitlements)
async def get_extension_configuration_segment(
self,
jwt_token: str,
*,
broadcaster_id: int | None = None,
extension_id: int | None = None,
segment: Literal['broadcaster', 'developer', 'global'] | None = None,
cache_ttl: int | None = None,
) -> s.ExtensionConfigurationSegment:
req = await self.get(
'/extensions/configurations',
cache_ttl=cache_ttl,
headers=self._auth(jwt_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'extension_id': extension_id,
'segment': segment,
}
),
)
return await self._process(req, s.ExtensionConfigurationSegment)
async def set_extension_configuration_segment(
self,
jwt_token: str,
extension_id: str,
segment: Literal['broadcaster', 'developer', 'global'],
*,
broadcaster_id: int | None = None,
content: str | None = None,
version: str | None = None,
) -> bool:
req = await self.put(
'/extensions/configurations',
headers=self._auth(jwt_token),
json=self.clean_dict(
{
'extension_id': extension_id,
'segment': segment,
'broadcaster_id': broadcaster_id,
'content': content,
'version': version,
}
),
)
return await self._process(req, None)
async def set_extension_required_configurations(
self,
jwt_token: str,
broadcaster_id: int,
extension_id: str,
extension_version: str,
required_configuration: str,
) -> bool:
req = await self.put(
'/extensions/configurations/required',
headers=self._auth(jwt_token),
json={
'broadcaster_id': broadcaster_id,
'extension_id': extension_id,
'extension_version': extension_version,
'required_configuration': required_configuration,
},
)
return await self._process(req, None)
async def send_extension_pubsub_message(
self,
jwt_token: str,
target: list[str],
broadcaster_id: int,
message: str,
*,
is_global_broadcast: bool | None = None,
) -> bool:
req = await self.post(
'/extensions/pubsub',
headers=self._auth(jwt_token),
json=self.clean_dict(
{
'target': target,
'broadcaster_id': broadcaster_id,
'message': message,
'is_global_broadcast': is_global_broadcast,
}
),
)
return await self._process(req, None)
async def get_extension_live_channels(
self,
access_token: str,
extension_id: str,
*,
first: int | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.ExtensionLiveChannel]:
async for data in self._paginate(
'/extensions/live',
s.ExtensionLiveChannels,
self._auth(access_token),
self.clean_dict(
{
'extension_id': extension_id,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_extension_secrets(
self,
jwt_token: str,
extension_id: str,
*,
cache_ttl: int | None = None,
) -> s.ExtensionSecrets:
req = await self.get(
'/extensions/jwt/secrets',
cache_ttl=cache_ttl,
headers=self._auth(jwt_token),
params={'extension_id': extension_id},
)
return await self._process(req, s.ExtensionSecrets)
async def create_extension_secret(
self, jwt_token: str, extension_id: str, delay: int = 300
) -> s.ExtensionSecrets:
req = await self.post(
'/extensions/jwt/secrets',
headers=self._auth(jwt_token),
params={'extension_id': extension_id, 'delay': delay},
)
return await self._process(req, s.ExtensionSecrets)
async def send_extension_chat_message(
self,
jwt_token: str,
broadcaster_id: int,
text: str,
extension_id: str,
extension_version: str,
) -> bool:
req = await self.post(
'/extensions/chat',
headers=self._auth(jwt_token),
params={'broadcaster_id': broadcaster_id},
json={
'text': text,
'extension_id': extension_id,
'extension_version': extension_version,
},
)
return await self._process(req, None)
async def get_extensions(
self,
jwt_token: str,
extension_id: str,
*,
extension_version: str | None = None,
cache_ttl: int | None = None,
) -> s.Extensions:
req = await self.get(
'/extensions',
cache_ttl=cache_ttl,
headers=self._auth(jwt_token),
params=self.clean_dict(
{
'extension_id': extension_id,
'extension_version': extension_version,
}
),
)
return await self._process(req, s.Extensions)
async def get_released_extension(
self,
access_token: str,
extension_id: str,
*,
extension_version: str | None = None,
cache_ttl: int | None = None,
) -> s.Extensions:
req = await self.get(
'/extensions/released',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'extension_id': extension_id,
'extension_version': extension_version,
}
),
)
return await self._process(req, s.Extensions)
async def get_extension_bits_products(
self,
access_token: str,
*,
should_include_all: bool | None = None,
cache_ttl: int | None = None,
) -> s.ExtensionBitsProducts:
req = await self.get(
'/bits/extensions',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'should_include_all': should_include_all,
}
),
)
return await self._process(req, s.ExtensionBitsProducts)
async def update_extension_bits_product(
self,
access_token: str,
sku: str,
cost_amount: int,
display_name: str,
*,
in_development: bool | None = None,
expiration: str | None = None,
is_broadcast: bool | None = None,
) -> s.ExtensionBitsProducts:
req = await self.put(
'/bits/extensions',
headers=self._auth(access_token),
json=self.clean_dict(
{
'sku': sku,
'cost': {
'amount': cost_amount,
'type': 'bits',
},
'display_name': display_name,
'in_development': in_development,
'expiration': expiration,
'is_broadcast': is_broadcast,
}
),
)
return await self._process(req, s.ExtensionBitsProducts)
async def create_eventsub_subscription(
self,
access_token: str,
sub_type: sub_type.Any,
version: str | int,
condition: dict[str, str],
transport_method: Literal['webhook', 'websocket', 'conduit'],
*,
webhook_callback: str | None = None,
webhook_secret: str | None = None,
websocket_session_id: str | None = None,
conduit_id: str | None = None,
) -> s.EventsubBaseSubscriptions:
req = await self.post(
'/eventsub/subscriptions',
headers=self._auth(access_token),
json={
'type': sub_type,
'version': version,
'condition': condition,
'transport': self.clean_dict(
{
'method': transport_method,
'callback': webhook_callback,
'secret': webhook_secret,
'session_id': websocket_session_id,
'conduit_id': conduit_id,
}
),
},
)
return await self._process(req, s.EventsubBaseSubscriptions)
async def delete_eventsub_subscription(
self, access_token: str, sub_id: str
) -> bool:
req = await self.delete(
'/eventsub/subscriptions',
headers=self._auth(access_token),
params={'id': sub_id},
)
return await self._process(req, None)
async def get_eventsub_subscriptions(
self,
access_token: str,
*,
status: sub_status.Any | None = None,
sub_type: sub_type.Any | None = None,
user_id: int | None = None,
subscription_id: str | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[tuple[int, sub.Any]]:
async for data in self._paginate(
'/eventsub/subscriptions',
s.EventsubBaseSubscriptions,
self._auth(access_token),
{
'status': status,
'type': sub_type,
'user_id': user_id,
'subscription_id': subscription_id,
},
cache_ttl,
):
for item in data.data:
yield data.total, item
async def get_top_games(
self,
access_token: str,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Game]:
async for data in self._paginate(
'/games/top',
s.Games,
self._auth(access_token),
{'first': first},
cache_ttl,
):
for item in data.data:
yield item
async def get_games(
self,
access_token: str,
game_id: int | list[int],
name: str,
*,
igdb_id: str | None = None,
cache_ttl: int | None = None,
) -> s.Games:
req = await self.get(
'/games',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'id': game_id,
'name': name,
'igdb_id': igdb_id,
}
),
)
return await self._process(req, s.Games)
async def get_creator_goals(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.CreatorGoals:
req = await self.get(
'/creator_goals',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.CreatorGoals)
# TODO: implement guest star endpoints
async def get_hype_train_status(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.HypeTrainStatus:
req = await self.get(
'/hypetrain/status',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.HypeTrainStatus)
async def check_automod_status(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.AutomodStatus:
req = await self.get(
'/moderation/enforcements/status',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.AutomodStatus)
async def manage_held_automod_messages(
self,
access_token: str,
user_id: int,
msg_id: int,
action: Literal['ALLOW', 'DENY'],
) -> bool:
req = await self.post(
'/moderation/automod/message',
headers=self._auth(access_token),
params={'action': action, 'msg_id': msg_id, 'user_id': user_id},
)
return await self._process(req, None)
async def get_automod_settings(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
cache_ttl: int | None = None,
) -> s.AutomodSettings:
req = await self.get(
'/moderation/automod/settings',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
},
)
return await self._process(req, s.AutomodSettings)
async def update_automod_settings(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
overall_level: int | None = None,
disability: int | None = None,
aggression: int | None = None,
sexuality_sex_or_gender: int | None = None,
misogyny: int | None = None,
bullying: int | None = None,
swearing: int | None = None,
race_ethnicity_or_religion: int | None = None,
sex_based_terms: int | None = None,
) -> s.AutomodSettings:
req = await self.put(
'/moderation/automod/settings',
headers=self._auth(access_token),
json=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'overall_level': overall_level,
'disability': disability,
'aggression': aggression,
'sexuality_sex_or_gender': sexuality_sex_or_gender,
'misogyny': misogyny,
'bullying': bullying,
'swearing': swearing,
'race_ethnicity_or_religion': race_ethnicity_or_religion,
'sex_based_terms': sex_based_terms,
}
),
)
return await self._process(req, s.AutomodSettings)
async def get_banned_users(
self,
access_token: str,
broadcaster_id: int,
user_id: int | list[int],
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.BannedUser]:
async for data in self._paginate(
'/moderation/banned',
s.BannedUsers,
self._auth(access_token),
{
'broadcaster_id': broadcaster_id,
'user_id': user_id,
'first': first,
},
cache_ttl,
):
for item in data.data:
yield item
async def ban_user(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
user_id: int,
*,
duration: int | None = None,
reason: str | None = None,
) -> s.BanUser:
req = await self.post(
'/moderation/banned',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
},
json=self.clean_dict(
{
'user_id': user_id,
'duration': duration,
'reason': reason,
}
),
)
return await self._process(req, s.BanUser)
async def unban_user(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
user_id: int,
) -> bool:
req = await self.delete(
'/moderation/bans',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'user_id': user_id,
},
)
return await self._process(req, None)
async def get_unban_requests(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
status: Literal[
'pending', 'approved', 'denied', 'acknowledged', 'canceled'
],
*,
user_id: int | None = None,
first: int | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.UnbanRequest]:
async for data in self._paginate(
'/moderation/unbans',
s.UnbanRequests,
self._auth(access_token),
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'status': status,
'user_id': user_id,
'first': first,
},
cache_ttl,
):
for item in data.data:
yield item
async def resolve_unban_request(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
unban_request_id: str,
status: Literal['approved', 'denied'],
*,
resolution_text: str | None = None,
) -> s.UnbanRequests:
req = await self.patch(
'/moderation/unban_requests',
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'unban_request_id': unban_request_id,
'status': status,
'resolution_text': resolution_text,
}
),
)
return await self._process(req, s.UnbanRequests)
async def get_blocked_terms(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
first: int | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.BlockedTerm]:
async for data in self._paginate(
'/moderation/blocked_terms',
s.BlockedTerms,
self._auth(access_token),
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'first': first,
},
cache_ttl,
):
for item in data.data:
yield item
async def add_blocked_term(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
text: str,
) -> s.BlockedTerms:
req = await self.post(
'/moderation/blocked_terms',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'text': text,
},
)
return await self._process(req, s.BlockedTerms)
async def remove_blocked_term(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
blocked_term_id: str,
) -> bool:
req = await self.delete(
'/moderation/blocked_terms',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'id': blocked_term_id,
},
)
return await self._process(req, None)
async def delete_chat_message(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
message_id: str | None = None,
) -> bool:
req = await self.delete(
'/moderation/chat',
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'message_id': message_id,
}
),
)
return await self._process(req, None)
async def get_moderated_channels(
self,
access_token: str,
user_id: int,
*,
first: int | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.ModeratedChannel]:
async for data in self._paginate(
'/moderation/channels',
s.ModeratedChannels,
self._auth(access_token),
{
'user_id': user_id,
'first': first,
},
cache_ttl,
):
for item in data.data:
yield item
async def get_moderators(
self,
access_token: str,
broadcaster_id: int,
*,
user_id: int | list[int] | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Moderator]:
async for data in self._paginate(
'/moderation/moderators',
s.Moderators,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'user_id': user_id,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def add_channel_moderator(
self, access_token: str, broadcaster_id: int, user_id: int
) -> bool:
req = await self.post(
'/moderation/moderators',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
)
return await self._process(req, None)
async def remove_channel_moderator(
self, access_token: str, broadcaster_id: int, user_id: int
) -> bool:
req = await self.delete(
'/moderation/moderators',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
)
return await self._process(req, None)
async def get_vips(
self,
access_token: str,
user_id: int,
broadcaster_id: int,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.VIP]:
async for data in self._paginate(
'/channels/vips',
s.VIPs,
self._auth(access_token),
{
'user_id': user_id,
'broadcaster_id': broadcaster_id,
'first': first,
},
cache_ttl,
):
for item in data.data:
yield item
async def add_channel_vip(
self, access_token: str, broadcaster_id: int, user_id: int
) -> bool:
req = await self.post(
'/channels/vips',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
)
return await self._process(req, None)
async def remove_channel_vip(
self, access_token: str, broadcaster_id: int, user_id: int
) -> bool:
req = await self.delete(
'/channels/vips',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
)
return await self._process(req, None)
async def update_shield_mode_status(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
is_active: bool,
) -> s.ShieldModeStatus:
req = await self.put(
'/moderation/shield_mode',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
'is_active': is_active,
},
)
return await self._process(req, s.ShieldModeStatus)
async def get_shield_mode_status(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
*,
cache_ttl: int | None = None,
) -> s.ShieldModeStatus:
req = await self.get(
'/moderation/shield_mode',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
},
)
return await self._process(req, s.ShieldModeStatus)
async def warn_chat_user(
self,
access_token: str,
broadcaster_id: int,
moderator_id: int,
user_id: int,
reason: str,
) -> s.UserWarns:
req = await self.post(
'/moderation/warnings',
headers=self._auth(access_token),
params={
'broadcaster_id': broadcaster_id,
'moderator_id': moderator_id,
},
json={
'data': {
'user_id': user_id,
'reason': reason,
}
},
)
return await self._process(req, s.UserWarns)
async def get_polls(
self,
access_token: str,
broadcaster_id: int,
*,
poll_id: int | list[int] | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Poll]:
async for data in self._paginate(
'/polls',
s.Polls,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'id': poll_id,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def create_poll(
self,
access_token: str,
broadcaster_id: int,
title: str,
choices: list[str],
duration: int,
*,
channel_points_voting_enabled: bool = False,
channel_points_per_vote: int | None = None,
) -> s.Polls:
body = self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'title': title,
'choices': [
{'title': choice_title} for choice_title in choices
],
'duration': duration,
'channel_points_voting_enabled': channel_points_voting_enabled,
'channel_points_per_vote': channel_points_per_vote,
}
)
req = await self.post(
'/polls',
headers=self._auth(access_token),
json=body,
)
return await self._process(req, s.Polls)
async def end_poll(
self,
access_token: str,
broadcaster_id: int,
poll_id: str,
status: Literal['TERMINATED', 'ARCHIVED'],
) -> s.Polls:
req = await self.patch(
'/polls',
headers=self._auth(access_token),
json={
'broadcaster_id': broadcaster_id,
'id': poll_id,
'status': status,
},
)
return await self._process(req, s.Polls)
async def get_predictions(
self,
access_token: str,
broadcaster_id: int,
*,
prediction_id: int | list[int] | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Prediction]:
async for data in self._paginate(
'/predictions',
s.Predictions,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'id': prediction_id,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def create_prediction(
self,
access_token: str,
broadcaster_id: int,
title: str,
outcomes: list[str],
prediction_window: int,
) -> s.Predictions:
body = self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'title': title,
'outcomes': [{'title': outcome} for outcome in outcomes],
'prediction_window': prediction_window,
}
)
req = await self.post(
'/predictions',
headers=self._auth(access_token),
json=body,
)
return await self._process(req, s.Predictions)
async def end_prediction(
self,
access_token: str,
broadcaster_id: int,
prediction_id: str,
status: Literal['RESOLVED', 'CANCELED', 'LOCKED'],
*,
winning_outcome_id: str | None = None,
) -> s.Predictions:
req = await self.patch(
'/predictions',
headers=self._auth(access_token),
json=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'id': prediction_id,
'status': status,
'winning_outcome_id': winning_outcome_id,
}
),
)
return await self._process(req, s.Predictions)
async def start_a_raid(
self,
access_token: str,
from_broadcaster_id: int,
to_broadcaster_id: int,
) -> s.Raids:
req = await self.post(
'/raids',
headers=self._auth(access_token),
json={
'from_broadcaster_id': from_broadcaster_id,
'to_broadcaster_id': to_broadcaster_id,
},
)
return await self._process(req, s.Raids)
async def cancel_a_raid(
self, access_token: str, broadcaster_id: int
) -> bool:
req = await self.patch(
'/raids',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, None)
async def get_channel_stream_schedule(
self,
access_token: str,
broadcaster_id: int,
*,
segment_id: int | list[int] | None = None,
start_time: datetime | None = None,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Schedule]:
async for data in self._paginate(
'/schedule',
s.Schedules,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'id': segment_id,
'start_time': start_time,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_channel_icalendar(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> bytes:
req = await self.get(
'/schedule/icalendar',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
match req.status_code:
case st.OK:
mem = await req.get_bytes()
return bytes(mem)
case st.BAD_REQUEST:
raise s.ClientError(
req.status_code, (await req.json())['message']
)
case _:
raise s.InternalError(req.status_code, 'Internal Server Error')
async def update_channel_stream_schedule(
self,
access_token: str,
broadcaster_id: int,
*,
is_vacation_enabled: bool | None = None,
vacation_start_time: datetime | None = None,
vacation_end_time: datetime | None = None,
timezone: ZoneInfo | None = None,
) -> bool:
req = await self.patch(
'/schedule/settings',
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'is_vacation_enabled': is_vacation_enabled,
'vacation_start_time': vacation_start_time,
'vacation_end_time': vacation_end_time,
'timezone': str(timezone) if timezone else None,
}
),
)
return await self._process(req, None)
async def create_channel_stream_schedule_segment(
self,
access_token: str,
broadcaster_id: int,
start_time: datetime,
timezone: ZoneInfo,
duration: int,
*,
is_recurring: bool | None = None,
category_id: int | None,
title: str | None = None,
) -> s.Schedules:
req = await self.post(
'/schedule/segment',
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
json=self.clean_dict(
{
'start_time': start_time,
'timezone': str(timezone),
'duration': duration,
'is_recurring': is_recurring,
'category_id': category_id,
'title': title,
}
),
)
return await self._process(req, s.Schedules)
async def update_channel_stream_schedule_segment(
self,
access_token: str,
broadcaster_id: int,
segment_id: str,
*,
start_time: datetime | None = None,
duration: int | None = None,
category_id: int | None = None,
title: str | None = None,
is_canceled: bool | None = None,
timezone: ZoneInfo | None = None,
) -> s.Schedules:
req = await self.patch(
'/schedule/segment',
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'id': segment_id,
}
),
json=self.clean_dict(
{
'start_time': start_time,
'duration': duration,
'category_id': category_id,
'title': title,
'is_canceled': is_canceled,
'timezone': str(timezone) if timezone else None,
}
),
)
return await self._process(req, s.Schedules)
async def delete_channel_stream_schedule_segment(
self, access_token: str, broadcaster_id: int, segment_id: str
) -> bool:
req = await self.delete(
'/schedule/segment',
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'id': segment_id,
}
),
)
return await self._process(req, None)
async def search_categories(
self,
access_token: str,
query: str,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Category]:
async for data in self._paginate(
'/search/categories',
s.Categories,
self._auth(access_token),
{'query': query, 'first': first},
cache_ttl,
):
for item in data.data:
yield item
async def search_channels(
self,
access_token: str,
query: str,
*,
live_only: bool = False,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Channel]:
async for data in self._paginate(
'/search/channels',
s.Channels,
self._auth(access_token),
self.clean_dict(
{'query': query, 'live_only': live_only, 'first': first}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_stream_key(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.StreamKeys:
req = await self.get(
'/streams/key',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.StreamKeys)
async def get_streams(
self,
access_token: str,
*,
user_id: int | list[int] | None = None,
user_login: str | list[str] | None = None,
game_id: int | list[int] | None = None,
stream_type: Literal['live', 'all'] = 'all',
language: str | None = None,
first: int = 20,
before: str | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Stream]:
async for data in self._paginate(
'/streams',
s.Streams,
self._auth(access_token),
self.clean_dict(
{
'user_id': user_id,
'user_login': user_login,
'game_id': game_id,
'type': stream_type,
'language': language,
'first': first,
'before': before,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_followed_streams(
self,
access_token: str,
user_id: int,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Stream]:
async for data in self._paginate(
'/streams/followed',
s.Streams,
self._auth(access_token),
{
'user_id': user_id,
'first': first,
},
cache_ttl,
):
for item in data.data:
yield item
async def create_stream_marker(
self,
access_token: str,
user_id: int,
*,
description: str | None = None,
) -> s.CreateStreamMarkers:
req = await self.post(
'/streams/markers',
headers=self._auth(access_token),
json=self.clean_dict(
{
'user_id': user_id,
'description': description,
}
),
)
return await self._process(req, s.CreateStreamMarkers)
async def get_stream_markers(
self,
access_token: str,
user_id: int,
video_id: int,
*,
first: int = 20,
before: str | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.StreamMarkersData]:
async for data in self._paginate(
'/streams/markers',
s.StreamMarkers,
self._auth(access_token),
self.clean_dict(
{
'user_id': user_id,
'video_id': video_id,
'first': first,
'before': before,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def get_broadcaster_subscriptions(
self,
access_token: str,
broadcaster_id: int,
*,
first: int = 20,
before: str | None = None,
cache_ttl: int | None = None,
) -> AsyncGenerator[tuple[int, int, s.BroadcasterSubscription]]:
async for data in self._paginate(
'/subscriptions',
s.BroadcasterSubscriptions,
self._auth(access_token),
self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'first': first,
'before': before,
}
),
cache_ttl,
):
for item in data.data:
yield data.total, data.points, item
async def check_user_subscription(
self,
access_token: str,
broadcaster_id: int,
user_id: int,
*,
cache_ttl: int | None = None,
) -> s.UserSubscription:
req = await self.get(
'/subscriptions/user',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict(
{
'broadcaster_id': broadcaster_id,
'user_id': user_id,
}
),
)
return await self._process(req, s.UserSubscription)
async def get_channel_teams(
self,
access_token: str,
broadcaster_id: int,
*,
cache_ttl: int | None = None,
) -> s.ChannelTeams:
req = await self.get(
'/teams/channel',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'broadcaster_id': broadcaster_id},
)
return await self._process(req, s.ChannelTeams)
async def get_teams(
self,
access_token: str,
*,
name: str | None = None,
team_id: int | None = None,
cache_ttl: int | None = None,
) -> s.Teams:
req = await self.get(
'/teams',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict({'name': name, 'id': team_id}),
)
return await self._process(req, s.Teams)
async def get_users(
self,
access_token: str,
*,
user_id: int | list[int] | None = None,
login: str | list[str] | None = None,
cache_ttl: int | None = None,
) -> s.Users:
req = await self.get(
'/users',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params=self.clean_dict({'id': user_id, 'login': login}),
)
return await self._process(req, s.Users)
async def update_user(
self, access_token: str, *, description: str = ''
) -> s.Users:
req = await self.put(
'/users',
headers=self._auth(access_token),
json={'description': description},
)
return await self._process(req, s.Users)
async def get_authorization_by_user(
self,
access_token: str,
user_id: int | list[int],
*,
cache_ttl: int | None = None,
) -> s.AuthorizationsByUser:
req = await self.get(
'/authorization/users',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'user_id': user_id},
)
return await self._process(req, s.AuthorizationsByUser)
async def get_user_block_list(
self,
access_token: str,
broadcaster_id: int,
*,
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.UserBlock]:
async for data in self._paginate(
'/users/blocks',
s.UserBlockList,
self._auth(access_token),
{'broadcaster_id': broadcaster_id, 'first': first},
cache_ttl,
):
for item in data.data:
yield item
async def block_user(
self,
access_token: str,
target_user_id: int,
*,
source_context: Literal['chat', 'whisper'] | None = None,
reason: Literal['harassment', 'spam', 'other'] | None = None,
) -> bool:
req = await self.put(
'/users/blocks',
headers=self._auth(access_token),
json=self.clean_dict(
{
'target_user_id': target_user_id,
'source_context': source_context,
'reason': reason,
}
),
)
return await self._process(req, None)
async def unblock_user(
self, access_token: str, target_user_id: int
) -> bool:
req = await self.delete(
'/users/blocks',
headers=self._auth(access_token),
params={'target_user_id': target_user_id},
)
return await self._process(req, None)
async def get_user_extensions(
self, access_token: str, *, cache_ttl: int | None = None
) -> s.UserExtensions:
req = await self.get(
'/users/extensions/list',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
)
return await self._process(req, s.UserExtensions)
async def get_user_active_extensions(
self,
access_token: str,
*,
user_id: int | None = None,
cache_ttl: int | None = None,
) -> s.UserActiveExtensions:
req = await self.get(
'/users/extensions',
cache_ttl=cache_ttl,
headers=self._auth(access_token),
params={'user_id': user_id},
)
return await self._process(req, s.UserActiveExtensions)
async def update_user_extensions(
self, access_token: str, extensions: s.UserActiveExtensions
) -> bool:
req = await self.put(
'/users/extensions',
headers=self._auth(access_token),
json=extensions.model_dump(),
)
return await self._process(req, None)
async def get_videos(
self,
access_token: str,
video_id: int | list[int],
user_id: int,
game_id: int,
*,
language: str | None = None,
period: Literal['all', 'day', 'month', 'week'] = 'all',
sort: Literal['time', 'trending', 'views'] = 'time',
video_type: Literal['all', 'archive', 'highlight', 'upload'] = 'all',
first: int = 20,
cache_ttl: int | None = None,
) -> AsyncGenerator[s.Video]:
async for data in self._paginate(
'/videos',
s.Videos,
self._auth(access_token),
self.clean_dict(
{
'video_id': video_id,
'user_id': user_id,
'game_id': game_id,
'language': language,
'period': period,
'sort': sort,
'video_type': video_type,
'first': first,
}
),
cache_ttl,
):
for item in data.data:
yield item
async def delete_videos(
self, access_token: str, video_id: int | list[int]
) -> s.DeleteVideos:
req = await self.delete(
'/videos',
headers=self._auth(access_token),
params={'id': video_id},
)
return await self._process(req, s.DeleteVideos)
async def send_whisper(
self,
access_token: str,
from_user_id: int,
to_user_id: int,
message: str,
) -> bool:
req = await self.post(
'/whispers',
headers=self._auth(access_token),
params={'from_user_id': from_user_id, 'to_user_id': to_user_id},
json={'message': message},
)
return await self._process(req, None)