Coverage for mock_mobius/app.py: 89%

57 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1import json 

2import secrets 

3from datetime import datetime 

4 

5from database import Reading 

6from flask import Flask, jsonify, make_response, request 

7from flask_mongoengine import MongoEngine 

8 

9 

10def create_app(config_filename: str = "config.json") -> Flask: 

11 app = Flask(__name__) 

12 

13 # TODO: randomize key gen 

14 app.config["SECRET_KEY"] = "secret!" 

15 

16 app.config["MONGODB_SETTINGS"] = { 

17 "db": "mobius", 

18 "host": "localhost", 

19 "port": 27017, 

20 } 

21 

22 app.config.from_file(config_filename, load=json.load) 

23 

24 db = MongoEngine() 

25 db.init_app(app) 

26 

27 @app.route("/<SENSOR_PATH>", methods=["POST"]) 

28 def publish(SENSOR_PATH: str = ""): 

29 if SENSOR_PATH == "": 29 ↛ 30line 29 didn't jump to line 30, because the condition on line 29 was never true

30 return {}, 404 

31 

32 reading: Reading = Reading.from_json(request.data.decode()) 

33 reading.sensorPath = SENSOR_PATH 

34 reading.readingId = secrets.token_urlsafe(16) 

35 reading.save() 

36 

37 return {}, 200 

38 

39 @app.route("/<SENSOR_PATH>", methods=["GET"]) 

40 # Impostare header 

41 def read(SENSOR_PATH: str = ""): 

42 if SENSOR_PATH == "": 42 ↛ 43line 42 didn't jump to line 43, because the condition on line 42 was never true

43 return {}, 404 

44 

45 # Request args parsing 

46 sup_time_limit: str = request.args.get("crb", "") 

47 inf_time_limit: str = request.args.get("cra", "") 

48 n_limit: int = int(request.args.get("lim", "-1")) 

49 

50 # Querying for SENSOR_PATH 

51 letture_raw = Reading.objects(sensorPath=SENSOR_PATH) # type: ignore 

52 

53 # Apply superior time limit if sup_time_limit length is valid 

54 if len(sup_time_limit) in [8, 15]: 

55 sup_time_limit = datetime.strptime( 

56 sup_time_limit, 

57 "%Y%m%d" if len(sup_time_limit) == 8 else "%Y%m%dT%H%M%S", 

58 ).isoformat() 

59 letture_raw = letture_raw.filter(readingTimestamp__lt=sup_time_limit) 

60 

61 # Apply inferior time limit if inf_time_limit length is valid 

62 if len(inf_time_limit) in [8, 15]: 

63 inf_time_limit = datetime.strptime( 

64 inf_time_limit, 

65 "%Y%m%d" if len(inf_time_limit) == 8 else "%Y%m%dT%H%M%S", 

66 ).isoformat() 

67 letture_raw = letture_raw.filter(readingTimestamp__gt=inf_time_limit) 

68 

69 # Apply quantity limit if n_limit is valid 

70 if n_limit != -1 and len(letture_raw) > n_limit: 

71 letture_raw = letture_raw[:n_limit] 

72 

73 # Return if no reading is found 

74 if len(letture_raw) == 0: 74 ↛ 75line 74 didn't jump to line 75, because the condition on line 74 was never true

75 return {}, 404 

76 

77 # Convert remaining readings to json 

78 letture: list[dict] = [x.to_json() for x in letture_raw] 

79 

80 response = make_response(jsonify({"m2m:rsp": {"m2m:cin": letture}})) 

81 response.headers["X-M2M-Origin"] = "" 

82 response.headers["Content-Type"] = "application/vnd.onem2m-res+json;ty=4" 

83 response.headers["X-M2M-RI"] = datetime.isoformat(datetime.now()) 

84 

85 return response 

86 

87 @app.route("/<SENSOR_PATH>/la", methods=["GET"]) 

88 def read_last(SENSOR_PATH: str = ""): 

89 descending_time_readings = Reading.objects( 

90 sensorPath=SENSOR_PATH 

91 ).order_by( # type: ignore 

92 "-readingTimestamp" 

93 ) 

94 

95 if len(descending_time_readings) == 0: 95 ↛ 96line 95 didn't jump to line 96, because the condition on line 95 was never true

96 return {}, 404 

97 

98 response = make_response( 

99 jsonify({"m2m:cin": descending_time_readings[0].to_json()}) 

100 ) 

101 response.headers["X-M2M-Origin"] = "" 

102 response.headers["Content-Type"] = "application/vnd.onem2m-res+json;ty=4" 

103 response.headers["X-M2M-RI"] = datetime.isoformat(datetime.now()) 

104 

105 return response 

106 

107 return app