monitoring/utils/jwt_token.py

56 lines
1.5 KiB
Python

# _*_ coding: utf-8 _*_
# @Time :2022/5/29 11:41
# @Email :508737091@qq.com
# @Author :qiangyanwen
# @File :jwt_token.py
import hashlib
from datetime import datetime, timedelta
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException, status
from passlib.context import CryptContext
from jose import jwt, JWTError
from config import settings
# Module-wide passlib context used for password hashing; bcrypt is the only
# scheme configured here.
crypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_hash_pwd(password: str):
    """Hash a plain-text password with the module's ``crypt_context``.

    :param password: the plain-text password to hash.
    :return: the value produced by ``crypt_context.hash`` for ``password``.
    """
    hashed_password = crypt_context.hash(password)
    return hashed_password
def get_md5_pwd(pwd: str):
    """Return the hexadecimal MD5 digest of ``pwd`` encoded as UTF-8.

    :param pwd: the text to digest.
    :return: the 32-character lowercase hex digest.
    """
    return hashlib.md5(pwd.encode("utf-8")).hexdigest()
def create_token(data: dict, expire_time=None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    :param data: claims to embed in the token. The dict is copied before the
        ``exp`` claim is added, so the caller's object is left untouched.
    :param expire_time: a ``timedelta`` added to the current UTC time to get
        the expiry. When falsy (``None`` or a zero delta) a 30-minute
        lifetime is used instead.
    :return: the token returned by ``jwt.encode`` using
        ``settings.ACCESS.SECRET_KEY`` and ``settings.ACCESS.ALGORITHM``.
    """
    lifetime = expire_time if expire_time else timedelta(minutes=30)
    # Build the claims on a shallow copy: writing "exp" into the caller's dict
    # would leak the claim into an object the caller may keep using afterwards.
    claims = dict(data)
    claims["exp"] = datetime.utcnow() + lifetime
    return jwt.encode(claims, settings.ACCESS.SECRET_KEY, algorithm=settings.ACCESS.ALGORITHM)
# OAuth2 password-bearer dependency used by parse_token below; its tokenUrl
# is set to "/api/login".
oauth_scame = OAuth2PasswordBearer(tokenUrl="/api/login")
def parse_token(token: str = Depends(oauth_scame)):
    """Decode a bearer token and return the ``id`` claim stored in it.

    :param token: the JWT string, supplied through the ``oauth_scame``
        dependency.
    :return: the value of the ``id`` claim.
    :raises HTTPException: with status 401 when decoding raises ``JWTError``
        or when the ``id`` claim is missing or an empty string.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="token不正确或已过期",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, settings.ACCESS.SECRET_KEY, algorithms=settings.ACCESS.ALGORITHM)
    except JWTError:
        # Any decode failure is reported to the client as the same 401.
        raise unauthorized
    user_id = claims.get("id")
    if user_id is not None and user_id != "":
        return user_id
    # Token decoded fine but carries no usable id claim.
    raise unauthorized