first commit
Some checks failed
Build And Push / publish (push) Failing after 3m15s

This commit is contained in:
2025-09-24 04:11:55 +03:00
commit 967bb8d936
45 changed files with 2651 additions and 0 deletions

View File

10
src/apps/esia/scopes.py Normal file
View File

@ -0,0 +1,10 @@
SCOPES = [
'openid',
'fullname',
# 'email',
# 'birthdate',
# 'gender',
# 'snils',
# 'id_doc',
# 'mobile',
]

82
src/apps/esia/sign.py Normal file
View File

@ -0,0 +1,82 @@
import base64
import secrets
import subprocess # noqa: S404
import tempfile
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
from apps.esia.scopes import SCOPES
from core.config import settings
ACCESS_TYPE = 'online'
RESPONSE_CODE = 'code'
def csp_sign(data: str):
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file_name = secrets.token_hex(8)
source_path = Path(tmp_dir) / f'{tmp_file_name}.txt'
destination_path = source_path.with_suffix('.txt.sgn')
with open(source_path, 'w', encoding='utf-8') as f:
f.write(data)
print(data)
cmd = [
'cryptcp',
'-signf',
'-norev',
'-nochain',
'-der',
'-strict',
'-cert',
'-detached',
'-thumbprint',
settings.ESIA_CONTAINER_THUMBPRINT,
'-pin',
settings.ESIA_CONTAINER_PASSWORD,
'-dir',
tmp_dir,
str(source_path),
]
subprocess.run( # noqa: S603
cmd, input=b'y\n', capture_output=True, check=True, text=False
)
signed_message = destination_path.read_bytes()
return signed_message
def sign_params(params: dict[str, Any]):
plaintext = (
params.get('scope', '')
+ params.get('timestamp', '')
+ params.get('client_id', '')
+ params.get('state', '')
)
client_secret = csp_sign(plaintext)
return base64.urlsafe_b64encode(client_secret).decode('utf-8')
def get_url():
timestamp = datetime.now(UTC).strftime('%Y.%m.%d %H:%M:%S %z').strip()
state = str(uuid.uuid4())
params = {
'client_id': settings.ESIA_CLIENT_ID,
'client_secret': '',
'redirect_uri': settings.ESIA_REDIRECT_URI,
'response_type': RESPONSE_CODE,
'state': state,
'timestamp': timestamp,
'access_type': ACCESS_TYPE,
'scope': ' '.join(SCOPES),
}
params['client_secret'] = sign_params(params)
return f'{settings.ESIA_BASE_URL}/aas/oauth2/ac?{urlencode(params)}'

View File

View File

@ -0,0 +1,50 @@
import secrets
from logging import getLogger
from fastapi import APIRouter
from apps.esia.sign import get_url
from clients import clients as c
from shared import exceptions as e
from shared.redis import client as cache
from . import schema as s
logger = getLogger(__name__)
router = APIRouter(
prefix='/esia',
tags=[
'ESIA',
],
)
@router.get('/login', response_model=s.LoginURL)
async def login():
url = get_url()
return s.LoginURL(url=url)
@router.post('/callback')
async def callback(code: str):
token = None
for i in range(3):
try:
token = await c.esia_api.access_token(code)
break
except Exception:
logger.warning(
'Error occurred while accessing ESI API. Retrying...'
)
if i == 2:
raise
if token is None:
raise e.BadRequestException
await c.esia_api.get_user_info(token.access_token, token.id_token)
access_token = secrets.token_urlsafe(32)
cache.set(access_token, access_token)
return s.Token(access_token=access_token)

View File

@ -0,0 +1,9 @@
from typing import TypedDict
class LoginURL(TypedDict):
url: str
class Token(TypedDict):
access_token: str