Coverage for microservice_websocket/app/utils/payload.py: 98%
60 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
1from datetime import datetime
3from beanie import PydanticObjectId
4from beanie.operators import And, Eq
6from ..blueprints.api.models import PublishPayload
7from ..config import config as Config
8from ..services.database import Alert, Application, Node, Reading
9from .enums import EventType, NodeState, PayloadType
10from .exceptions import NotFoundException
11from .node import update_state
12from .sync_cache import add_to_cache
async def publish(payload: PublishPayload):
    """Record an incoming payload and refresh the state of its node.

    The application is fetched by ``payload.applicationID``. The node is
    looked up by application id and ``payload.nodeID``; when no node matches,
    a new one is created in ``NodeState.READY`` and saved. The payload is then
    handed to the reading handler matching ``payload.payloadType``, the node
    state is advanced through ``update_state`` and the node is saved.

    Raises:
        NotFoundException: if no application matches ``payload.applicationID``.
    """
    application_oid = PydanticObjectId(payload.applicationID)
    application: Application | None = await Application.get(application_oid)
    if application is None:
        raise NotFoundException("Application")

    node_filter = And(
        Eq(Node.application, application.id),
        Eq(Node.nodeID, payload.nodeID),
    )
    node: Node | None = await Node.find_one(node_filter)

    if node is None:
        # First payload seen from this node: register it before processing.
        node = Node(
            nodeID=payload.nodeID,
            application=application.id,
            nodeName=payload.nodeName,
            state=NodeState.READY,
            lastSeenAt=datetime.now(),
        )
        await node.save()

    node.lastSeenAt = datetime.now()

    is_total_reading = payload.payloadType == PayloadType.TOTAL_READING

    if is_total_reading:
        await handle_total_reading(node, payload)
    elif payload.payloadType == PayloadType.WINDOW_READING:
        await handle_window_reading(node, payload)
        node.state = update_state(node.state, node.lastSeenAt, EventType.START_REC)

    # Payloads without a data section are treated as carrying a value of 0.
    reading_data = payload.data
    value = reading_data.value if reading_data else 0

    # Only a total reading at or above the threshold raises an alert; every
    # other payload counts as a keep-alive.
    if is_total_reading and value >= Config.app.ALERT_TRESHOLD:
        closing_event = EventType.RAISE_ALERT
    else:
        closing_event = EventType.KEEP_ALIVE

    node.state = update_state(node.state, node.lastSeenAt, closing_event)
    await node.save()
async def handle_total_reading(node: Node, record: PublishPayload):
    """Store a total reading for ``node`` and raise an alert if it is too high.

    The reading matching (node, readingID, canID, sensorNumber) is reused when
    it exists and created otherwise. Its ``dangerLevel`` is set to
    ``data.value``, it is saved, and its id is passed to ``add_to_cache``.
    When the danger level reaches ``Config.app.ALERT_TRESHOLD`` and the
    session has no unhandled alert yet, a new unhandled alert is saved.

    Args:
        node: the node the reading belongs to.
        record: the published payload; its ``data`` section must be present.

    Raises:
        ValueError: if ``record.data`` is missing.
    """
    data = record.data
    # Explicit check rather than ``assert``: assertions are stripped when
    # Python runs with ``-O``, which would let a missing data section through
    # and fail later with an unrelated AttributeError.
    if not data:
        raise ValueError("Total reading payload is missing its data section")

    reading: Reading | None = await Reading.find_one(
        And(
            Eq(Reading.node, node.id),
            Eq(Reading.readingID, data.readingID),
            Eq(Reading.canID, data.canID),
            Eq(Reading.sensorNumber, data.sensorNumber),
        )
    )

    if reading is None:
        reading = Reading(
            node=node.id,
            canID=data.canID,
            sensorNumber=data.sensorNumber,
            readingID=data.readingID,
            sessionID=data.sessionID,
            publishedAt=datetime.now(),
        )

    reading.dangerLevel = data.value
    await reading.save()
    add_to_cache(str(reading.id))

    if reading.dangerLevel >= Config.app.ALERT_TRESHOLD:
        alert: Alert | None = await Alert.find_one(
            And(Eq(Alert.sessionID, reading.sessionID), Eq(Alert.isHandled, False))
        )

        # Keep at most one unhandled alert per session.
        if alert is None:
            alert = Alert(
                reading=reading.id,
                node=node.id,
                sessionID=reading.sessionID,
                isHandled=False,
                raisedAt=datetime.now(),
            )
            await alert.save()
async def handle_window_reading(node: Node, payload: PublishPayload):
    """Store one window count of a reading for ``node``.

    The reading matching (node, readingID, canID, sensorNumber) is reused when
    it exists and created otherwise. ``data.value`` selects the window
    (0, 1, 2 map to ``window1``, ``window2``, ``window3``) and ``data.count``
    is written to it. The reading is then saved and its id is passed to
    ``add_to_cache``.

    Args:
        node: the node the reading belongs to.
        payload: the published payload; its ``data`` section must be present.

    Raises:
        ValueError: if ``payload.data`` is missing, or if ``data.value`` is
            not 0, 1 or 2.
    """
    data = payload.data
    # Explicit check rather than ``assert``: assertions are stripped when
    # Python runs with ``-O``, which would let a missing data section through
    # and fail later with an unrelated AttributeError.
    if not data:
        raise ValueError("Window reading payload is missing its data section")

    reading: Reading | None = await Reading.find_one(
        And(
            Eq(Reading.node, node.id),
            Eq(Reading.readingID, data.readingID),
            Eq(Reading.canID, data.canID),
            Eq(Reading.sensorNumber, data.sensorNumber),
        )
    )

    if reading is None:
        reading = Reading(
            node=node.id,
            canID=data.canID,
            sensorNumber=data.sensorNumber,
            sessionID=data.sessionID,
            readingID=data.readingID,
            publishedAt=datetime.now(),
        )

    # For window payloads the ``value`` field carries the window index.
    window_number = data.value

    if window_number == 0:
        reading.window1 = data.count
    elif window_number == 1:
        reading.window2 = data.count
    elif window_number == 2:
        reading.window3 = data.count
    else:
        raise ValueError(f"Unexpected window_number: {window_number}")

    await reading.save()
    add_to_cache(str(reading.id))