Coverage for microservice_websocket/app/services/database/__init__.py: 83%

10 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1# class Node(Document): 

2# def to_dashboard(self) -> dict: 

3# unhandledAlerts = Alert.objects(node=self, isHandled=False) 

4# 

5# unhandledAlertIDs = [str(x["id"]) for x in unhandledAlerts] 

6# 

7# return { 

8# "nodeID": self.nodeID, 

9# "nodeName": self.nodeName, 

10# "applicationID": str(self.application["id"]), 

11# "state": NodeState.to_irma_ui_state(self.state), 

12# "unhandledAlertIDs": unhandledAlertIDs, 

13# } 

14 

15 

16from beanie import init_beanie 

17from mongomock_motor import AsyncMongoMockClient 

18from motor.motor_asyncio import AsyncIOMotorClient 

19 

20from .models import Alert, Application, Node, NodeSettings, Organization, Reading, User 

21 

22 

23async def init_db(db_uri: str, db_name: str): 

24 if "mongomock" in db_uri: 24 ↛ 27line 24 didn't jump to line 27, because the condition on line 24 was never false

25 client = AsyncMongoMockClient() 

26 else: 

27 client = AsyncIOMotorClient(db_uri) 

28 db = client[db_name] 

29 

30 await init_beanie( 

31 database=db, 

32 document_models=[ 

33 Organization, 

34 Application, 

35 User, 

36 Node, 

37 NodeSettings, 

38 Reading, 

39 Alert, 

40 ], 

41 )