2802 lines
82 KiB
Python
2802 lines
82 KiB
Python
from collections.abc import AsyncGenerator
|
|
from datetime import datetime
|
|
from typing import Any, Literal, overload
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from oxidehttp import status as st
|
|
from oxidehttp.client import OxideHTTP
|
|
from oxidehttp.schema import CachedResponse, Response
|
|
from pydantic import BaseModel
|
|
|
|
from . import schema as s
|
|
from .eventsub import statuses as sub_status
|
|
from .eventsub import subscriptions as sub
|
|
from .eventsub import types as sub_type
|
|
|
|
|
|
class TwitchAPIClient(OxideHTTP):
|
|
def __init__(
|
|
self,
|
|
client_id: str,
|
|
client_secret: str,
|
|
redirect_uri: str,
|
|
redis_url: str | None = None,
|
|
proxy_url: str | None = None,
|
|
) -> None:
|
|
self.base_uri = 'https://api.twitch.tv/helix/'
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self.redirect_uri = redirect_uri
|
|
|
|
super().__init__(
|
|
base_url=self.base_uri,
|
|
headers={'Client-Id': self.client_id},
|
|
redis_url=redis_url,
|
|
ratelimit_key='twitch' if redis_url else None,
|
|
ratelimit_limit=700 if redis_url else None,
|
|
proxy_url=proxy_url,
|
|
)
|
|
|
|
def _auth(self, access_token: str) -> dict[str, str]:
|
|
return {'Authorization': f'Bearer {access_token}'}
|
|
|
|
@overload
|
|
async def _process(
|
|
self, req: Response | CachedResponse, schema: None
|
|
) -> bool: ...
|
|
|
|
@overload
|
|
async def _process[T: BaseModel](
|
|
self, req: Response | CachedResponse, schema: type[T]
|
|
) -> T: ...
|
|
|
|
async def _process[T: BaseModel](
|
|
self, req: Response | CachedResponse, schema: type[T] | None
|
|
) -> T | bool:
|
|
if req.status_code >= 500:
|
|
raise s.InternalError(req.status_code, 'Internal Server Error')
|
|
|
|
if req.status_code >= 400 and req.status_code < 500:
|
|
data = await req.json()
|
|
message = data.get('message', 'Twitch API Error')
|
|
|
|
raise s.ClientError(req.status_code, message)
|
|
|
|
if schema is None:
|
|
return True
|
|
|
|
data = await req.json()
|
|
return schema.model_validate(data)
|
|
|
|
async def _paginate[S: s.BasePaginated](
|
|
self,
|
|
path: str,
|
|
schema: type[S],
|
|
headers: dict[str, str] | None,
|
|
params: dict[str, Any],
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[S]:
|
|
after = None
|
|
|
|
while True:
|
|
current_params = {**params, 'after': after} if after else params
|
|
|
|
req = await self.get(
|
|
path,
|
|
headers=headers,
|
|
params=current_params,
|
|
cache_ttl=cache_ttl,
|
|
)
|
|
result = await self._process(req, schema)
|
|
yield result
|
|
|
|
if isinstance(result.pagination, s.Pagination):
|
|
after = result.pagination.cursor
|
|
|
|
if after is None:
|
|
break
|
|
|
|
async def start_commercial(
|
|
self, access_token: str, broadcaster_id: int, length: int
|
|
) -> s.StartCommercial:
|
|
req = await self.post(
|
|
'/channels/commercial',
|
|
headers=self._auth(access_token),
|
|
json={
|
|
'broadcaster_id': broadcaster_id,
|
|
'length': length,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.StartCommercial)
|
|
|
|
async def get_ad_schedule(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.AdSchedule:
|
|
req = await self.get(
|
|
'/channels/ads',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.AdSchedule)
|
|
|
|
async def snooze_next_ad(
|
|
self, access_token: str, broadcaster_id: int
|
|
) -> s.SnoozeNextAd:
|
|
req = await self.post(
|
|
'/channels/ads/schedule/snooze',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.SnoozeNextAd)
|
|
|
|
async def get_extension_analytics(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
extension_id: str | None = None,
|
|
analytics_type: Literal['overview_v2'] | None = None,
|
|
started_at: datetime | None = None,
|
|
ended_at: datetime | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.ExtensionAnalyticsData]:
|
|
async for data in self._paginate(
|
|
'/analytics/extensions',
|
|
s.ExtensionAnalytics,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'extension_id': extension_id,
|
|
'type': analytics_type,
|
|
'started_at': started_at,
|
|
'ended_at': ended_at,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_game_analytics(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
game_id: int | None = None,
|
|
analytics_type: Literal['overview_v2'] | None = None,
|
|
started_at: datetime | None = None,
|
|
ended_at: datetime | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.GameAnalyticsData]:
|
|
async for data in self._paginate(
|
|
'/analytics/games',
|
|
s.GameAnalytics,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'game_id': game_id,
|
|
'type': analytics_type,
|
|
'started_at': started_at,
|
|
'ended_at': ended_at,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_bits_leaderboard(
|
|
self,
|
|
access_token: str,
|
|
count: int = 10,
|
|
period: Literal['day', 'week', 'month', 'year', 'all'] = 'all',
|
|
*,
|
|
started_at: datetime | None = None,
|
|
user_id: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.BitsLeaderboard:
|
|
req = await self.get(
|
|
'/bits/leaderboard',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'count': count,
|
|
'period': period,
|
|
'started_at': started_at,
|
|
'user_id': user_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.BitsLeaderboard)
|
|
|
|
async def get_cheermotes(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
broadcaster_id: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.Cheermotes:
|
|
req = await self.get(
|
|
'/bits/cheermotes',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict({'broadcaster_id': broadcaster_id}),
|
|
)
|
|
|
|
return await self._process(req, s.Cheermotes)
|
|
|
|
async def get_extension_transactions(
|
|
self,
|
|
access_token: str,
|
|
extension_id: str,
|
|
*,
|
|
user_id: int | list[int] | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.ExtensionTransactionsData]:
|
|
async for data in self._paginate(
|
|
'/extensions/transactions',
|
|
s.ExtensionTransactions,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'extension_id': extension_id,
|
|
'user_id': user_id,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_channel_information(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int | list[int],
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ChannelsInformation:
|
|
req = await self.get(
|
|
'/channels',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.ChannelsInformation)
|
|
|
|
async def modify_channel_information(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
game_id: int | None = None,
|
|
broadcaster_language: str | None = None,
|
|
title: str | None = None,
|
|
delay: int | None = None,
|
|
tags: list[str] | None = None,
|
|
content_classification_labels: list[s.ContentClassificationLabel]
|
|
| None = None,
|
|
is_branded_content: bool | None = None,
|
|
) -> bool:
|
|
data = self.clean_dict(
|
|
{
|
|
'game_id': game_id,
|
|
'broadcaster_language': broadcaster_language,
|
|
'title': title,
|
|
'delay': delay,
|
|
'tags': tags,
|
|
'content_classification_labels': content_classification_labels,
|
|
'is_branded_content': is_branded_content,
|
|
}
|
|
)
|
|
req = await self.patch(
|
|
'/channels',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
json=data,
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_channel_editors(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ChannelEditors:
|
|
req = await self.get(
|
|
'/channels/editors',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.ChannelEditors)
|
|
|
|
async def get_followed_channels(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[tuple[int, s.FollowedChannel]]:
|
|
async for data in self._paginate(
|
|
'/channels/followed',
|
|
s.FollowedChannels,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield data.total, item
|
|
|
|
async def get_channel_followers(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
user_id: int | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[tuple[int, s.ChannelFollower]]:
|
|
async for data in self._paginate(
|
|
'/channels/followers',
|
|
s.ChannelFollowers,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'user_id': user_id,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield data.total, item
|
|
|
|
async def create_custom_rewards(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
title: str,
|
|
cost: int,
|
|
*,
|
|
prompt: str | None = None,
|
|
is_enabled: bool = True,
|
|
background_color: str | None = None,
|
|
is_user_input_required: bool = False,
|
|
is_max_per_stream_enabled: bool = False,
|
|
max_per_stream: int | None = None,
|
|
is_max_per_user_per_stream_enabled: bool = False,
|
|
max_per_user_per_stream: int | None = None,
|
|
is_global_cooldown_enabled: bool = False,
|
|
global_cooldown_seconds: int | None = None,
|
|
should_redemptions_skip_request_queue: bool = False,
|
|
) -> s.CustomRewards:
|
|
data = self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'title': title,
|
|
'cost': cost,
|
|
'prompt': prompt,
|
|
'is_enabled': is_enabled,
|
|
'background_color': background_color,
|
|
'is_user_input_required': is_user_input_required,
|
|
'is_max_per_stream_enabled': is_max_per_stream_enabled,
|
|
'max_per_stream': max_per_stream,
|
|
'is_max_per_user_per_'
|
|
'stream_enabled': is_max_per_user_per_stream_enabled,
|
|
'max_per_user_per_stream': max_per_user_per_stream,
|
|
'is_global_cooldown_enabled': is_global_cooldown_enabled,
|
|
'global_cooldown_seconds': global_cooldown_seconds,
|
|
'should_redemptions_skip_'
|
|
'request_queue': should_redemptions_skip_request_queue,
|
|
}
|
|
)
|
|
|
|
req = await self.post(
|
|
'/channel_points/custom_rewards',
|
|
headers=self._auth(access_token),
|
|
json=data,
|
|
)
|
|
|
|
return await self._process(req, s.CustomRewards)
|
|
|
|
async def delete_custom_reward(
|
|
self, access_token: str, broadcaster_id: int, reward_id: str
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/channel_points/custom_rewards',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id, 'id': reward_id},
|
|
)
|
|
|
|
match req.status_code:
|
|
case st.NO_CONTENT:
|
|
return True
|
|
|
|
case (
|
|
st.BAD_REQUEST
|
|
| st.UNAUTHORIZED
|
|
| st.FORBIDDEN
|
|
| st.NOT_FOUND
|
|
| st.TOO_MANY_REQUESTS
|
|
):
|
|
raise s.ClientError(
|
|
req.status_code, (await req.json())['message']
|
|
)
|
|
|
|
case _:
|
|
raise s.InternalError(req.status_code, 'Internal Server Error')
|
|
|
|
async def get_custom_rewards(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
reward_id: str | None = None,
|
|
only_manageable_rewards: bool = False,
|
|
cache_ttl: int | None = None,
|
|
) -> s.CustomRewards:
|
|
req = await self.get(
|
|
'/channel_points/custom_rewards',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': reward_id,
|
|
'only_manageable_rewards': only_manageable_rewards,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.CustomRewards)
|
|
|
|
async def get_custom_reward_redemption(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
reward_id: str,
|
|
status: Literal['CANCELED', 'FULFILLED', 'UNFULFILLED'],
|
|
*,
|
|
redemption_id: int | list[int] | None = None,
|
|
sort: Literal['OLDEST', 'NEWEST'] = 'OLDEST',
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.CustomRewardRedemption]:
|
|
async for data in self._paginate(
|
|
'/channel_points/custom_rewards/redemptions',
|
|
s.CustomRewardRedemptions,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'reward_id': reward_id,
|
|
'status': status,
|
|
'redemption_id': redemption_id,
|
|
'sort': sort,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def update_custom_reward(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
reward_id: str,
|
|
*,
|
|
title: str | None = None,
|
|
prompt: str | None = None,
|
|
cost: int | None = None,
|
|
background_color: str | None = None,
|
|
is_enabled: bool | None = None,
|
|
is_user_input_required: bool | None = None,
|
|
is_max_per_stream_enabled: bool | None = None,
|
|
max_per_stream: int | None = None,
|
|
is_max_per_user_per_stream_enabled: bool | None = None,
|
|
max_per_user_per_stream: int | None = None,
|
|
is_global_cooldown_enabled: bool | None = None,
|
|
global_cooldown_seconds: int | None = None,
|
|
is_paused: bool | None = None,
|
|
should_redemptions_skip_request_queue: bool | None = None,
|
|
) -> s.CustomRewards:
|
|
data = self.clean_dict(
|
|
{
|
|
'title': title,
|
|
'prompt': prompt,
|
|
'cost': cost,
|
|
'background_color': background_color,
|
|
'is_enabled': is_enabled,
|
|
'is_user_input_required': is_user_input_required,
|
|
'is_max_per_stream_enabled': is_max_per_stream_enabled,
|
|
'max_per_stream': max_per_stream,
|
|
'is_max_per_user_per_'
|
|
'stream_enabled': is_max_per_user_per_stream_enabled,
|
|
'max_per_user_per_stream': max_per_user_per_stream,
|
|
'is_global_cooldown_enabled': is_global_cooldown_enabled,
|
|
'global_cooldown_seconds': global_cooldown_seconds,
|
|
'is_paused': is_paused,
|
|
'should_redemptions_skip_'
|
|
'request_queue': should_redemptions_skip_request_queue,
|
|
}
|
|
)
|
|
req = await self.patch(
|
|
'/channel_points/custom_rewards',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id, 'id': reward_id},
|
|
json=data,
|
|
)
|
|
|
|
return await self._process(req, s.CustomRewards)
|
|
|
|
async def update_redemption_status(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
reward_id: str,
|
|
redemption_id: str | list[str],
|
|
status: Literal['CANCELED', 'FULFILLED'],
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/channel_points/custom_rewards/redemptions',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'reward_id': reward_id,
|
|
'id': redemption_id,
|
|
},
|
|
json={'status': status},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_charity_campaign(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.CharityCampaign:
|
|
req = await self.get(
|
|
'/charity/campaigns',
|
|
cache_ttl=cache_ttl,
|
|
params={'broadcaster_id': broadcaster_id},
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
return await self._process(req, s.CharityCampaign)
|
|
|
|
async def get_charity_campaign_donations(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.CharityDonation]:
|
|
async for data in self._paginate(
|
|
'/charity/donations',
|
|
s.CharityDonations,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{'broadcaster_id': broadcaster_id, 'first': first}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_chatters(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[tuple[int, s.ChattersData]]:
|
|
async for data in self._paginate(
|
|
'/chat/chatters',
|
|
s.Chatters,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield data.total, item
|
|
|
|
async def get_channel_emotes(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ChannelEmotes:
|
|
req = await self.get(
|
|
'/chat/emotes',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.ChannelEmotes)
|
|
|
|
async def get_global_emotes(
|
|
self, access_token: str, *, cache_ttl: int | None = None
|
|
) -> s.GlobalEmotes:
|
|
req = await self.get(
|
|
'/chat/emotes/global',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
return await self._process(req, s.GlobalEmotes)
|
|
|
|
async def get_emote_sets(
|
|
self,
|
|
access_token: str,
|
|
emote_set_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.EmoteSets:
|
|
req = await self.get(
|
|
'/chat/emotes/set',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'emote_set_id': emote_set_id},
|
|
)
|
|
|
|
return await self._process(req, s.EmoteSets)
|
|
|
|
async def get_channel_chat_badges(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ChannelChatBadges:
|
|
req = await self.get(
|
|
'/chat/badges',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.ChannelChatBadges)
|
|
|
|
async def get_global_chat_badges(
|
|
self, access_token: str, *, cache_ttl: int | None = None
|
|
) -> s.GlobalChatBadges:
|
|
req = await self.get(
|
|
'/chat/badges/global',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
return await self._process(req, s.GlobalChatBadges)
|
|
|
|
async def get_chat_settings(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
moderator_id: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ChatSettings:
|
|
req = await self.get(
|
|
'/chat/settings',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.ChatSettings)
|
|
|
|
async def get_shared_chat_session(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.SharedChatSession:
|
|
req = await self.get(
|
|
'/shared_chat/session',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.SharedChatSession)
|
|
|
|
async def get_user_emotes(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
*,
|
|
broadcaster_id: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[tuple[str, s.ChannelEmote]]:
|
|
async for data in self._paginate(
|
|
'/chat/emotes',
|
|
s.UserEmotes,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{'user_id': user_id, 'broadcaster_id': broadcaster_id}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield data.template, item
|
|
|
|
async def update_chat_settings(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
emote_mode: bool | None = None,
|
|
follower_mode: bool | None = None,
|
|
follower_mode_duration: int | None = None,
|
|
non_moderator_chat_delay: bool | None = None,
|
|
non_moderator_chat_delay_duration: Literal[2, 4, 6] | None = None,
|
|
slow_mode: bool | None = None,
|
|
slow_mode_wait_time: int | None = None,
|
|
subscriber_mode: bool | None = None,
|
|
unique_chat_mode: bool | None = None,
|
|
) -> s.ChatSettings:
|
|
req = await self.put(
|
|
'/chat/settings',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
},
|
|
json=self.clean_dict(
|
|
{
|
|
'emote_mode': emote_mode,
|
|
'follower_mode': follower_mode,
|
|
'follower_mode_duration': follower_mode_duration,
|
|
'non_moderator_chat_delay': non_moderator_chat_delay,
|
|
'non_moderator_chat_delay_duration': (
|
|
non_moderator_chat_delay_duration
|
|
),
|
|
'slow_mode': slow_mode,
|
|
'slow_mode_wait_time': slow_mode_wait_time,
|
|
'subscriber_mode': subscriber_mode,
|
|
'unique_chat_mode': unique_chat_mode,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.ChatSettings)
|
|
|
|
async def send_chat_announcement(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
message: str,
|
|
color: Literal[
|
|
'blue', 'green', 'orange', 'purple', 'primary'
|
|
] = 'primary',
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/chat/announcements',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
},
|
|
json=self.clean_dict({'message': message, 'color': color}),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def send_shoutout(
|
|
self,
|
|
access_token: str,
|
|
from_broadcaster_id: int,
|
|
to_broadcaster_id: int,
|
|
moderator_id: int,
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/chat/shoutouts',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'from_broadcaster_id': from_broadcaster_id,
|
|
'to_broadcaster_id': to_broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def send_chat_message(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
sender_id: int,
|
|
message: str,
|
|
*,
|
|
reply_parent_message_id: str | None = None,
|
|
for_source_only: bool | None = None,
|
|
) -> s.Message:
|
|
req = await self.post(
|
|
'/chat/messages',
|
|
headers=self._auth(access_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'sender_id': sender_id,
|
|
'reply_parent_message_id': reply_parent_message_id,
|
|
'for_source_only': for_source_only,
|
|
'message': message,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.Message)
|
|
|
|
async def get_user_chat_color(
|
|
self,
|
|
access_token: str,
|
|
user_id: int | list[int],
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.UserChatColor:
|
|
req = await self.get(
|
|
'/chat/colors',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, s.UserChatColor)
|
|
|
|
async def update_user_chat_color(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
color: str,
|
|
) -> bool:
|
|
req = await self.put(
|
|
'/chat/color',
|
|
headers=self._auth(access_token),
|
|
params={'user_id': user_id, 'color': color},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def create_clip(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
has_delay: bool | None = None,
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/clips',
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{'broadcaster_id': broadcaster_id, 'has_delay': has_delay}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_clips(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
broadcaster_id: int | None = None,
|
|
game_id: int | None = None,
|
|
clip_id: str | list[str] | None = None,
|
|
started_at: datetime | None = None,
|
|
ended_at: datetime | None = None,
|
|
first: int = 20,
|
|
before: str | None = None,
|
|
is_featured: bool | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Clip]:
|
|
async for data in self._paginate(
|
|
'/clips',
|
|
s.Clips,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'game_id': game_id,
|
|
'clip_id': clip_id,
|
|
'started_at': started_at,
|
|
'ended_at': ended_at,
|
|
'first': first,
|
|
'before': before,
|
|
'is_featured': is_featured,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_clips_downloads(
|
|
self,
|
|
access_token: str,
|
|
editor_id: int,
|
|
broadcaster_id: int,
|
|
clip_id: str,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ClipsDownloads:
|
|
req = await self.get(
|
|
'/clips/downloads',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'editor_id': editor_id,
|
|
'broadcaster_id': broadcaster_id,
|
|
'clip_id': clip_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.ClipsDownloads)
|
|
|
|
async def get_conduits(
|
|
self, access_token: str, *, cache_ttl: int | None = None
|
|
) -> s.Conduits:
|
|
req = await self.get(
|
|
'/eventsub/conduits',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
return await self._process(req, s.Conduits)
|
|
|
|
async def create_conduits(
|
|
self, access_token: str, shard_count: int
|
|
) -> s.Conduits:
|
|
req = await self.post(
|
|
'/eventsub/conduits',
|
|
headers=self._auth(access_token),
|
|
json={'shard_count': shard_count},
|
|
)
|
|
|
|
return await self._process(req, s.Conduits)
|
|
|
|
async def update_conduits(
|
|
self, access_token: str, conduit_id: str, shard_count: int
|
|
) -> s.Conduits:
|
|
req = await self.patch(
|
|
'/eventsub/conduits',
|
|
headers=self._auth(access_token),
|
|
json={'shard_count': shard_count, 'id': conduit_id},
|
|
)
|
|
|
|
return await self._process(req, s.Conduits)
|
|
|
|
async def delete_conduit(self, access_token: str, conduit_id: str) -> bool:
|
|
req = await self.delete(
|
|
'/eventsub/conduits',
|
|
headers=self._auth(access_token),
|
|
params={'id': conduit_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_conduit_shards(
|
|
self,
|
|
access_token: str,
|
|
conduit_id: str,
|
|
*,
|
|
status: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.ConduitShard]:
|
|
async for data in self._paginate(
|
|
'/eventsub/conduits/shards',
|
|
s.ConduitShards,
|
|
self._auth(access_token),
|
|
self.clean_dict({'id': conduit_id, 'status': status}),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def update_conduit_shards(
|
|
self,
|
|
access_token: str,
|
|
conduit_id: str,
|
|
shards: list[s.UpdateConduitShard],
|
|
) -> s.ConduitShards:
|
|
req = await self.patch(
|
|
'/eventsub/conduits/shards',
|
|
headers=self._auth(access_token),
|
|
json={'conduit_id': conduit_id, 'shards': shards},
|
|
)
|
|
|
|
return await self._process(req, s.ConduitShards)
|
|
|
|
async def get_content_classification_labels(
|
|
self,
|
|
access_token: str,
|
|
locale: Literal[
|
|
'bg-BG',
|
|
'cs-CZ',
|
|
'da-DK',
|
|
'de-DE',
|
|
'el-GR',
|
|
'en-GB',
|
|
'en-US',
|
|
'es-ES',
|
|
'es-MX',
|
|
'fi-FI',
|
|
'fr-FR',
|
|
'hu-HU',
|
|
'it-IT',
|
|
'ja-JP',
|
|
'ko-KR',
|
|
'nl-NL',
|
|
'no-NO',
|
|
'pl-PL',
|
|
'pt-BR',
|
|
'pt-PT',
|
|
'ro-RO',
|
|
'ru-RU',
|
|
'sk-SK',
|
|
'sv-SE',
|
|
'th-TH',
|
|
'tr-TR',
|
|
'vi-VN',
|
|
'zh-CN',
|
|
'zh-TW',
|
|
] = 'en-US',
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ContentClassificationLabels:
|
|
req = await self.get(
|
|
'/content_classification_labels',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'locale': locale},
|
|
)
|
|
|
|
return await self._process(req, s.ContentClassificationLabels)
|
|
|
|
async def get_drops_entitlements(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
drop_id: str | list[str] | None = None,
|
|
user_id: int | None = None,
|
|
game_id: int | None = None,
|
|
fulfillment_status: Literal['CLAIMED', 'FULFILLED'] | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.DropEntitlement]:
|
|
async for data in self._paginate(
|
|
'/entitlements/drops',
|
|
s.DropsEntitlements,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'drop_id': drop_id,
|
|
'user_id': user_id,
|
|
'game_id': game_id,
|
|
'fulfillment_status': fulfillment_status,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def update_drops_entitlements(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
entitlement_ids: list[str] | None = None,
|
|
fulfillment_status: Literal['CLAIMED', 'FULFILLED'] | None = None,
|
|
) -> s.UpdateDropsEntitlements:
|
|
req = await self.patch(
|
|
'/entitlements/drops',
|
|
headers=self._auth(access_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'entitlement_ids': entitlement_ids,
|
|
'fulfillment_status': fulfillment_status,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.UpdateDropsEntitlements)
|
|
|
|
async def get_extension_configuration_segment(
|
|
self,
|
|
jwt_token: str,
|
|
*,
|
|
broadcaster_id: int | None = None,
|
|
extension_id: int | None = None,
|
|
segment: Literal['broadcaster', 'developer', 'global'] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ExtensionConfigurationSegment:
|
|
req = await self.get(
|
|
'/extensions/configurations',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(jwt_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'extension_id': extension_id,
|
|
'segment': segment,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.ExtensionConfigurationSegment)
|
|
|
|
async def set_extension_configuration_segment(
|
|
self,
|
|
jwt_token: str,
|
|
extension_id: str,
|
|
segment: Literal['broadcaster', 'developer', 'global'],
|
|
*,
|
|
broadcaster_id: int | None = None,
|
|
content: str | None = None,
|
|
version: str | None = None,
|
|
) -> bool:
|
|
req = await self.put(
|
|
'/extensions/configurations',
|
|
headers=self._auth(jwt_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'extension_id': extension_id,
|
|
'segment': segment,
|
|
'broadcaster_id': broadcaster_id,
|
|
'content': content,
|
|
'version': version,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def set_extension_required_configurations(
|
|
self,
|
|
jwt_token: str,
|
|
broadcaster_id: int,
|
|
extension_id: str,
|
|
extension_version: str,
|
|
required_configuration: str,
|
|
) -> bool:
|
|
req = await self.put(
|
|
'/extensions/configurations/required',
|
|
headers=self._auth(jwt_token),
|
|
json={
|
|
'broadcaster_id': broadcaster_id,
|
|
'extension_id': extension_id,
|
|
'extension_version': extension_version,
|
|
'required_configuration': required_configuration,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def send_extension_pubsub_message(
|
|
self,
|
|
jwt_token: str,
|
|
target: list[str],
|
|
broadcaster_id: int,
|
|
message: str,
|
|
*,
|
|
is_global_broadcast: bool | None = None,
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/extensions/pubsub',
|
|
headers=self._auth(jwt_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'target': target,
|
|
'broadcaster_id': broadcaster_id,
|
|
'message': message,
|
|
'is_global_broadcast': is_global_broadcast,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_extension_live_channels(
|
|
self,
|
|
access_token: str,
|
|
extension_id: str,
|
|
*,
|
|
first: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.ExtensionLiveChannel]:
|
|
async for data in self._paginate(
|
|
'/extensions/live',
|
|
s.ExtensionLiveChannels,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'extension_id': extension_id,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_extension_secrets(
|
|
self,
|
|
jwt_token: str,
|
|
extension_id: str,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ExtensionSecrets:
|
|
req = await self.get(
|
|
'/extensions/jwt/secrets',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(jwt_token),
|
|
params={'extension_id': extension_id},
|
|
)
|
|
|
|
return await self._process(req, s.ExtensionSecrets)
|
|
|
|
async def create_extension_secret(
|
|
self, jwt_token: str, extension_id: str, delay: int = 300
|
|
) -> s.ExtensionSecrets:
|
|
req = await self.post(
|
|
'/extensions/jwt/secrets',
|
|
headers=self._auth(jwt_token),
|
|
params={'extension_id': extension_id, 'delay': delay},
|
|
)
|
|
|
|
return await self._process(req, s.ExtensionSecrets)
|
|
|
|
async def send_extension_chat_message(
|
|
self,
|
|
jwt_token: str,
|
|
broadcaster_id: int,
|
|
text: str,
|
|
extension_id: str,
|
|
extension_version: str,
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/extensions/chat',
|
|
headers=self._auth(jwt_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
json={
|
|
'text': text,
|
|
'extension_id': extension_id,
|
|
'extension_version': extension_version,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_extensions(
|
|
self,
|
|
jwt_token: str,
|
|
extension_id: str,
|
|
*,
|
|
extension_version: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.Extensions:
|
|
req = await self.get(
|
|
'/extensions',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(jwt_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'extension_id': extension_id,
|
|
'extension_version': extension_version,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.Extensions)
|
|
|
|
async def get_released_extension(
|
|
self,
|
|
access_token: str,
|
|
extension_id: str,
|
|
*,
|
|
extension_version: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.Extensions:
|
|
req = await self.get(
|
|
'/extensions/released',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'extension_id': extension_id,
|
|
'extension_version': extension_version,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.Extensions)
|
|
|
|
async def get_extension_bits_products(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
should_include_all: bool | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ExtensionBitsProducts:
|
|
req = await self.get(
|
|
'/bits/extensions',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'should_include_all': should_include_all,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.ExtensionBitsProducts)
|
|
|
|
async def update_extension_bits_product(
|
|
self,
|
|
access_token: str,
|
|
sku: str,
|
|
cost_amount: int,
|
|
display_name: str,
|
|
*,
|
|
in_development: bool | None = None,
|
|
expiration: str | None = None,
|
|
is_broadcast: bool | None = None,
|
|
) -> s.ExtensionBitsProducts:
|
|
req = await self.put(
|
|
'/bits/extensions',
|
|
headers=self._auth(access_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'sku': sku,
|
|
'cost': {
|
|
'amount': cost_amount,
|
|
'type': 'bits',
|
|
},
|
|
'display_name': display_name,
|
|
'in_development': in_development,
|
|
'expiration': expiration,
|
|
'is_broadcast': is_broadcast,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.ExtensionBitsProducts)
|
|
|
|
async def create_eventsub_subscription(
|
|
self,
|
|
access_token: str,
|
|
sub_type: sub_type.Any,
|
|
version: str | int,
|
|
condition: dict[str, str],
|
|
transport_method: Literal['webhook', 'websocket', 'conduit'],
|
|
*,
|
|
webhook_callback: str | None = None,
|
|
webhook_secret: str | None = None,
|
|
websocket_session_id: str | None = None,
|
|
conduit_id: str | None = None,
|
|
) -> s.EventsubBaseSubscriptions:
|
|
req = await self.post(
|
|
'/eventsub/subscriptions',
|
|
headers=self._auth(access_token),
|
|
json={
|
|
'type': sub_type,
|
|
'version': version,
|
|
'condition': condition,
|
|
'transport': self.clean_dict(
|
|
{
|
|
'method': transport_method,
|
|
'callback': webhook_callback,
|
|
'secret': webhook_secret,
|
|
'session_id': websocket_session_id,
|
|
'conduit_id': conduit_id,
|
|
}
|
|
),
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.EventsubBaseSubscriptions)
|
|
|
|
async def delete_eventsub_subscription(
|
|
self, access_token: str, sub_id: str
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/eventsub/subscriptions',
|
|
headers=self._auth(access_token),
|
|
params={'id': sub_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_eventsub_subscriptions(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
status: sub_status.Any | None = None,
|
|
sub_type: sub_type.Any | None = None,
|
|
user_id: int | None = None,
|
|
subscription_id: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[tuple[int, sub.Any]]:
|
|
async for data in self._paginate(
|
|
'/eventsub/subscriptions',
|
|
s.EventsubBaseSubscriptions,
|
|
self._auth(access_token),
|
|
{
|
|
'status': status,
|
|
'type': sub_type,
|
|
'user_id': user_id,
|
|
'subscription_id': subscription_id,
|
|
},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield data.total, item
|
|
|
|
async def get_top_games(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Game]:
|
|
async for data in self._paginate(
|
|
'/games/top',
|
|
s.Games,
|
|
self._auth(access_token),
|
|
{'first': first},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_games(
|
|
self,
|
|
access_token: str,
|
|
game_id: int | list[int],
|
|
name: str,
|
|
*,
|
|
igdb_id: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.Games:
|
|
req = await self.get(
|
|
'/games',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'id': game_id,
|
|
'name': name,
|
|
'igdb_id': igdb_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.Games)
|
|
|
|
async def get_creator_goals(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.CreatorGoals:
|
|
req = await self.get(
|
|
'/creator_goals',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.CreatorGoals)
|
|
|
|
# TODO: implement guest star endpoints
|
|
|
|
async def get_hype_train_status(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.HypeTrainStatus:
|
|
req = await self.get(
|
|
'/hypetrain/status',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.HypeTrainStatus)
|
|
|
|
async def check_automod_status(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.AutomodStatus:
|
|
req = await self.get(
|
|
'/moderation/enforcements/status',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.AutomodStatus)
|
|
|
|
async def manage_held_automod_messages(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
msg_id: int,
|
|
action: Literal['ALLOW', 'DENY'],
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/moderation/automod/message',
|
|
headers=self._auth(access_token),
|
|
params={'action': action, 'msg_id': msg_id, 'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_automod_settings(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.AutomodSettings:
|
|
req = await self.get(
|
|
'/moderation/automod/settings',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.AutomodSettings)
|
|
|
|
async def update_automod_settings(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
overall_level: int | None = None,
|
|
disability: int | None = None,
|
|
aggression: int | None = None,
|
|
sexuality_sex_or_gender: int | None = None,
|
|
misogyny: int | None = None,
|
|
bullying: int | None = None,
|
|
swearing: int | None = None,
|
|
race_ethnicity_or_religion: int | None = None,
|
|
sex_based_terms: int | None = None,
|
|
) -> s.AutomodSettings:
|
|
req = await self.put(
|
|
'/moderation/automod/settings',
|
|
headers=self._auth(access_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'overall_level': overall_level,
|
|
'disability': disability,
|
|
'aggression': aggression,
|
|
'sexuality_sex_or_gender': sexuality_sex_or_gender,
|
|
'misogyny': misogyny,
|
|
'bullying': bullying,
|
|
'swearing': swearing,
|
|
'race_ethnicity_or_religion': race_ethnicity_or_religion,
|
|
'sex_based_terms': sex_based_terms,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.AutomodSettings)
|
|
|
|
async def get_banned_users(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
user_id: int | list[int],
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.BannedUser]:
|
|
async for data in self._paginate(
|
|
'/moderation/banned',
|
|
s.BannedUsers,
|
|
self._auth(access_token),
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'user_id': user_id,
|
|
'first': first,
|
|
},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def ban_user(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
user_id: int,
|
|
*,
|
|
duration: int | None = None,
|
|
reason: str | None = None,
|
|
) -> s.BanUser:
|
|
req = await self.post(
|
|
'/moderation/banned',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
},
|
|
json=self.clean_dict(
|
|
{
|
|
'user_id': user_id,
|
|
'duration': duration,
|
|
'reason': reason,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.BanUser)
|
|
|
|
async def unban_user(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
user_id: int,
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/moderation/bans',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'user_id': user_id,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_unban_requests(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
status: Literal[
|
|
'pending', 'approved', 'denied', 'acknowledged', 'canceled'
|
|
],
|
|
*,
|
|
user_id: int | None = None,
|
|
first: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.UnbanRequest]:
|
|
async for data in self._paginate(
|
|
'/moderation/unbans',
|
|
s.UnbanRequests,
|
|
self._auth(access_token),
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'status': status,
|
|
'user_id': user_id,
|
|
'first': first,
|
|
},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def resolve_unban_request(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
unban_request_id: str,
|
|
status: Literal['approved', 'denied'],
|
|
*,
|
|
resolution_text: str | None = None,
|
|
) -> s.UnbanRequests:
|
|
req = await self.patch(
|
|
'/moderation/unban_requests',
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'unban_request_id': unban_request_id,
|
|
'status': status,
|
|
'resolution_text': resolution_text,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.UnbanRequests)
|
|
|
|
async def get_blocked_terms(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
first: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.BlockedTerm]:
|
|
async for data in self._paginate(
|
|
'/moderation/blocked_terms',
|
|
s.BlockedTerms,
|
|
self._auth(access_token),
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'first': first,
|
|
},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def add_blocked_term(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
text: str,
|
|
) -> s.BlockedTerms:
|
|
req = await self.post(
|
|
'/moderation/blocked_terms',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'text': text,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.BlockedTerms)
|
|
|
|
async def remove_blocked_term(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
blocked_term_id: str,
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/moderation/blocked_terms',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'id': blocked_term_id,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def delete_chat_message(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
message_id: str | None = None,
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/moderation/chat',
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'message_id': message_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_moderated_channels(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
*,
|
|
first: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.ModeratedChannel]:
|
|
async for data in self._paginate(
|
|
'/moderation/channels',
|
|
s.ModeratedChannels,
|
|
self._auth(access_token),
|
|
{
|
|
'user_id': user_id,
|
|
'first': first,
|
|
},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_moderators(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
user_id: int | list[int] | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Moderator]:
|
|
async for data in self._paginate(
|
|
'/moderation/moderators',
|
|
s.Moderators,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'user_id': user_id,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def add_channel_moderator(
|
|
self, access_token: str, broadcaster_id: int, user_id: int
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/moderation/moderators',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def remove_channel_moderator(
|
|
self, access_token: str, broadcaster_id: int, user_id: int
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/moderation/moderators',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_vips(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
broadcaster_id: int,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.VIP]:
|
|
async for data in self._paginate(
|
|
'/channels/vips',
|
|
s.VIPs,
|
|
self._auth(access_token),
|
|
{
|
|
'user_id': user_id,
|
|
'broadcaster_id': broadcaster_id,
|
|
'first': first,
|
|
},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def add_channel_vip(
|
|
self, access_token: str, broadcaster_id: int, user_id: int
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/channels/vips',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def remove_channel_vip(
|
|
self, access_token: str, broadcaster_id: int, user_id: int
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/channels/vips',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id, 'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def update_shield_mode_status(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
is_active: bool,
|
|
) -> s.ShieldModeStatus:
|
|
req = await self.put(
|
|
'/moderation/shield_mode',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
'is_active': is_active,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.ShieldModeStatus)
|
|
|
|
async def get_shield_mode_status(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ShieldModeStatus:
|
|
req = await self.get(
|
|
'/moderation/shield_mode',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.ShieldModeStatus)
|
|
|
|
async def warn_chat_user(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
moderator_id: int,
|
|
user_id: int,
|
|
reason: str,
|
|
) -> s.UserWarns:
|
|
req = await self.post(
|
|
'/moderation/warnings',
|
|
headers=self._auth(access_token),
|
|
params={
|
|
'broadcaster_id': broadcaster_id,
|
|
'moderator_id': moderator_id,
|
|
},
|
|
json={
|
|
'data': {
|
|
'user_id': user_id,
|
|
'reason': reason,
|
|
}
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.UserWarns)
|
|
|
|
async def get_polls(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
poll_id: int | list[int] | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Poll]:
|
|
async for data in self._paginate(
|
|
'/polls',
|
|
s.Polls,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': poll_id,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def create_poll(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
title: str,
|
|
choices: list[str],
|
|
duration: int,
|
|
*,
|
|
channel_points_voting_enabled: bool = False,
|
|
channel_points_per_vote: int | None = None,
|
|
) -> s.Polls:
|
|
body = self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'title': title,
|
|
'choices': [
|
|
{'title': choice_title} for choice_title in choices
|
|
],
|
|
'duration': duration,
|
|
'channel_points_voting_enabled': channel_points_voting_enabled,
|
|
'channel_points_per_vote': channel_points_per_vote,
|
|
}
|
|
)
|
|
req = await self.post(
|
|
'/polls',
|
|
headers=self._auth(access_token),
|
|
json=body,
|
|
)
|
|
|
|
return await self._process(req, s.Polls)
|
|
|
|
async def end_poll(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
poll_id: str,
|
|
status: Literal['TERMINATED', 'ARCHIVED'],
|
|
) -> s.Polls:
|
|
req = await self.patch(
|
|
'/polls',
|
|
headers=self._auth(access_token),
|
|
json={
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': poll_id,
|
|
'status': status,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.Polls)
|
|
|
|
async def get_predictions(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
prediction_id: int | list[int] | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Prediction]:
|
|
async for data in self._paginate(
|
|
'/predictions',
|
|
s.Predictions,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': prediction_id,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def create_prediction(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
title: str,
|
|
outcomes: list[str],
|
|
prediction_window: int,
|
|
) -> s.Predictions:
|
|
body = self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'title': title,
|
|
'outcomes': [{'title': outcome} for outcome in outcomes],
|
|
'prediction_window': prediction_window,
|
|
}
|
|
)
|
|
req = await self.post(
|
|
'/predictions',
|
|
headers=self._auth(access_token),
|
|
json=body,
|
|
)
|
|
|
|
return await self._process(req, s.Predictions)
|
|
|
|
async def end_prediction(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
prediction_id: str,
|
|
status: Literal['RESOLVED', 'CANCELED', 'LOCKED'],
|
|
*,
|
|
winning_outcome_id: str | None = None,
|
|
) -> s.Predictions:
|
|
req = await self.patch(
|
|
'/predictions',
|
|
headers=self._auth(access_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': prediction_id,
|
|
'status': status,
|
|
'winning_outcome_id': winning_outcome_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.Predictions)
|
|
|
|
async def start_a_raid(
|
|
self,
|
|
access_token: str,
|
|
from_broadcaster_id: int,
|
|
to_broadcaster_id: int,
|
|
) -> s.Raids:
|
|
req = await self.post(
|
|
'/raids',
|
|
headers=self._auth(access_token),
|
|
json={
|
|
'from_broadcaster_id': from_broadcaster_id,
|
|
'to_broadcaster_id': to_broadcaster_id,
|
|
},
|
|
)
|
|
|
|
return await self._process(req, s.Raids)
|
|
|
|
async def cancel_a_raid(
|
|
self, access_token: str, broadcaster_id: int
|
|
) -> bool:
|
|
req = await self.patch(
|
|
'/raids',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_channel_stream_schedule(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
segment_id: int | list[int] | None = None,
|
|
start_time: datetime | None = None,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Schedule]:
|
|
async for data in self._paginate(
|
|
'/schedule',
|
|
s.Schedules,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': segment_id,
|
|
'start_time': start_time,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_channel_icalendar(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> bytes:
|
|
req = await self.get(
|
|
'/schedule/icalendar',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
match req.status_code:
|
|
case st.OK:
|
|
mem = await req.get_bytes()
|
|
return bytes(mem)
|
|
|
|
case st.BAD_REQUEST:
|
|
raise s.ClientError(
|
|
req.status_code, (await req.json())['message']
|
|
)
|
|
|
|
case _:
|
|
raise s.InternalError(req.status_code, 'Internal Server Error')
|
|
|
|
async def update_channel_stream_schedule(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
is_vacation_enabled: bool | None = None,
|
|
vacation_start_time: datetime | None = None,
|
|
vacation_end_time: datetime | None = None,
|
|
timezone: ZoneInfo | None = None,
|
|
) -> bool:
|
|
req = await self.patch(
|
|
'/schedule/settings',
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'is_vacation_enabled': is_vacation_enabled,
|
|
'vacation_start_time': vacation_start_time,
|
|
'vacation_end_time': vacation_end_time,
|
|
'timezone': str(timezone) if timezone else None,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def create_channel_stream_schedule_segment(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
start_time: datetime,
|
|
timezone: ZoneInfo,
|
|
duration: int,
|
|
*,
|
|
is_recurring: bool | None = None,
|
|
category_id: int | None,
|
|
title: str | None = None,
|
|
) -> s.Schedules:
|
|
req = await self.post(
|
|
'/schedule/segment',
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
json=self.clean_dict(
|
|
{
|
|
'start_time': start_time,
|
|
'timezone': str(timezone),
|
|
'duration': duration,
|
|
'is_recurring': is_recurring,
|
|
'category_id': category_id,
|
|
'title': title,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.Schedules)
|
|
|
|
async def update_channel_stream_schedule_segment(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
segment_id: str,
|
|
*,
|
|
start_time: datetime | None = None,
|
|
duration: int | None = None,
|
|
category_id: int | None = None,
|
|
title: str | None = None,
|
|
is_canceled: bool | None = None,
|
|
timezone: ZoneInfo | None = None,
|
|
) -> s.Schedules:
|
|
req = await self.patch(
|
|
'/schedule/segment',
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': segment_id,
|
|
}
|
|
),
|
|
json=self.clean_dict(
|
|
{
|
|
'start_time': start_time,
|
|
'duration': duration,
|
|
'category_id': category_id,
|
|
'title': title,
|
|
'is_canceled': is_canceled,
|
|
'timezone': str(timezone) if timezone else None,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.Schedules)
|
|
|
|
async def delete_channel_stream_schedule_segment(
|
|
self, access_token: str, broadcaster_id: int, segment_id: str
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/schedule/segment',
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'id': segment_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def search_categories(
|
|
self,
|
|
access_token: str,
|
|
query: str,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Category]:
|
|
async for data in self._paginate(
|
|
'/search/categories',
|
|
s.Categories,
|
|
self._auth(access_token),
|
|
{'query': query, 'first': first},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def search_channels(
|
|
self,
|
|
access_token: str,
|
|
query: str,
|
|
*,
|
|
live_only: bool = False,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Channel]:
|
|
async for data in self._paginate(
|
|
'/search/channels',
|
|
s.Channels,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{'query': query, 'live_only': live_only, 'first': first}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_stream_key(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.StreamKeys:
|
|
req = await self.get(
|
|
'/streams/key',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.StreamKeys)
|
|
|
|
async def get_streams(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
user_id: int | list[int] | None = None,
|
|
user_login: str | list[str] | None = None,
|
|
game_id: int | list[int] | None = None,
|
|
stream_type: Literal['live', 'all'] = 'all',
|
|
language: str | None = None,
|
|
first: int = 20,
|
|
before: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Stream]:
|
|
async for data in self._paginate(
|
|
'/streams',
|
|
s.Streams,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'user_id': user_id,
|
|
'user_login': user_login,
|
|
'game_id': game_id,
|
|
'type': stream_type,
|
|
'language': language,
|
|
'first': first,
|
|
'before': before,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_followed_streams(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Stream]:
|
|
async for data in self._paginate(
|
|
'/streams/followed',
|
|
s.Streams,
|
|
self._auth(access_token),
|
|
{
|
|
'user_id': user_id,
|
|
'first': first,
|
|
},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def create_stream_marker(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
*,
|
|
description: str | None = None,
|
|
) -> s.CreateStreamMarkers:
|
|
req = await self.post(
|
|
'/streams/markers',
|
|
headers=self._auth(access_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'user_id': user_id,
|
|
'description': description,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.CreateStreamMarkers)
|
|
|
|
async def get_stream_markers(
|
|
self,
|
|
access_token: str,
|
|
user_id: int,
|
|
video_id: int,
|
|
*,
|
|
first: int = 20,
|
|
before: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.StreamMarkersData]:
|
|
async for data in self._paginate(
|
|
'/streams/markers',
|
|
s.StreamMarkers,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'user_id': user_id,
|
|
'video_id': video_id,
|
|
'first': first,
|
|
'before': before,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def get_broadcaster_subscriptions(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
first: int = 20,
|
|
before: str | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[tuple[int, int, s.BroadcasterSubscription]]:
|
|
async for data in self._paginate(
|
|
'/subscriptions',
|
|
s.BroadcasterSubscriptions,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'first': first,
|
|
'before': before,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield data.total, data.points, item
|
|
|
|
async def check_user_subscription(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
user_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.UserSubscription:
|
|
req = await self.get(
|
|
'/subscriptions/user',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict(
|
|
{
|
|
'broadcaster_id': broadcaster_id,
|
|
'user_id': user_id,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, s.UserSubscription)
|
|
|
|
async def get_channel_teams(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.ChannelTeams:
|
|
req = await self.get(
|
|
'/teams/channel',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'broadcaster_id': broadcaster_id},
|
|
)
|
|
|
|
return await self._process(req, s.ChannelTeams)
|
|
|
|
async def get_teams(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
name: str | None = None,
|
|
team_id: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.Teams:
|
|
req = await self.get(
|
|
'/teams',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict({'name': name, 'id': team_id}),
|
|
)
|
|
|
|
return await self._process(req, s.Teams)
|
|
|
|
async def get_users(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
user_id: int | list[int] | None = None,
|
|
login: str | list[str] | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.Users:
|
|
req = await self.get(
|
|
'/users',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params=self.clean_dict({'id': user_id, 'login': login}),
|
|
)
|
|
|
|
return await self._process(req, s.Users)
|
|
|
|
async def update_user(
|
|
self, access_token: str, *, description: str = ''
|
|
) -> s.Users:
|
|
req = await self.put(
|
|
'/users',
|
|
headers=self._auth(access_token),
|
|
json={'description': description},
|
|
)
|
|
|
|
return await self._process(req, s.Users)
|
|
|
|
async def get_authorization_by_user(
|
|
self,
|
|
access_token: str,
|
|
user_id: int | list[int],
|
|
*,
|
|
cache_ttl: int | None = None,
|
|
) -> s.AuthorizationsByUser:
|
|
req = await self.get(
|
|
'/authorization/users',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, s.AuthorizationsByUser)
|
|
|
|
async def get_user_block_list(
|
|
self,
|
|
access_token: str,
|
|
broadcaster_id: int,
|
|
*,
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.UserBlock]:
|
|
async for data in self._paginate(
|
|
'/users/blocks',
|
|
s.UserBlockList,
|
|
self._auth(access_token),
|
|
{'broadcaster_id': broadcaster_id, 'first': first},
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def block_user(
|
|
self,
|
|
access_token: str,
|
|
target_user_id: int,
|
|
*,
|
|
source_context: Literal['chat', 'whisper'] | None = None,
|
|
reason: Literal['harassment', 'spam', 'other'] | None = None,
|
|
) -> bool:
|
|
req = await self.put(
|
|
'/users/blocks',
|
|
headers=self._auth(access_token),
|
|
json=self.clean_dict(
|
|
{
|
|
'target_user_id': target_user_id,
|
|
'source_context': source_context,
|
|
'reason': reason,
|
|
}
|
|
),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def unblock_user(
|
|
self, access_token: str, target_user_id: int
|
|
) -> bool:
|
|
req = await self.delete(
|
|
'/users/blocks',
|
|
headers=self._auth(access_token),
|
|
params={'target_user_id': target_user_id},
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_user_extensions(
|
|
self, access_token: str, *, cache_ttl: int | None = None
|
|
) -> s.UserExtensions:
|
|
req = await self.get(
|
|
'/users/extensions/list',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
)
|
|
|
|
return await self._process(req, s.UserExtensions)
|
|
|
|
async def get_user_active_extensions(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
user_id: int | None = None,
|
|
cache_ttl: int | None = None,
|
|
) -> s.UserActiveExtensions:
|
|
req = await self.get(
|
|
'/users/extensions',
|
|
cache_ttl=cache_ttl,
|
|
headers=self._auth(access_token),
|
|
params={'user_id': user_id},
|
|
)
|
|
|
|
return await self._process(req, s.UserActiveExtensions)
|
|
|
|
async def update_user_extensions(
|
|
self, access_token: str, extensions: s.UserActiveExtensions
|
|
) -> bool:
|
|
req = await self.put(
|
|
'/users/extensions',
|
|
headers=self._auth(access_token),
|
|
json=extensions.model_dump(),
|
|
)
|
|
|
|
return await self._process(req, None)
|
|
|
|
async def get_videos(
|
|
self,
|
|
access_token: str,
|
|
video_id: int | list[int],
|
|
user_id: int,
|
|
game_id: int,
|
|
*,
|
|
language: str | None = None,
|
|
period: Literal['all', 'day', 'month', 'week'] = 'all',
|
|
sort: Literal['time', 'trending', 'views'] = 'time',
|
|
video_type: Literal['all', 'archive', 'highlight', 'upload'] = 'all',
|
|
first: int = 20,
|
|
cache_ttl: int | None = None,
|
|
) -> AsyncGenerator[s.Video]:
|
|
async for data in self._paginate(
|
|
'/videos',
|
|
s.Videos,
|
|
self._auth(access_token),
|
|
self.clean_dict(
|
|
{
|
|
'video_id': video_id,
|
|
'user_id': user_id,
|
|
'game_id': game_id,
|
|
'language': language,
|
|
'period': period,
|
|
'sort': sort,
|
|
'video_type': video_type,
|
|
'first': first,
|
|
}
|
|
),
|
|
cache_ttl,
|
|
):
|
|
for item in data.data:
|
|
yield item
|
|
|
|
async def delete_videos(
|
|
self, access_token: str, video_id: int | list[int]
|
|
) -> s.DeleteVideos:
|
|
req = await self.delete(
|
|
'/videos',
|
|
headers=self._auth(access_token),
|
|
params={'id': video_id},
|
|
)
|
|
|
|
return await self._process(req, s.DeleteVideos)
|
|
|
|
async def send_whisper(
|
|
self,
|
|
access_token: str,
|
|
from_user_id: int,
|
|
to_user_id: int,
|
|
message: str,
|
|
) -> bool:
|
|
req = await self.post(
|
|
'/whispers',
|
|
headers=self._auth(access_token),
|
|
params={'from_user_id': from_user_id, 'to_user_id': to_user_id},
|
|
json={'message': message},
|
|
)
|
|
|
|
return await self._process(req, None)
|