Патч
All checks were successful
Build And Push / publish (push) Successful in 5m43s

This commit is contained in:
2025-10-31 15:11:38 +03:00
parent 44ed7796ab
commit b602b75234
10 changed files with 663 additions and 23 deletions

146
src/clients/aemd/_schema.py Normal file
View File

@ -0,0 +1,146 @@
from datetime import datetime
from typing import Literal
from pydantic_xml import BaseXmlModel, attr, element, wrapped
# SOAP Envelope namespaces
NS_SOAP = 'http://www.w3.org/2003/05/soap-envelope'
NS_ADDR = 'http://www.w3.org/2005/08/addressing'
NS_WSSE = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'
NS_WSU = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd'
NS_DS = 'http://www.w3.org/2000/09/xmldsig#'
NS_APP = 'http://egisz.rosminzdrav.ru/iehr/emdr/service/'
NSMAP = {
's': NS_SOAP,
'a': NS_ADDR,
'wsse': NS_WSSE,
'wsu': NS_WSU,
'ds': NS_DS,
'app': NS_APP,
}
class Reference(BaseXmlModel, tag='Reference', ns='ds', nsmap=NSMAP):
uri: str = attr(name='URI')
digest_method: Literal['http://www.w3.org/2001/04/xmlenc#sha256'] = (
element(tag='DigestMethod', ns='ds', attr_name='Algorithm')
)
digest_value: str = element(tag='DigestValue', ns='ds')
class SignedInfo(BaseXmlModel, tag='SignedInfo', ns=NS_DS, nsmap=NSMAP):
canon_method: Literal['http://www.w3.org/2001/10/xml-exc-c14n#'] = element(
tag='CanonicalizationMethod', ns=NS_DS, attr_name='Algorithm'
)
sig_method: Literal[
'urn:ietf:params:xml:ns:cpxmlsec:algorithms:gostr34102012-gostr34112012-256'
] = element(tag='SignatureMethod', ns=NS_DS, attr_name='Algorithm')
references: list[Reference] = element(tag='Reference', ns=NS_DS)
class SecurityTokenReference(
BaseXmlModel,
tag='SecurityTokenReference',
ns=NS_WSSE,
nsmap={'wsse': NS_WSSE},
):
ref_uri: str = wrapped('Reference', attr(name='URI'))
ref_type: Literal[
'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3'
] = wrapped('Reference', attr(name='ValueType'))
class KeyInfo(BaseXmlModel, tag='KeyInfo', ns=NS_DS, nsmap=NSMAP):
token_ref: SecurityTokenReference = element(
tag='SecurityTokenReference', ns=NS_WSSE
)
class Signature(BaseXmlModel, tag='Signature', ns=NS_DS, nsmap=NSMAP):
signed_info: SignedInfo = element()
signature_value: str = element(tag='SignatureValue')
key_info: KeyInfo = element()
class BinarySecurityToken(
BaseXmlModel,
tag='BinarySecurityToken',
ns=NS_WSSE,
nsmap=NSMAP,
):
id: str = attr(name='Id', ns=NS_WSU)
value_type: Literal[
'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3'
] = attr(name='ValueType')
encoding_type: Literal[
'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary'
] = attr(name='EncodingType')
value: str
class Security(BaseXmlModel, tag='Security', ns=NS_WSSE, nsmap=NSMAP):
token: BinarySecurityToken = element()
signature: Signature = element()
class Action(BaseXmlModel, tag='Action', ns=NS_ADDR, nsmap=NSMAP):
id: str = attr(name='Id', ns=NS_WSU)
class MessageID(BaseXmlModel, tag='MessageID', ns=NS_ADDR, nsmap=NSMAP):
id: str = attr(name='Id', ns=NS_WSU)
value: str
class Header(BaseXmlModel, tag='Header', ns=NS_SOAP, nsmap=NSMAP):
action: Action | None = element(tag='Action', ns=NS_ADDR)
message_id: MessageID | None = element(tag='MessageID', ns=NS_ADDR)
security: Security = element(tag='Security', ns=NS_WSSE)
class Page(BaseXmlModel, tag='page', ns=NS_APP):
items_per_page: int = element(tag='itemsPerPage')
has_next: bool = element(tag='hasNext')
class Item(BaseXmlModel, tag='item', ns=NS_APP):
emdr_id: str = element(tag='emdrId')
local_uid: str = element(tag='localUid')
registration_date: datetime = element(tag='registrationDate')
registration_date_time: datetime = element(tag='registrationDateTime')
store_till_date: str = element(tag='storeTillDate')
doc_kind: str = element(tag='DocKind')
is_semd: bool = element(tag='IsSemd')
class Matches(BaseXmlModel, tag='matches', ns=NS_APP):
items: list[Item] = element(tag='item')
page: Page = element()
class SearchRegistryItemResponse(
BaseXmlModel,
tag='searchRegistryItemResponse',
ns=NS_APP,
nsmap={'app': NS_APP},
):
status: Literal['success'] = element()
matches: Matches = element()
class Body(BaseXmlModel, tag='Body', ns=NS_SOAP, nsmap=NSMAP):
id: str = attr(name='Id', ns=NS_WSU)
response: SearchRegistryItemResponse = element(
tag='searchRegistryItemResponse', ns=NS_APP
)
class Envelope(
BaseXmlModel,
tag='Envelope',
ns=NS_SOAP,
nsmap=NSMAP,
):
header: Header = element(tag='Header', ns=NS_SOAP)
body: Body = element(tag='Body', ns=NS_SOAP)

