Coverage for microservice_websocket/app/services/mqtt/__init__.py: 12%
65 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
1from datetime import datetime
3from beanie import PydanticObjectId
4from beanie.operators import And, Eq
5from fastapi_mqtt import FastMQTT, MQTTConfig
7from ...config import MQTTConfig as MQTTConfigInternal
8from ...utils.enums import EventType
9from ...utils.node import on_launch, update_state
10from ..database.models import Node
12STATUS_TOPIC = "+/+/status"
def init_mqtt(conf: MQTTConfigInternal) -> FastMQTT:
    """Create the FastMQTT client and register its connect/message handlers.

    On connect the client subscribes to ``STATUS_TOPIC``. Each incoming
    message is expected on a topic of the form
    ``<applicationID>/<nodeID>/<sub-topic>``; for the ``status`` sub-topic the
    payload (``start``, ``stop``, ``keepalive`` or ``launch``) is mapped to an
    ``EventType``, passed through ``update_state`` and the node is saved.
    A ``"change"`` event is emitted through ``socketManager`` when the node
    state actually changed.

    Args:
        conf: Internal MQTT settings; ``host``, ``port``, ``tls_enabled``,
            ``username`` and ``password`` are read from it.

    Returns:
        The configured ``FastMQTT`` instance with handlers attached.
    """
    mqtt_config = MQTTConfig(
        host=conf.host,
        port=conf.port,
        ssl=conf.tls_enabled,
        username=conf.username,
        password=conf.password,
    )

    mqtt = FastMQTT(config=mqtt_config)

    # Payloads accepted on the "status" sub-topic and the event each one
    # feeds into update_state().
    status_events = {
        "start": EventType.START_REC,
        "stop": EventType.STOP_REC,
        "keepalive": EventType.KEEP_ALIVE,
        "launch": EventType.ON_LAUNCH,
    }

    @mqtt.on_connect()
    def connect(client, flags, rc, properties):
        """Subscribe to the status topic once the broker connection is up."""
        print("[MQTT] Connected")
        mqtt.client.subscribe(STATUS_TOPIC)
        print(f"[MQTT] Subscribed to '{STATUS_TOPIC}'")

    @mqtt.on_message()
    async def on_message(client, topic: str, payload: bytes, qos, properties):
        """Apply a status message to the matching node and notify listeners."""
        # Imported at call time rather than module load
        # (presumably to avoid a circular import -- TODO confirm).
        from ... import socketManager

        print(f"[MQTT] Someone published '{str(payload)}' on '{topic}'")

        topic_sliced = topic.split("/")

        # Only the topic indexing is guarded, so an IndexError raised deeper
        # in the handler is not misreported as a malformed topic.
        try:
            applicationID = topic_sliced[0]
            nodeID = topic_sliced[1]
            sub_topic = topic_sliced[2]
        except IndexError as error:
            print(f"Invalid topic '{topic}': {error}")
            return

        try:
            value = payload.decode()
        except UnicodeDecodeError as error:
            print(f"[MQTT] payload is not valid UTF-8: {error}")
            return

        # isdecimal() only accepts digits int() can parse; isnumeric() also
        # accepts characters such as "²" or "½" that make int() raise.
        if not nodeID.isdecimal():
            print("[MQTT] nodeID is not parsable")
            return

        # Constructing an ObjectId from an arbitrary string raises, so
        # validate the topic segment before building the query.
        if not PydanticObjectId.is_valid(applicationID):
            print("[MQTT] applicationID is not parsable")
            return

        node = await Node.find_one(
            And(
                Eq(Node.nodeID, int(nodeID)),
                Eq(Node.application, PydanticObjectId(applicationID)),
            )
        )
        if node is None:
            print(f"Couldn't find node '{nodeID}', applicationID '{applicationID}'")
            return

        node.lastSeenAt = datetime.now()

        if sub_topic != "status":
            print(f"Invalid sub-topic '{sub_topic}'")
            return

        event = status_events.get(value)
        if event is None:
            print(f"Invalid value '{value}' for sub-topic '{sub_topic}'")
            return

        new_state = update_state(node.state, node.lastSeenAt, event)
        changed: bool = node.state != new_state
        node.state = new_state
        await node.save()

        # "launch" additionally runs the on_launch hook after the save.
        if value == "launch":
            await on_launch(node)

        if changed:
            await socketManager.emit("change")

    return mqtt