Coverage for microservice_websocket/app/services/jwt/__init__.py: 73%
36 statements
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2022-12-20 14:31 +0000
1from datetime import datetime, timedelta
2from typing import Optional
4from fastapi import Depends, HTTPException, status
5from fastapi.security import OAuth2PasswordBearer
6from jose import ExpiredSignatureError, JWTError, jwt
8from ...config import config as Config
9from ..database import User
11SECRET_KEY = Config.jwt.secret_key
12ALGORITHM = "HS256"
13ACCESS_TOKEN_EXPIRE_MINUTES = Config.jwt.access_token_expires_minutes
16oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
18credentials_exception = HTTPException(
19 status_code=status.HTTP_401_UNAUTHORIZED,
20 detail="Could not validate credentials",
21 headers={"WWW-Authenticate": "Bearer"},
22)
25def create_access_token(email: str, expires_delta: Optional[timedelta] = None) -> str:
26 if expires_delta: 26 ↛ 27line 26 didn't jump to line 27, because the condition on line 26 was never true
27 expire = datetime.utcnow() + expires_delta
28 else:
29 expire = datetime.utcnow() + timedelta(minutes=15)
31 to_encode = {"sub": email, "exp": expire}
33 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
34 return encoded_jwt
37async def get_user_from_jwt(token: str = Depends(oauth2_scheme)) -> User:
38 try:
39 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
40 email = payload.get("sub")
41 if email is None: 41 ↛ 42line 41 didn't jump to line 42, because the condition on line 41 was never true
42 raise credentials_exception
43 except ExpiredSignatureError:
44 raise HTTPException(
45 status_code=status.HTTP_401_UNAUTHORIZED,
46 detail="JWT Expired",
47 headers={"WWW-Authenticate": "Bearer"},
48 )
49 except JWTError:
50 raise credentials_exception
52 from ..database.user_manager import get_user_from_mail
54 user = await get_user_from_mail(email)
55 if user is None: 55 ↛ 56line 55 didn't jump to line 56, because the condition on line 55 was never true
56 raise credentials_exception
58 return user
61async def jwt_required(token: str = Depends(oauth2_scheme)):
62 await get_user_from_jwt(token)