Coverage for microservice_websocket/app/utils/external_archiviation.py: 80%

19 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1import requests 

2 

3from ..services.database import Reading 

4from .exceptions import NotFoundException 

5 

# Key of the Redis set that the functions below read from and write to;
# its members are the endpoints that send_payload() POSTs readings to.
6SET_NAME = "ext-archiviation-endpoints" 

7 

8 

def get_external_endpoints() -> list[str]:
    """Return every member of the ``SET_NAME`` Redis set as a string.

    The members are fetched with ``smembers`` through the package-level
    ``redis_client`` and each one is converted with ``decode()``.
    """
    # Function-scope import (presumably to avoid a circular import between
    # the package root and this module -- confirm).
    from .. import redis_client

    decoded_members = []
    for raw_member in redis_client.smembers(SET_NAME):
        decoded_members.append(raw_member.decode())
    return decoded_members

15 

16 

def add_external_endpoint(endpoint: str):
    """Register ``endpoint`` by adding it to the ``SET_NAME`` Redis set.

    Args:
        endpoint: value stored as a member of the set.
    """
    # Function-scope import (presumably to avoid a circular import -- confirm).
    from .. import redis_client as store

    store.sadd(SET_NAME, endpoint)

21 

22 

def delete_external_endpoint(endpoint: str):
    """Remove ``endpoint`` from the ``SET_NAME`` Redis set.

    Args:
        endpoint: member to remove from the set.

    Raises:
        NotFoundException: when ``srem`` returns 0, i.e. nothing was removed.
    """
    # Function-scope import (presumably to avoid a circular import -- confirm).
    from .. import redis_client as store

    removed_count = store.srem(SET_NAME, endpoint)
    if removed_count == 0:
        raise NotFoundException(endpoint)

28 

29 

async def send_payload(payload: Reading, *, timeout: float = 10.0):
    """POST the serialized ``payload`` to every registered external endpoint.

    Args:
        payload: reading to forward. It is serialized once with
            ``await payload.serialize()`` and the result's ``dict()`` is sent
            as the JSON body of each request.
        timeout: seconds passed to ``requests.post`` for each request, so an
            unresponsive endpoint cannot block this call indefinitely.

    Raises:
        requests.RequestException: propagated unchanged from ``requests.post``
            (including ``requests.Timeout`` once ``timeout`` elapses); the
            remaining endpoints are then not contacted.
    """
    endpoints = get_external_endpoints()
    if not endpoints:
        # No endpoint registered: nothing to send, so skip serialization.
        return

    # The body does not depend on the endpoint, so build it a single time
    # instead of re-serializing the reading on every iteration.
    body = (await payload.serialize()).dict()

    for endpoint in endpoints:
        # NOTE(review): requests.post is a synchronous call inside a
        # coroutine; the explicit timeout bounds how long each one can take.
        requests.post(endpoint, json=body, timeout=timeout)