Coverage for microservice_websocket/app/utils/alert.py: 95%
30 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
1from datetime import datetime
3from beanie import PydanticObjectId
4from beanie.operators import And, Eq
6from ..blueprints.api.models import HandlePayload
7from ..services.database import Alert, Node, User
8from .enums import EventType
9from .exceptions import NotFoundException
10from .node import update_state
13async def handle_alert(alertID: str, payload: HandlePayload, user: User):
14 alert: Alert | None = await Alert.get(PydanticObjectId(alertID))
15 if alert is None:
16 raise NotFoundException("Alert")
18 node: Node | None = await Node.get(PydanticObjectId(alert.node))
19 if node is None: 19 ↛ 20line 19 didn't jump to line 20, because the condition on line 19 was never true
20 raise NotFoundException("Node")
22 alert.isConfirmed = payload.isConfirmed
23 alert.isHandled = True
24 alert.handledBy = user.id
25 alert.handledAt = datetime.now()
26 alert.handleNote = payload.handleNote
27 await alert.save()
29 if (
30 await Alert.find_one(And(Eq(Alert.node, node.id), Eq(Alert.isHandled, False)))
31 ) is None:
32 node.state = update_state(node.state, node.lastSeenAt, EventType.HANDLE_ALERT)
33 await node.save()
35 return alert
38async def get_alert(alert_id: str) -> Alert:
39 alert: Alert | None = await Alert.get(PydanticObjectId(alert_id))
40 if alert is None:
41 raise NotFoundException("Alert")
43 return alert