Coverage for microservice_websocket/app/services/mqtt/__init__.py: 12%

65 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1from datetime import datetime 

2 

3from beanie import PydanticObjectId 

4from beanie.operators import And, Eq 

5from fastapi_mqtt import FastMQTT, MQTTConfig 

6 

7from ...config import MQTTConfig as MQTTConfigInternal 

8from ...utils.enums import EventType 

9from ...utils.node import on_launch, update_state 

10from ..database.models import Node 

11 

12STATUS_TOPIC = "+/+/status" 

13 

14 

15def init_mqtt(conf: MQTTConfigInternal) -> FastMQTT: 

16 mqtt_config = MQTTConfig( 

17 host=conf.host, 

18 port=conf.port, 

19 ssl=conf.tls_enabled, 

20 username=conf.username, 

21 password=conf.password, 

22 ) 

23 

24 mqtt = FastMQTT(config=mqtt_config) 

25 

26 @mqtt.on_connect() 

27 def connect(client, flags, rc, properties): 

28 print("[MQTT] Connected") 

29 mqtt.client.subscribe(STATUS_TOPIC) 

30 print(f"[MQTT] Subscribed to '{STATUS_TOPIC}'") 

31 

32 @mqtt.on_message() 

33 async def on_message(client, topic: str, payload: bytes, qos, properties): 

34 from ... import socketManager 

35 

36 print(f"[MQTT] Someone published '{str(payload)}' on '{topic}'") 

37 

38 topic_sliced = topic.split("/") 

39 

40 try: 

41 applicationID = topic_sliced[0] 

42 nodeID = topic_sliced[1] 

43 topic = topic_sliced[2] 

44 value = payload.decode() 

45 

46 if not nodeID.isnumeric(): 

47 print("[MQTT] nodeID is not parsable") 

48 return 

49 

50 node = await Node.find_one( 

51 And( 

52 Eq(Node.nodeID, int(nodeID)), 

53 Eq(Node.application, PydanticObjectId(applicationID)), 

54 ) 

55 ) 

56 if node is None: 

57 print(f"Couldn't find node '{nodeID}', applicationID '{applicationID}'") 

58 return 

59 

60 node.lastSeenAt = datetime.now() 

61 

62 changed: bool = False 

63 

64 if topic == "status": 

65 if value == "start": 

66 new_state = update_state( 

67 node.state, node.lastSeenAt, EventType.START_REC 

68 ) 

69 changed = node.state != new_state 

70 node.state = new_state 

71 await node.save() 

72 

73 elif value == "stop": 

74 new_state = update_state( 

75 node.state, node.lastSeenAt, EventType.STOP_REC 

76 ) 

77 changed = node.state != new_state 

78 node.state = new_state 

79 await node.save() 

80 

81 elif value == "keepalive": 

82 new_state = update_state( 

83 node.state, node.lastSeenAt, EventType.KEEP_ALIVE 

84 ) 

85 changed = node.state != new_state 

86 node.state = new_state 

87 await node.save() 

88 

89 elif value == "launch": 

90 new_state = update_state( 

91 node.state, node.lastSeenAt, EventType.ON_LAUNCH 

92 ) 

93 changed = node.state != new_state 

94 node.state = new_state 

95 await node.save() 

96 await on_launch(node) 

97 

98 else: 

99 print(f"Invalid value '{value}' for sub-topic '{topic}'") 

100 else: 

101 print(f"Invalid sub-topic '{topic}'") 

102 

103 if changed: 

104 await socketManager.emit("change") 

105 

106 except IndexError as error: 

107 print(f"Invalid topic '{topic}': {error}") 

108 

109 return mqtt