from typing import Literal, Never from oxidehttp.client import OxideHTTP from oxidehttp.schema import CachedResponse, Response from pydantic import BaseModel from . import schema as s class DonationAlertsAPIClient(OxideHTTP): def __init__( self, redis_url: str | None = None, proxy_url: str | None = None, ) -> None: self.base_uri = 'https://www.donationalerts.com/api/v1/' super().__init__( base_url=self.base_uri, redis_url=redis_url, ratelimit_key='donation_alerts' if redis_url else None, ratelimit_limit=60 if redis_url else None, proxy_url=proxy_url, ) def _auth(self, access_token: str) -> dict[str, str]: return {'Authorization': f'Bearer {access_token}'} async def _process[T: BaseModel]( self, req: Response | CachedResponse, schema: type[T] ) -> T: if req.status_code >= 500: raise s.InternalError(req.status_code, 'Internal Server Error') if req.status_code >= 400 and req.status_code < 500: data = await req.json() message = data.get('message', 'DonationAlerts API Error') raise s.ClientError(req.status_code, message) data = await req.json() return schema.model_validate(data) async def user_oauth( self, access_token: str, *, cache_ttl: int | None = None ) -> s.UserOauth: req = await self.get( '/user/oauth', None, self._auth(access_token), cache_ttl ) return await self._process(req, s.UserOauth) async def alerts_donations( self, access_token: str, *, page: int = 1, cache_ttl: int | None = None ) -> s.AlertsDonations: req = await self.get( '/alerts/donations', {'page': page}, self._auth(access_token), cache_ttl, ) return await self._process(req, s.AlertsDonations) async def custom_alert( self, access_token: str, external_id: str, header: str, message: str, image_url: str, sound_url: str, is_shown: Literal[0, 1] = 0, ) -> Never: raise NotImplementedError async def merchandise( self, access_token: str, merchant_identifier: str, merchandise_identifier: str, title: dict[str, str], currency: str, price_user: int, price_service: str, url: str, img_url: str, end_at_ts: int, is_active: Literal[0, 1] = 0, is_percentage: Literal[0, 1] = 0, ) -> Never: raise NotImplementedError async def update_merchandise( self, access_token: str, merchant_identifier: str, merchandise_identifier: str, title: dict[str, str], currency: str, price_user: int, price_service: str, url: str, img_url: str, end_at_ts: int, is_active: Literal[0, 1] = 0, is_percentage: Literal[0, 1] = 0, ) -> Never: raise NotImplementedError async def get_user_data_from_promocode( self, access_token: str, promocode: str, *, cache_ttl: int | None = None, ) -> Never: raise NotImplementedError async def send_sale_alerts( self, access_token: str, user_id: int, external_id: str, merchant_identifier: str, merchandise_identifier: str, amount: float, currency: str, username: str, message: str, bought_amount: int = 1, ) -> Never: raise NotImplementedError async def centrifuge_subscribe( self, access_token: str, client: str, subscriptions: list[str] ) -> s.CentrifugeSubscribe: req = await self.post( '/centrifuge/subscribe', self._auth(access_token), None, {'client': client, 'channels': subscriptions}, ) return await self._process(req, s.CentrifugeSubscribe)