Coverage for microservice_websocket/app/utils/external_archiviation.py: 80%
19 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
1import requests
3from ..services.database import Reading
4from .exceptions import NotFoundException
6SET_NAME = "ext-archiviation-endpoints"
9def get_external_endpoints() -> list[str]:
10 from .. import redis_client
12 endpoints = redis_client.smembers(SET_NAME)
14 return [x.decode() for x in endpoints]
17def add_external_endpoint(endpoint: str):
18 from .. import redis_client
20 redis_client.sadd(SET_NAME, endpoint)
23def delete_external_endpoint(endpoint: str):
24 from .. import redis_client
26 if redis_client.srem(SET_NAME, endpoint) == 0:
27 raise NotFoundException(endpoint)
30async def send_payload(payload: Reading):
31 for endpoint in get_external_endpoints():
32 serialized_payload = await payload.serialize()
33 requests.post(endpoint, json=serialized_payload.dict())