Coverage for microservice_websocket/app/utils/command.py: 58%
43 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 18:11 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 18:11 +0000
1import json
2from beanie import PydanticObjectId
3from beanie.operators import And, Eq
5from ..services.database.models import Application, Node
6from .enums import CommandType
7from .exceptions import NotFoundException
def publish(topic: str, data: bytes | str):
    """Log a payload and hand it to the package-level MQTT client, if any.

    Args:
        topic: MQTT topic to publish on.
        data: Payload to send. A ``str`` is encoded to bytes with
            ``str.encode()`` defaults before it is logged and published.
    """
    payload = data.encode() if isinstance(data, str) else data

    # Resolved at call time rather than at module import time
    # (presumably to avoid a circular import with the package root -- confirm).
    from .. import mqtt

    # The payload is logged after encoding, so it is rendered as bytes.
    print(f"[MQTT] Publishing '{payload}' to '{topic}'")

    # A falsy ``mqtt`` means there is nothing to publish to: only the log
    # line above is produced.
    if not mqtt:
        return
    mqtt.publish(topic, payload)
async def handle_command(command: CommandType, applicationID: str, nodeID: int):
    """Translate a UI command into the payload published on the node's command topic.

    The application/node pair is validated first, then the payload matching
    ``command`` is published to ``"<applicationID>/<nodeID>/command"``.

    Args:
        command: The command requested for the node.
        applicationID: Identifier of the application owning the node.
        nodeID: Identifier of the target node within the application.

    Raises:
        NotFoundException: Propagated from ``check_existence`` when the
            application or the node cannot be found.
        Exception: If ``command`` has no payload associated with it here.
    """
    await check_existence(applicationID, nodeID)

    # Ordered (command, payload) pairs; matched with ``==`` in this order.
    payload_by_command = (
        (CommandType.START_REC, "start:0"),
        (CommandType.STOP_REC, "stop"),
        (CommandType.SET_DEMO_1, "start:1"),
        (CommandType.SET_DEMO_2, "start:2"),
    )

    for known_command, payload in payload_by_command:
        if command == known_command:
            publish(f"{applicationID}/{nodeID}/command", payload)
            return

    raise Exception(
        f"CommandType '{command}' for ui operations not implemented yet"
    )
async def check_existence(applicationID: str, nodeID: int):
    """Verify that an application and one of its nodes exist.

    Args:
        applicationID: String form of the application's object id; it is
            converted with ``PydanticObjectId`` before the lookup.
        nodeID: Value matched against ``Node.nodeID`` among the nodes that
            reference the application.

    Raises:
        NotFoundException: With ``"Application"`` when no application has the
            given id, or with ``"Node"`` when the application has no node
            with the given ``nodeID``.
    """
    application_oid = PydanticObjectId(applicationID)
    owner: Application | None = await Application.get(application_oid)
    if owner is None:
        raise NotFoundException("Application")

    # The node must both reference the application and carry the nodeID.
    node_filter = And(
        Eq(Node.application, owner.id),
        Eq(Node.nodeID, nodeID),
    )
    match: Node | None = await Node.find_one(node_filter)
    if match is None:
        raise NotFoundException("Node")
async def send_set_hv_command(
    applicationID: str, nodeID: int, detector: int, sipm: int, value: int
):
    """Publish an ``"hv"`` setting message on the node's ``set`` topic.

    The application/node pair is validated first; the message is a JSON
    object with the keys ``type``, ``detector``, ``sipm`` and ``value``.

    Args:
        applicationID: Identifier of the application owning the node.
        nodeID: Identifier of the target node.
        detector: Value sent under the ``"detector"`` key.
        sipm: Value sent under the ``"sipm"`` key.
        value: Value sent under the ``"value"`` key.

    Raises:
        NotFoundException: Propagated from ``check_existence``.
    """
    await check_existence(applicationID, nodeID)

    message = {
        "type": "hv",
        "detector": detector,
        "sipm": sipm,
        "value": value,
    }
    encoded = json.dumps(message)

    topic = f"{applicationID}/{nodeID}/set"
    publish(topic, encoded)
async def send_set_window_low(
    applicationID: str,
    nodeID: int,
    detector: int,
    sipm: int,
    window_number: int,
    value: int,
):
    """Publish a ``"window_low"`` setting message on the node's ``set`` topic.

    The application/node pair is validated first; the message is a JSON
    object with the keys ``type``, ``n``, ``detector``, ``sipm`` and
    ``value``.

    Args:
        applicationID: Identifier of the application owning the node.
        nodeID: Identifier of the target node.
        detector: Value sent under the ``"detector"`` key.
        sipm: Value sent under the ``"sipm"`` key.
        window_number: Value sent under the ``"n"`` key.
        value: Value sent under the ``"value"`` key.

    Raises:
        NotFoundException: Propagated from ``check_existence``.
    """
    await check_existence(applicationID, nodeID)

    message = {
        "type": "window_low",
        "n": window_number,
        "detector": detector,
        "sipm": sipm,
        "value": value,
    }
    encoded = json.dumps(message)

    topic = f"{applicationID}/{nodeID}/set"
    publish(topic, encoded)
async def send_set_window_high(
    applicationID: str,
    nodeID: int,
    detector: int,
    sipm: int,
    window_number: int,
    value: int,
):
    """Publish a ``"window_high"`` setting message on the node's ``set`` topic.

    The application/node pair is validated first; the message is a JSON
    object with the keys ``type``, ``n``, ``detector``, ``sipm`` and
    ``value``.

    Args:
        applicationID: Identifier of the application owning the node.
        nodeID: Identifier of the target node.
        detector: Value sent under the ``"detector"`` key.
        sipm: Value sent under the ``"sipm"`` key.
        window_number: Value sent under the ``"n"`` key.
        value: Value sent under the ``"value"`` key.

    Raises:
        NotFoundException: Propagated from ``check_existence``.
    """
    await check_existence(applicationID, nodeID)

    message = {
        "type": "window_high",
        "n": window_number,
        "detector": detector,
        "sipm": sipm,
        "value": value,
    }
    encoded = json.dumps(message)

    topic = f"{applicationID}/{nodeID}/set"
    publish(topic, encoded)