55 lines
1.5 KiB
Python
55 lines
1.5 KiB
Python
# _*_ coding: utf-8 _*_
|
|
# @Time :2022/5/29 11:41
|
|
# @Email :508737091@qq.com
|
|
# @Author :qiangyanwen
|
|
# @File :jwt_token.py.py
|
|
import hashlib
|
|
from datetime import datetime, timedelta
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from fastapi import Depends, HTTPException, status
|
|
from passlib.context import CryptContext
|
|
from jose import jwt, JWTError
|
|
from config import settings
|
|
|
|
# Module-level passlib context used by get_hash_pwd; bcrypt is the only
# scheme configured here.
crypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
|
|
def get_hash_pwd(password: str):
    """Hash *password* with the module-level ``crypt_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        Whatever ``crypt_context.hash`` produces for the given password.
    """
    hashed = crypt_context.hash(password)
    return hashed
|
|
|
|
|
|
def get_md5_pwd(pwd: str):
    """Return the hexadecimal MD5 digest of *pwd*.

    The string is encoded as UTF-8 before hashing.

    Args:
        pwd: The text to digest.

    Returns:
        The 32-character lowercase hex digest.
    """
    # NOTE(review): MD5 is a fast, unsalted digest; confirm callers do not
    # rely on it as the sole protection for stored passwords.
    encoded = pwd.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
|
|
|
|
|
|
def create_token(data: dict, expire_time=None):
    """Create a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied before the
            ``exp`` claim is added, so the caller's dict is not modified.
        expire_time: Token lifetime as a ``timedelta``, added to the current
            UTC time. When omitted or falsy (``None`` or a zero-length
            delta) the token expires 30 minutes from now.

    Returns:
        The encoded token returned by ``jwt.encode``, signed with
        ``settings.ACCESS.SECRET_KEY`` using ``settings.ACCESS.ALGORITHM``.
    """
    # Truthiness check kept on purpose: a zero-length delta falls back to
    # the 30-minute default, exactly as before.
    lifetime = expire_time if expire_time else timedelta(minutes=30)

    # Work on a copy: the previous data.update(...) leaked the "exp" claim
    # into the dict owned by the caller.
    claims = dict(data)
    claims["exp"] = datetime.utcnow() + lifetime

    return jwt.encode(
        claims,
        settings.ACCESS.SECRET_KEY,
        algorithm=settings.ACCESS.ALGORITHM,
    )
|
|
|
|
|
|
# OAuth2 password-bearer scheme whose tokenUrl is "/api/login"; parse_token
# receives its token through Depends(auth).
auth = OAuth2PasswordBearer(tokenUrl="/api/login")
|
|
|
|
|
|
def parse_token(token: str = Depends(auth)):
    """Decode *token* and return the value of its ``id`` claim.

    Args:
        token: The encoded JWT; by default supplied through ``Depends(auth)``.

    Returns:
        The ``id`` claim taken from the decoded token payload.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``jwt.decode`` raises ``JWTError`` or when the ``id`` claim is
            missing or an empty string.
    """
    # The detail message reads "token is incorrect or has expired".
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="token不正确或已过期",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(
            token,
            settings.ACCESS.SECRET_KEY,
            algorithms=settings.ACCESS.ALGORITHM,
        )
    except JWTError:
        raise unauthorized

    subject_id = claims.get("id")
    if subject_id is not None and subject_id != "":
        return subject_id
    raise unauthorized
|