Coverage for microservice_websocket/app/services/jwt/__init__.py: 73%

36 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2022-12-20 14:31 +0000

1from datetime import datetime, timedelta 

2from typing import Optional 

3 

4from fastapi import Depends, HTTPException, status 

5from fastapi.security import OAuth2PasswordBearer 

6from jose import ExpiredSignatureError, JWTError, jwt 

7 

8from ...config import config as Config 

9from ..database import User 

10 

11SECRET_KEY = Config.jwt.secret_key 

12ALGORITHM = "HS256" 

13ACCESS_TOKEN_EXPIRE_MINUTES = Config.jwt.access_token_expires_minutes 

14 

15 

16oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 

17 

18credentials_exception = HTTPException( 

19 status_code=status.HTTP_401_UNAUTHORIZED, 

20 detail="Could not validate credentials", 

21 headers={"WWW-Authenticate": "Bearer"}, 

22) 

23 

24 

25def create_access_token(email: str, expires_delta: Optional[timedelta] = None) -> str: 

26 if expires_delta: 26 ↛ 27line 26 didn't jump to line 27, because the condition on line 26 was never true

27 expire = datetime.utcnow() + expires_delta 

28 else: 

29 expire = datetime.utcnow() + timedelta(minutes=15) 

30 

31 to_encode = {"sub": email, "exp": expire} 

32 

33 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 

34 return encoded_jwt 

35 

36 

37async def get_user_from_jwt(token: str = Depends(oauth2_scheme)) -> User: 

38 try: 

39 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 

40 email = payload.get("sub") 

41 if email is None: 41 ↛ 42line 41 didn't jump to line 42, because the condition on line 41 was never true

42 raise credentials_exception 

43 except ExpiredSignatureError: 

44 raise HTTPException( 

45 status_code=status.HTTP_401_UNAUTHORIZED, 

46 detail="JWT Expired", 

47 headers={"WWW-Authenticate": "Bearer"}, 

48 ) 

49 except JWTError: 

50 raise credentials_exception 

51 

52 from ..database.user_manager import get_user_from_mail 

53 

54 user = await get_user_from_mail(email) 

55 if user is None: 55 ↛ 56line 55 didn't jump to line 56, because the condition on line 55 was never true

56 raise credentials_exception 

57 

58 return user 

59 

60 

61async def jwt_required(token: str = Depends(oauth2_scheme)): 

62 await get_user_from_jwt(token)