View File

@ -1,8 +1,12 @@
from logging import getLogger
from fastapi import status
from httpx import AsyncClient
from core.config import settings
from shared import exceptions as e
from . import schema as s
class AEMD_API(AsyncClient):
@ -43,4 +47,29 @@ class AEMD_API(AsyncClient):
)
req = await self.post('/', content=envelope)
return req.text
match req.status_code:
case status.HTTP_200_OK:
envelope = s.Envelope.from_xml(req.text)
case _:
self.logger.error(req.text)
raise e.UnknownException
return envelope.model_dump()['body']['response']['matches']
async def demandContent(self, messageId: str, emdrId: str):
envelope = self.get_envelope(
'demandContent',
f'<demandContentRequest xmlns="http://egisz.rosminzdrav.ru/iehr/emdr/service/"><messageId>{messageId}</messageId><emdrId>{emdrId}</emdrId></demandContentRequest>',
)
req = await self.post('/', content=envelope)
match req.status_code:
case status.HTTP_200_OK:
envelope = s.DemandContentEnvelope.from_xml(req.text)
case _:
self.logger.error(req.text)
raise e.UnknownException
return envelope.model_dump()['body']

168
src/clients/aemd/schema.py Normal file
View File

@ -0,0 +1,168 @@
from pydantic import constr
from pydantic_xml import BaseXmlModel, RootXmlModel, attr, element
NSMAP = {
'a': 'http://www.w3.org/2005/08/addressing',
'b': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
's': 'http://www.w3.org/2003/05/soap-envelope',
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'xsd': 'http://www.w3.org/2001/XMLSchema',
'd2p1': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
'd3p1': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
}
SECURITY_NSMAP = NSMAP | {
'': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'
}
SIGNATURE_NSMAP = NSMAP | {'': 'http://www.w3.org/2000/09/xmldsig#'}
RESPONSE_NSMAP = NSMAP | {'': 'http://egisz.rosminzdrav.ru/iehr/emdr/service/'}
class Action(BaseXmlModel, tag='Action', ns='a', nsmap=NSMAP):
id: str = attr(name='Id', ns='d3p1')
class MessageID(BaseXmlModel, tag='MessageID', ns='a', nsmap=NSMAP):
id: str = attr(name='Id', ns='d3p1')
description: constr(strip_whitespace=True) # type: ignore
class BinarySecurityToken(
BaseXmlModel, tag='BinarySecurityToken', nsmap=NSMAP
):
id: str = attr(name='Id', ns='b')
ValueType: str = attr(name='ValueType')
EncodingType: str = attr(name='EncodingType')
description: constr(strip_whitespace=True) # type: ignore
class CanonicalizationMethod(
BaseXmlModel, tag='CanonicalizationMethod', nsmap=SIGNATURE_NSMAP
):
algorithm: str = attr(name='Algorithm')
class SignatureMethod(
BaseXmlModel, tag='SignatureMethod', nsmap=SIGNATURE_NSMAP
):
algorithm: str = attr(name='Algorithm')
class Transform(BaseXmlModel, tag='Transform', nsmap=SIGNATURE_NSMAP):
algorithm: str = attr(name='Algorithm')
class Transforms(RootXmlModel, tag='Transforms', nsmap=SIGNATURE_NSMAP): # type: ignore
root: list[Transform] = element()
class DigestMethod(BaseXmlModel, tag='DigestMethod', nsmap=SIGNATURE_NSMAP):
algorithm: str = attr(name='Algorithm')
class DigestValue(BaseXmlModel, tag='DigestValue', nsmap=SIGNATURE_NSMAP):
value: str
class Reference(BaseXmlModel, tag='Reference', nsmap=SIGNATURE_NSMAP):
uri: str = attr(name='URI')
transforms: Transforms = element()
digest_method: DigestMethod = element()
digest_value: DigestValue = element()
class SignedInfo(BaseXmlModel, tag='SignedInfo', nsmap=SIGNATURE_NSMAP):
canonicalization_method: CanonicalizationMethod = element()
signature_method: SignatureMethod = element()
references: list[Reference] = element()
class SignatureValue(
BaseXmlModel, tag='SignatureValue', nsmap=SIGNATURE_NSMAP
):
description: constr(strip_whitespace=True) # type: ignore
class SecurityTokenReference(
BaseXmlModel, tag='Reference', nsmap=SECURITY_NSMAP
):
ValueType: str = attr(name='ValueType')
URI: str = attr(name='URI')
class SecurityToken(
BaseXmlModel, tag='SecurityTokenReference', ns='', nsmap=SECURITY_NSMAP
):
reference: SecurityTokenReference = element()
class KeyInfo(BaseXmlModel, tag='KeyInfo', nsmap=SIGNATURE_NSMAP):
security_token_reference: SecurityToken = element()
class Signature(BaseXmlModel, tag='Signature', ns='', nsmap=SIGNATURE_NSMAP):
signed_info: SignedInfo = element()
signature_value: SignatureValue = element()
key_info: KeyInfo = element()
class Security(BaseXmlModel, tag='Security', ns='', nsmap=SECURITY_NSMAP):
binary_security_token: BinarySecurityToken = element()
signature: Signature = element()
class Header(BaseXmlModel, tag='Header', ns='s', nsmap=NSMAP):
action: Action = element()
message_id: MessageID = element()
security: Security = element()
class searchRegistryItemResponseItem(
BaseXmlModel, tag='item', nsmap=RESPONSE_NSMAP
):
emdrId: str = element()
localUid: str = element()
registrationDate: str = element()
registrationDateTime: str = element()
storeTillDate: str = element()
DocKind: str = element()
IsSemd: bool = element()
class searchRegistryItemResponseMatches(
BaseXmlModel, tag='matches', nsmap=RESPONSE_NSMAP
):
items: list[searchRegistryItemResponseItem] = element()
class searchRegistryItemResponse(
BaseXmlModel, tag='searchRegistryItemResponse', ns='', nsmap=RESPONSE_NSMAP
):
status: str = element()
matches: searchRegistryItemResponseMatches
class Body(BaseXmlModel, tag='Body', ns='s', nsmap=NSMAP):
id: str = attr(name='Id', ns='d2p1')
response: searchRegistryItemResponse = element()
class Envelope(BaseXmlModel, tag='Envelope', ns='s', nsmap=NSMAP):
header: Header = element()
body: Body = element()
class Acknowledgment(
BaseXmlModel, tag='acknowledgment', ns='', nsmap=RESPONSE_NSMAP
):
status: str = element()
class DemandContentBody(BaseXmlModel, tag='Body', ns='s', nsmap=NSMAP):
id: str = attr(name='Id', ns='d2p1')
acknowledgment: Acknowledgment
class DemandContentEnvelope(BaseXmlModel, tag='Envelope', ns='s', nsmap=NSMAP):
header: Header = element()
body: DemandContentBody = element()

