Files
HospitalAssistantBackend/src/clients/esia/api.py
Miwory 967bb8d936
Some checks failed
Build And Push / publish (push) Failing after 3m15s
first commit
2025-09-24 04:11:55 +03:00

72 lines
2.1 KiB
Python

import uuid
from datetime import UTC, datetime
from logging import getLogger
from typing import Any
import jwt
from fastapi import status as st
from httpx import AsyncClient
from apps.esia.scopes import SCOPES
from apps.esia.sign import sign_params
from core.config import settings
from shared import exceptions as e
from . import schema as s
class ESIA_API(AsyncClient):
def __init__(self):
self.logger = getLogger(__name__)
super().__init__(base_url=settings.ESIA_BASE_URL)
async def sign_request(self, data: dict[str, Any]):
timestamp = datetime.now(UTC).strftime('%Y.%m.%d %H:%M:%S %z').strip()
state = str(uuid.uuid4())
params = {
'client_id': settings.ESIA_CLIENT_ID,
'timestamp': timestamp,
'state': state,
'scope': ' '.join(SCOPES),
}
params.update(data)
params['client_secret'] = sign_params(params)
return params
async def access_token(self, code: str):
params = {
'grant_type': 'authorization_code',
'redirect_uri': settings.ESIA_REDIRECT_URI,
'code': code,
}
signed_params = await self.sign_request(params)
res = await self.post('/aas/oauth2/te', data=signed_params)
match res.status_code:
case st.HTTP_200_OK:
return s.AccessTokenModel.model_validate(res.json())
case st.HTTP_400_BAD_REQUEST:
return None
case _:
self.logger.error(res.json())
raise e.UnknownException
async def get_user_info(self, access_token: str, id_token: str):
IDToken = s.IDTokenModel.model_validate(
jwt.decode(id_token, options={'verify_signature': False})
)
res = await self.get(
f'/rs/prns/{IDToken.urn_esia_sbj.oid}',
headers={'Authorization': f'Bearer {access_token}'},
)
match res.status_code:
case st.HTTP_200_OK:
return s.UserInfoModel.model_validate(res.json())
case _:
self.logger.error(res.json())
raise e.UnknownException