Coverage for mock_mobius/database.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1from __future__ import annotations 

2 

3import json 

4 

5from flask_mongoengine import Document 

6from mongoengine.fields import DictField, FloatField, StringField 

7 

8 

9############################################################### 

10# definizione della struttura del documento inserito in mongo # 

11############################################################### 

12class Reading(Document): 

13 readingId = StringField(max_length=100, required=True) 

14 sensorPath = StringField(max_length=100, required=True) 

15 sensorId = StringField(max_length=100, required=True) 

16 readingTimestamp = StringField(max_length=100, required=True) 

17 latitude = FloatField() 

18 longitude = FloatField() 

19 sensorData = DictField(required=True) 

20 

21 def to_json(self) -> dict: 

22 return { 

23 "pi": "", 

24 "ri": self.readingId, 

25 "ct": self.readingTimestamp, # ? 

26 "con": { 

27 "metadata": { 

28 "sensorId": self.sensorId, 

29 "readingTimestamp": self.readingTimestamp, 

30 "latitude": self.latitude, 

31 "longitude": self.longitude, 

32 }, 

33 }, 

34 "sensorData": self.sensorData, 

35 } 

36 

37 @classmethod 

38 def from_json(cls, s: str) -> Reading: 

39 s_dict = json.loads(s) 

40 print(s_dict) 

41 new_p = Reading() 

42 new_p.sensorId = s_dict["m2m:cin"]["con"]["metadata"]["sensorId"] 

43 new_p.readingTimestamp = s_dict["m2m:cin"]["con"]["metadata"][ 

44 "readingTimestamp" 

45 ] 

46 new_p.sensorData = s_dict["m2m:cin"]["sensorData"] 

47 

48 return new_p