View File

@ -1,3 +1,4 @@
from datetime import UTC, datetime
from json import dumps
from logging import getLogger
from urllib.parse import quote, urlencode
@ -65,18 +66,6 @@ class TDN_API(AsyncClient):
async def observations_measurement_search(
self, access_token: str, observationUid: str
):
# data = urlencode(
# dumps(
# {
# 'where': {'observationUid': observationUid},
# 'relations': [
# 'measurement',
# 'obsrvMtMetrics',
# 'obsrvMtMetrics.metric',
# ],
# }
# )
# )
encoded_query = urlencode(
{
'query': dumps(
@ -105,3 +94,69 @@ class TDN_API(AsyncClient):
case _:
self.logger.error(res.json())
raise e.UnknownException
async def create_series(
self, access_token: str, observationUid: str, obsrvMeasurementUid: str
):
now = datetime.now(UTC)
tz_offset = now.strftime('%z')
formatted_offset = (
f'{tz_offset[:3]}:{tz_offset[3:]}' if tz_offset else '+0000'
)
date_str = (
f'{now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]} {formatted_offset}'
)
res = await self.patch(
'/ddn/observation/series',
headers={'Authorization': f'Bearer {access_token}'},
json={
'observationUid': observationUid,
'obsrvMeasurementUid': obsrvMeasurementUid,
'date': date_str,
},
)
match res.status_code:
case st.HTTP_200_OK:
return s.SeriesModel.model_validate(res.json())
case _:
self.logger.error(res.json())
raise e.UnknownException
async def create_series_values(
self,
access_token: str,
seriesUid: str,
obsrvMtMetricUid: str,
*,
nvalue: str | None = None,
fvalue: str | None = None,
svalue: str | None = None,
):
data = {
'seriesUid': seriesUid,
'obsrvMtMetricUid': obsrvMtMetricUid,
}
if nvalue is not None:
data['nvalue'] = nvalue
if fvalue is not None:
data['fvalue'] = fvalue
if svalue is not None:
data['svalue'] = svalue
res = await self.patch(
'/ddn/observation/series-values',
headers={'Authorization': f'Bearer {access_token}'},
json=data,
)
match res.status_code:
case st.HTTP_200_OK:
return s.SeriesValueModel.model_validate(res.json())
case _:
self.logger.error(res.json())
raise e.UnknownException

View File

@ -68,7 +68,7 @@ class ObservationMeasurementModel(BaseModel):
timeFrequency: int
timePeriod: int
timePeriodMeasureUid: str
timeOfDay: list[str]
timeOfDay: list[str] | None
comment: str | None
mobileId: str | None
measurement: MeasurementModel
@ -78,3 +78,33 @@ class ObservationMeasurementModel(BaseModel):
class ObservationMeasurementsModel(BaseModel):
items: list[ObservationMeasurementModel]
total: int
class SeriesRealmModel(BaseModel):
uid: str
class SeriesModel(BaseModel):
observationUid: str
obsrvMeasurementUid: str
date: str
realm: SeriesRealmModel
mobileId: str | None
uid: str
createdAt: datetime
updatedAt: datetime
class SeriesValueModel(BaseModel):
uid: str
seriesUid: str
obsrvMtMetricUid: str
realm: SeriesRealmModel
createdAt: datetime
updatedAt: datetime
nvalue: str | None
fvalue: str | None
svalue: str | None
filepath: str | None
mobileId: str | None
tisId: str | None

View File

@ -200,6 +200,7 @@ class VITACORE_API(AsyncClient):
raise e.UnknownException
async def getHospExaminations(self, patId: str, examId: str):
patId = 'b66a85f1-4aaa-4db8-942a-2de44341824e'
token = await self.get_token()
req = await self.get(
'/getHospExaminations',
@ -210,6 +211,18 @@ class VITACORE_API(AsyncClient):
match req.status_code:
case st.HTTP_200_OK:
return s.HospExaminationsModel.model_validate(req.json())
case st.HTTP_206_PARTIAL_CONTENT:
error = s.ErrorModel.model_validate(req.json())
if error.error == 'Пациент не госпитализирован!':
return s.HospExaminationsModel(
EventID='none',
EventDate=datetime.now(UTC),
LpuName='fakeName',
Examinations=[],
)
case _:
self.logger.error(req.json())
raise e.UnknownException
@ -351,6 +364,13 @@ class VITACORE_API(AsyncClient):
match req.status_code:
case st.HTTP_200_OK:
return s.ELNsModel.model_validate(req.json())
case st.HTTP_206_PARTIAL_CONTENT:
error = s.ErrorModel.model_validate(req.json())
if error.error == 'Пациент не госпитализирован!':
return s.ELNsModel(PatientELNs=[])
case _:
self.logger.error(req.json())
raise e.UnknownException
@ -369,3 +389,28 @@ class VITACORE_API(AsyncClient):
case _:
self.logger.error(req.json())
raise e.UnknownException
async def getDiagResultFile(self, resultId: str):
token = await self.get_token()
req = await self.get(
'/getDiagResultFile',
params={'resultId': resultId},
headers={'Authorization': f'Bearer {token}'},
)
match req.status_code:
case st.HTTP_200_OK:
return s.DiagResultFileModel.model_validate(req.json())
case st.HTTP_206_PARTIAL_CONTENT:
error = s.ErrorModel.model_validate(req.json())
if (
error.error == 'Не найдены проведенные исследования по '
'данному идентификатору'
):
return s.DiagResultFileModel(content='')
case _:
self.logger.error(req.json())
raise e.UnknownException

View File

@ -672,3 +672,7 @@ class PatientFLGModel(BaseModel):
title='Контингент (флюорография)',
examples=['Неорганизованное население'],
)
class DiagResultFileModel(BaseModel):
content: str