Coverage for microservice_websocket/app/blueprints/api/node.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1from fastapi import APIRouter, Depends 

2from pydantic import BaseModel 

3 

4from ...services.database import Node, NodeSettings 

5from ...services.jwt import jwt_required 

6from ...utils.node import get_nodes 

7from ...utils.node_settings import get_node_settings, update_node_settings 

8 

# Router that the node-related endpoints in this module are registered on.
node_router = APIRouter()

10 

11 

# Response body for the node-listing endpoint: the serialized nodes are
# returned under a single "nodes" key.
class GetNodesResponse(BaseModel):
    nodes: list[Node.Serialized]

14 

15 

@node_router.get(
    "/nodes/",
    response_model=GetNodesResponse,
    dependencies=[Depends(jwt_required)],
)
async def get_nodes_route(applicationID: str):
    # Fetch the nodes for the given application, then serialize them one
    # after another (each serialize() call is awaited in list order).
    app_nodes: list[Node] = await get_nodes(applicationID)
    serialized_nodes = [await node.serialize() for node in app_nodes]

    return {"nodes": serialized_nodes}

23 

24 

@node_router.get(
    "/node/{nodeID}/settings",
    response_model=NodeSettings.Serialized,
    dependencies=[Depends(jwt_required)],
)
async def get_node_settings_route(nodeID: int):
    # Look up the settings for the node identified in the path and return
    # their serialized form.
    node_settings: NodeSettings = await get_node_settings(nodeID)
    return node_settings.serialize()

34 

35 

@node_router.put(
    "/node/{nodeID}/settings",
    dependencies=[Depends(jwt_required)],
)
async def update_node_settings_route(payload: NodeSettings.Serialized, nodeID: int):
    # Imported at call time instead of module level.
    # NOTE(review): presumably this avoids a circular import with the
    # package root -- confirm before hoisting it.
    from ... import socketManager

    # Apply the submitted settings first; the "settings-update" event is
    # emitted only after the update call has returned.
    await update_node_settings(nodeID, payload)
    await socketManager.emit("settings-update")

    response = {"message": "Updated"}
    return response