Релиз
This commit is contained in:
145
src/oxidedonationalerts/api.py
Normal file
145
src/oxidedonationalerts/api.py
Normal file
@ -0,0 +1,145 @@
|
||||
from typing import Literal, Never
|
||||
|
||||
from oxidehttp.client import OxideHTTP
|
||||
from oxidehttp.schema import CachedResponse, Response
|
||||
from pydantic import BaseModel
|
||||
|
||||
from . import schema as s
|
||||
|
||||
|
||||
class DonationAlertsAPIClient(OxideHTTP):
|
||||
def __init__(
|
||||
self,
|
||||
redis_url: str | None = None,
|
||||
proxy_url: str | None = None,
|
||||
) -> None:
|
||||
self.base_uri = 'https://www.donationalerts.com/api/v1'
|
||||
|
||||
super().__init__(
|
||||
base_url=self.base_uri,
|
||||
redis_url=redis_url,
|
||||
ratelimit_key='donation_alerts' if redis_url else None,
|
||||
ratelimit_limit=60 if redis_url else None,
|
||||
proxy_url=proxy_url,
|
||||
)
|
||||
|
||||
def _auth(self, access_token: str) -> dict[str, str]:
|
||||
return {'Authorization': f'Bearer {access_token}'}
|
||||
|
||||
async def _process[T: BaseModel](
|
||||
self, req: Response | CachedResponse, schema: type[T]
|
||||
) -> T:
|
||||
if req.status_code >= 500:
|
||||
raise s.InternalError(req.status_code, 'Internal Server Error')
|
||||
|
||||
if req.status_code >= 400 and req.status_code < 500:
|
||||
data = await req.json()
|
||||
message = data.get('message', 'DonationAlerts API Error')
|
||||
|
||||
raise s.ClientError(req.status_code, message)
|
||||
|
||||
data = await req.json()
|
||||
return schema.model_validate(data)
|
||||
|
||||
async def user_oauth(
|
||||
self, access_token: str, *, cache_ttl: int | None = None
|
||||
) -> s.UserOauth:
|
||||
req = await self.get(
|
||||
'/user/oauth', None, self._auth(access_token), cache_ttl
|
||||
)
|
||||
|
||||
return await self._process(req, s.UserOauth)
|
||||
|
||||
async def alerts_donations(
|
||||
self, access_token: str, *, page: int = 1, cache_ttl: int | None = None
|
||||
) -> s.AlertsDonations:
|
||||
req = await self.get(
|
||||
'/alerts/donations',
|
||||
{'page': page},
|
||||
self._auth(access_token),
|
||||
cache_ttl,
|
||||
)
|
||||
|
||||
return await self._process(req, s.AlertsDonations)
|
||||
|
||||
async def custom_alert(
|
||||
self,
|
||||
access_token: str,
|
||||
external_id: str,
|
||||
header: str,
|
||||
message: str,
|
||||
image_url: str,
|
||||
sound_url: str,
|
||||
is_shown: Literal[0, 1] = 0,
|
||||
) -> Never:
|
||||
raise NotImplementedError
|
||||
|
||||
async def merchandise(
|
||||
self,
|
||||
access_token: str,
|
||||
merchant_identifier: str,
|
||||
merchandise_identifier: str,
|
||||
title: dict[str, str],
|
||||
currency: str,
|
||||
price_user: int,
|
||||
price_service: str,
|
||||
url: str,
|
||||
img_url: str,
|
||||
end_at_ts: int,
|
||||
is_active: Literal[0, 1] = 0,
|
||||
is_percentage: Literal[0, 1] = 0,
|
||||
) -> Never:
|
||||
raise NotImplementedError
|
||||
|
||||
async def update_merchandise(
|
||||
self,
|
||||
access_token: str,
|
||||
merchant_identifier: str,
|
||||
merchandise_identifier: str,
|
||||
title: dict[str, str],
|
||||
currency: str,
|
||||
price_user: int,
|
||||
price_service: str,
|
||||
url: str,
|
||||
img_url: str,
|
||||
end_at_ts: int,
|
||||
is_active: Literal[0, 1] = 0,
|
||||
is_percentage: Literal[0, 1] = 0,
|
||||
) -> Never:
|
||||
raise NotImplementedError
|
||||
|
||||
async def get_user_data_from_promocode(
|
||||
self,
|
||||
access_token: str,
|
||||
promocode: str,
|
||||
*,
|
||||
cache_ttl: int | None = None,
|
||||
) -> Never:
|
||||
raise NotImplementedError
|
||||
|
||||
async def send_sale_alerts(
|
||||
self,
|
||||
access_token: str,
|
||||
user_id: int,
|
||||
external_id: str,
|
||||
merchant_identifier: str,
|
||||
merchandise_identifier: str,
|
||||
amount: float,
|
||||
currency: str,
|
||||
username: str,
|
||||
message: str,
|
||||
bought_amount: int = 1,
|
||||
) -> Never:
|
||||
raise NotImplementedError
|
||||
|
||||
async def centrifuge_subscribe(
|
||||
self, access_token: str, client: str, subscriptions: list[str]
|
||||
) -> s.CentrifugeSubscribe:
|
||||
req = await self.post(
|
||||
'/centrifuge/subscribe',
|
||||
None,
|
||||
self._auth(access_token),
|
||||
{'client': client, 'channels': subscriptions},
|
||||
)
|
||||
|
||||
return await self._process(req, s.CentrifugeSubscribe)
|
||||
Reference in New Issue
Block a user