Coverage for microservice_websocket/app/services/database/__init__.py: 83%
10 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
1# class Node(Document):
2# def to_dashboard(self) -> dict:
3# unhandledAlerts = Alert.objects(node=self, isHandled=False)
4#
5# unhandledAlertIDs = [str(x["id"]) for x in unhandledAlerts]
6#
7# return {
8# "nodeID": self.nodeID,
9# "nodeName": self.nodeName,
10# "applicationID": str(self.application["id"]),
11# "state": NodeState.to_irma_ui_state(self.state),
12# "unhandledAlertIDs": unhandledAlertIDs,
13# }
16from beanie import init_beanie
17from mongomock_motor import AsyncMongoMockClient
18from motor.motor_asyncio import AsyncIOMotorClient
20from .models import Alert, Application, Node, NodeSettings, Organization, Reading, User
async def init_db(db_uri: str, db_name: str):
    """Set up Beanie on the database called ``db_name``.

    If ``db_uri`` contains the substring ``"mongomock"``, an
    ``AsyncMongoMockClient`` is created with no arguments (the URI is not
    forwarded to it); otherwise an ``AsyncIOMotorClient`` is created from
    ``db_uri``. The database is then looked up on that client by name and
    passed to ``init_beanie`` together with the document models.
    """
    use_mock = "mongomock" in db_uri
    mongo = AsyncMongoMockClient() if use_mock else AsyncIOMotorClient(db_uri)
    target_db = mongo[db_name]

    # Document classes handed to Beanie, in the original registration order.
    models = [
        Organization,
        Application,
        User,
        Node,
        NodeSettings,
        Reading,
        Alert,
    ]
    await init_beanie(database=target_db, document_models=models)