Coverage for microservice_websocket/app/utils/alert.py: 95%

30 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1from datetime import datetime 

2 

3from beanie import PydanticObjectId 

4from beanie.operators import And, Eq 

5 

6from ..blueprints.api.models import HandlePayload 

7from ..services.database import Alert, Node, User 

8from .enums import EventType 

9from .exceptions import NotFoundException 

10from .node import update_state 

11 

12 

13async def handle_alert(alertID: str, payload: HandlePayload, user: User): 

14 alert: Alert | None = await Alert.get(PydanticObjectId(alertID)) 

15 if alert is None: 

16 raise NotFoundException("Alert") 

17 

18 node: Node | None = await Node.get(PydanticObjectId(alert.node)) 

19 if node is None: 19 ↛ 20line 19 didn't jump to line 20, because the condition on line 19 was never true

20 raise NotFoundException("Node") 

21 

22 alert.isConfirmed = payload.isConfirmed 

23 alert.isHandled = True 

24 alert.handledBy = user.id 

25 alert.handledAt = datetime.now() 

26 alert.handleNote = payload.handleNote 

27 await alert.save() 

28 

29 if ( 

30 await Alert.find_one(And(Eq(Alert.node, node.id), Eq(Alert.isHandled, False))) 

31 ) is None: 

32 node.state = update_state(node.state, node.lastSeenAt, EventType.HANDLE_ALERT) 

33 await node.save() 

34 

35 return alert 

36 

37 

38async def get_alert(alert_id: str) -> Alert: 

39 alert: Alert | None = await Alert.get(PydanticObjectId(alert_id)) 

40 if alert is None: 

41 raise NotFoundException("Alert") 

42 

43 return alert