Coverage for tests/test_microservice_websocket/test_routes/test_route_external_archiviation.py: 100%

38 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1from fastapi.testclient import TestClient 

2 

3from microservice_websocket.app.utils.external_archiviation import SET_NAME 

4 

5 

6class TestGetExternalArchiviation: 

7 endpoint = "/api/external-archiviations" 

8 

9 # Test get external archiviations endopoints 

10 def test_get_data(self, app_client: TestClient, auth_header): 

11 from microservice_websocket.app import redis_client 

12 

13 mock_endpoint = "foobar" 

14 redis_client.sadd(SET_NAME, mock_endpoint) 

15 

16 response = app_client.get(self.endpoint, headers=auth_header) 

17 

18 assert response.status_code == 200, "Invalid response code when querying data" 

19 assert len(response.json()["endpoints"]) == 1, "Invalid endpoints number" 

20 assert response.json()["endpoints"][0] == mock_endpoint, "Invalid endpoint" 

21 

22 redis_client.srem(SET_NAME, mock_endpoint) 

23 

24 

25class TestPostExternalArchiviation: 

26 endpoint = "/api/external-archiviation" 

27 

28 # Test adding url of external archiviation adapter 

29 # with wrong payload 

30 def test_add_invalid_payload(self, app_client: TestClient, auth_header): 

31 response = app_client.post( 

32 self.endpoint, 

33 headers=auth_header, 

34 json={}, 

35 ) 

36 

37 assert ( 

38 response.status_code == 422 

39 ), "Invalid response code when posting invalid data" 

40 

41 # Test adding url of external archiviation adapter 

42 def test_add(self, app_client: TestClient, auth_header): 

43 from microservice_websocket.app import redis_client 

44 

45 mock_endpoint = "localhost:8080" 

46 response = app_client.post( 

47 self.endpoint, 

48 headers=auth_header, 

49 json={"endpoint": mock_endpoint}, 

50 ) 

51 

52 assert ( 

53 response.status_code == 200 

54 ), "Invalid response code when posting valid data" 

55 assert len(redis_client.smembers(SET_NAME)) == 1, "Invalid number of endpoints" 

56 assert redis_client.sismember(SET_NAME, mock_endpoint) 

57 

58 redis_client.srem(SET_NAME, mock_endpoint) 

59 

60 

61class TestDeleteExternalArchiviation: 

62 endpoint = "/api/external-archiviation/" 

63 

64 # Test deleting non-existing endpoint of external archiviation adapter 

65 def test_delete_non_existing_endpoint(self, app_client: TestClient, auth_header): 

66 response = app_client.delete(self.endpoint + "foobar", headers=auth_header) 

67 

68 assert ( 

69 response.status_code == 404 

70 ), "Invalid response code when trying to delete non-existing endpoint" 

71 

72 # Test deleting the endpoint of external archiviation adapter 

73 def test_delete(self, app_client: TestClient, auth_header): 

74 from microservice_websocket.app import redis_client 

75 

76 mock_endpoint = "bazqux" 

77 redis_client.sadd(SET_NAME, mock_endpoint) 

78 

79 response = app_client.delete(self.endpoint + mock_endpoint, headers=auth_header) 

80 

81 assert ( 

82 response.status_code == 200 

83 ), "Invalid response code when trying to delete endpoint" 

84 assert ( 

85 len(redis_client.smembers(SET_NAME)) == 0 

86 ), "Invalid number of endpoints after deletion"