Coverage for tests/test_microservice_websocket/test_routes/test_route_external_archiviation.py: 100%
38 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
1from fastapi.testclient import TestClient
3from microservice_websocket.app.utils.external_archiviation import SET_NAME
class TestGetExternalArchiviation:
    """Tests for the GET route listing the external archiviation endpoints."""

    endpoint = "/api/external-archiviations"

    # Test get external archiviations endpoint
    def test_get_data(self, app_client: TestClient, auth_header):
        """Seed one member into the Redis set and expect GET to return it.

        `app_client` and `auth_header` are presumably pytest fixtures
        provided by the project's conftest -- verify against the test setup.
        """
        from microservice_websocket.app import redis_client

        mock_endpoint = "foobar"
        redis_client.sadd(SET_NAME, mock_endpoint)

        try:
            response = app_client.get(self.endpoint, headers=auth_header)

            assert (
                response.status_code == 200
            ), "Invalid response code when querying data"

            endpoints = response.json()["endpoints"]
            assert len(endpoints) == 1, "Invalid endpoints number"
            assert endpoints[0] == mock_endpoint, "Invalid endpoint"
        finally:
            # Remove the seeded member even when an assertion fails, so it
            # cannot leak into other tests that check the size of the set.
            redis_client.srem(SET_NAME, mock_endpoint)
class TestPostExternalArchiviation:
    """Tests for the POST route registering an external archiviation endpoint."""

    endpoint = "/api/external-archiviation"

    # Test adding url of external archiviation adapter
    # with wrong payload
    def test_add_invalid_payload(self, app_client: TestClient, auth_header):
        """Posting an empty JSON object must be rejected with status 422."""
        response = app_client.post(
            self.endpoint,
            headers=auth_header,
            json={},
        )

        assert (
            response.status_code == 422
        ), "Invalid response code when posting invalid data"

    # Test adding url of external archiviation adapter
    def test_add(self, app_client: TestClient, auth_header):
        """Posting a valid payload must store the endpoint in the Redis set."""
        from microservice_websocket.app import redis_client

        mock_endpoint = "localhost:8080"

        try:
            response = app_client.post(
                self.endpoint,
                headers=auth_header,
                json={"endpoint": mock_endpoint},
            )

            assert (
                response.status_code == 200
            ), "Invalid response code when posting valid data"
            assert (
                len(redis_client.smembers(SET_NAME)) == 1
            ), "Invalid number of endpoints"
            assert redis_client.sismember(SET_NAME, mock_endpoint)
        finally:
            # Clean up even when an assertion fails; removing a member that
            # was never stored is harmless, so this is safe on every path.
            redis_client.srem(SET_NAME, mock_endpoint)
class TestDeleteExternalArchiviation:
    """Tests for the DELETE route removing an external archiviation endpoint."""

    endpoint = "/api/external-archiviation/"

    # Test deleting non-existing endpoint of external archiviation adapter
    def test_delete_non_existing_endpoint(self, app_client: TestClient, auth_header):
        """Deleting an endpoint that was never registered must return 404."""
        response = app_client.delete(self.endpoint + "foobar", headers=auth_header)

        assert (
            response.status_code == 404
        ), "Invalid response code when trying to delete non-existing endpoint"

    # Test deleting the endpoint of external archiviation adapter
    def test_delete(self, app_client: TestClient, auth_header):
        """Seed one member, delete it through the API, expect an empty set."""
        from microservice_websocket.app import redis_client

        mock_endpoint = "bazqux"
        redis_client.sadd(SET_NAME, mock_endpoint)

        try:
            response = app_client.delete(
                self.endpoint + mock_endpoint, headers=auth_header
            )

            assert (
                response.status_code == 200
            ), "Invalid response code when trying to delete endpoint"
            assert (
                len(redis_client.smembers(SET_NAME)) == 0
            ), "Invalid number of endpoints after deletion"
        finally:
            # If the route failed to remove the seeded member, drop it here so
            # it does not pollute later tests; on success this is a no-op.
            redis_client.srem(SET_NAME, mock_endpoint)