Coverage for microservice_websocket/app/__init__.py: 82%
26 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 18:11 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 18:11 +0000
1from fakeredis import FakeStrictRedis
2from fastapi import FastAPI
3from fastapi.middleware.cors import CORSMiddleware
4from fastapi_socketio import SocketManager
5from redis import Redis
7from .services.database import init_db, user_manager
8from .services.mqtt import init_mqtt
9from .services.scheduler import init_scheduler
11mqtt = None
13app = FastAPI()
14app.add_middleware(
15 CORSMiddleware,
16 allow_origins=["*"],
17 allow_credentials=True,
18 allow_methods=["*"],
19 allow_headers=["*"],
20)
22socketManager = SocketManager(app=app, cors_allowed_origins=[])
25@app.on_event("startup")
26async def app_init():
27 global redis_client
28 global mqtt
30 init_scheduler()
32 from .config import TESTING, config as Config
34 if not TESTING: 34 ↛ 35line 34 didn't jump to line 35, because the condition on line 34 was never true
35 mqtt = init_mqtt(Config.mqtt)
36 mqtt.init_app(app)
37 redis_client = Redis(
38 host=Config.redis.host, port=Config.redis.port, db=Config.redis.db
39 )
40 await init_db(Config.mongo.uri, Config.mongo.db)
41 else:
42 redis_client = FakeStrictRedis()
43 await init_db("mongomock://localhost:27017/test", "test")
45 await user_manager.create_user("foo@bar.com", "baz", "John", "Doe", role="admin")
48from .blueprints.api import main_router
50app.include_router(main_router)