Coverage for microservice_websocket/app/__init__.py: 82%

26 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 18:11 +0000

1from fakeredis import FakeStrictRedis 

2from fastapi import FastAPI 

3from fastapi.middleware.cors import CORSMiddleware 

4from fastapi_socketio import SocketManager 

5from redis import Redis 

6 

7from .services.database import init_db, user_manager 

8from .services.mqtt import init_mqtt 

9from .services.scheduler import init_scheduler 

10 

# Module-level handles that ``app_init`` rebinds via ``global`` when the
# application starts. Both are declared here so the attributes exist as soon
# as the package is imported; they stay ``None`` until the startup hook runs.
mqtt = None
redis_client = None

app = FastAPI()

# NOTE(review): wildcard origins combined with ``allow_credentials=True`` is a
# very permissive CORS policy. Confirm this is intended outside development,
# or replace "*" with an explicit list of trusted origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Socket.IO integration mounted on the same FastAPI app. The empty
# ``cors_allowed_origins`` list presumably leaves CORS handling to the
# middleware registered above -- TODO confirm against fastapi-socketio docs.
socketManager = SocketManager(app=app, cors_allowed_origins=[])

23 

24 

@app.on_event("startup")
async def app_init():
    """Start the scheduler and connect MQTT, Redis and the database.

    Registered as the FastAPI ``startup`` handler. It rebinds the
    module-level ``mqtt`` and ``redis_client`` globals, initialises the
    database, and finally creates an admin user.

    When ``TESTING`` is truthy, MQTT is skipped, Redis is replaced by
    ``FakeStrictRedis`` and the database is initialised against a mongomock
    URI. Otherwise the real services are configured from ``config``.
    """
    global mqtt, redis_client

    init_scheduler()

    # The configuration is imported inside the handler, after the scheduler
    # has been initialised, rather than at module import time.
    from .config import TESTING, config as Config

    if TESTING:
        # In-memory stand-ins: no MQTT client, fake Redis, mocked MongoDB.
        redis_client = FakeStrictRedis()
        await init_db("mongomock://localhost:27017/test", "test")
    else:
        # Coverage note: in the recorded run this branch was never taken
        # (TESTING was always true), so the lines below are not covered.
        mqtt = init_mqtt(Config.mqtt)
        mqtt.init_app(app)

        redis_settings = Config.redis
        redis_client = Redis(
            host=redis_settings.host,
            port=redis_settings.port,
            db=redis_settings.db,
        )

        mongo_settings = Config.mongo
        await init_db(mongo_settings.uri, mongo_settings.db)

    # NOTE(review): this runs in both modes, so a fixed admin account with a
    # hard-coded password is also created outside of tests. Confirm that is
    # intended, and how ``create_user`` behaves when the user already exists.
    await user_manager.create_user("foo@bar.com", "baz", "John", "Doe", role="admin")

46 

47 

# The router is imported at the bottom of the module, after ``app`` and the
# other module-level objects exist -- presumably because the blueprint modules
# import from this package and a top-of-file import would be circular
# (TODO confirm).
from .blueprints.api import main_router  # noqa: E402

# Attach every API route to the application.
app.include_router(main_router)