刘功瑞的博客

有一天你突然惊醒,发现这一切,都只不过是一场梦。

pyJWT使用

最近要用 Falsk 开发一个大点的后端,为了安全考虑,弃用传统的Cookie验证。转用JWT。

其实 Falsk 有一个 Falsk-JWT 但是我觉得封装的太高,还是喜欢通用的 PyJWT 。

JWT官网

https://jwt.io/

安装

pip install PyJWT

项目文档(英文)

https://pyjwt.readthedocs.io/en/latest/

 Demo

import jwt
import datetime

dic = {
    'exp': datetime.datetime.now() + datetime.timedelta(days=1),  # 过期时间
    'iat': datetime.datetime.now(),  #  开始时间
    'iss': 'lianzong',  # 签名
    'data': {  # 内容,一般存放该用户id和开始时间
        'a': 1,
        'b': 2,
    },
}

s = jwt.encode(dic, 'secret', algorithm='HS256')  # 加密生成字符串
print(s)
s = jwt.decode(s, 'secret', issuer='lianzong', algorithms=['HS256'])  # 解密,校验签名
print(s)
print(type(s))
b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE1NDUzMDI5OTIsImlhdCI6MTU0NTIxNjU5MiwiaXNzIjoibGlhbnpvbmciLCJkYXRhIjp7ImEiOjEsImIiOjJ9fQ.pSq-XRcC-E7zeg3u0X6TsKdhhsCPh3tB40_YJNho8CY'
{'exp': 1545302992, 'iat': 1545216592, 'iss': 'lianzong', 'data': {'a': 1, 'b': 2}}
<class 'dict'>

解析

首先我们注意dic的结构

dic 有官方指定的key,程序在解密的时候会根据key的Value判断是否合法。这些key有

  • “exp”: 过期时间

  • “nbf”: 表示当前时间在nbf里的时间之前,则Token不被接受

  • “iss”: token签发者

  • “aud”: 接收者

  • “iat”: 发行时间

我们一般设置 过期时间,发行时间,接收者。我们来分别解释这些key

exp   

exp指过期时间,在生成token时,可以设置该token的有效时间,如果我们设置1天过期,1天后我们再解析此token会抛出

jwt.exceptions.ExpiredSignatureError: Signature has expired

nbf

nbf类似于token的 lat ,它指的是该token的生效时间,如果使用但是没到生效时间则抛出

jwt.exceptions.ImmatureSignatureError: The token is not yet valid (nbf)

iss

iss指的是该token的签发者,我们可以给他一个字符串。

注意,iss 在接收时如果不检验也没有问题,如果我们接收时需要检验但是又签名不一致,则会抛出

jwt.exceptions.InvalidIssuerError: Invalid issuer

aud

aud指定了接收者,接收者在接收时必须提供与token要求的一致的接收者(字符串),如果没写接收者或者接收者不一致会抛出

jwt.exceptions.InvalidAudienceError: Invalid audience

iat

iat指的是token的开始时间,如果当前时间在开始时间之前则抛出

jwt.exceptions.InvalidIssuedAtError: Issued At claim (iat) cannot be in the future.

 

注意

如果我们不需要验证所有信息直接生成token可以设置

jwt.decode(encoded, verify=False)

但是这样有什么用呢?

 

生成/解密参数

jwt.encode(payload, config.SECRET_KEY, algorithm='HS256')

上面代码的jwt.encode方法中传入了三个参数:第一个是payload,这是认证依据的主要信息,第二个是密钥,这里是读取配置文件中的SECRET_KEY配置变量,第三个是生成Token的算法。

一般我们使用HS256

第二个参数是生成token的密钥

我们需要在加密时指定

解密时也是第二个参数来指定解密密钥,这两个密钥必须相同




-------------------------------------------------------------------------------------------------------------------------------------

PyJWT是一个Python库,允许您对JSON Web Tokens(JWT)进行编码和解码。JWT是一个开放的行业标准(RFC 7519),用于在双方之间安全地声明索赔。

安装

1
pip install pyjwt

 

如需了解更多,请参阅安装了解更多信息

用法示例

1
2
3
4
5
6
7
8
>>> import jwt
 
>>> encoded_jwt = jwt.encode({'some''payload'}, 'secret', algorithm='HS256')
>>> encoded_jwt
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'
 
>>> jwt.decode(encoded_jwt, 'secret', algorithms=['HS256'])
{'some''payload'}

加密相依性(可选)

如果您计划使用某些数字签名算法(如RSA或ECDSA)进行编码或解码令牌,则需要安装 加密库。

1
pip install cryptography

传统的依赖关系

  • 一些环境,比如最常用的Google App Engine,不允许安装需要编译C扩展名的Python包,因此无法安装加密。如果你能安装加密,你应该忽略这个部分。

  • 如果你正在部署一个应用程序到这些环境之一,你需要使用数字签名算法的传统实现。 

1
pip install pycrypto ecdsa

一旦你已经安装了 pycrypto 和 ecdcsa,你可以在PyJWT中使用jwt.register_algorithm()。下面的示例代码将展示怎样配置PyJWT,去使用具有SHA256和EC与SHA256签名的RSA的传统实现。 

import jwt

from jwt.contrib.algorithms.pycrypto import RSAAlgorithm

from jwt.contrib.algorithms.py_ecdsa import ECAlgorithm

jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))

jwt.register_algorithm('ES256', ECAlgorithm(ECAlgorithm.SHA256))



----------------------------------------------------------------------------------------------------------------------------------------------------------------------

import datetime, jwt, time
from app.dao.userDao import UserDao
from flask import jsonify
from .. import common
class Auth():
    @staticmethod
    def encode_auth_token(user_id, login_time):
        """
        生成认证Token
        :param user_id: int
        :param login_time: int(timestamp)
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=10),
                'iat': datetime.datetime.utcnow(),
                'iss': 'ken',
                'data': {
                    'id':user_id,
                    'login_time': login_time
                }
            }
            return jwt.encode(
                payload,
                'secret',
                algorithm='HS256'
            )
        except Exception  as e:
            return e

    @staticmethod
    def decode_auth_token(auth_token):
        """
        验证Token
        :param auth_token:
        :return: integer|string
        """
        try:
            payload = jwt.decode(auth_token, 'secret', options= {'verify_exp':False})
            if ('data' in payload and 'id' in payload['data']):
                return payload
            else:
                raise jwt.InvalidTokenError

        except jwt.ExpiredSignatureError:
            return "Token过期"
        except jwt.InvalidTokenError:
            return "无效的Token"

    def authenticate(self, username, password):
        """
        用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因
        :param password:
        :return: json
        """
        userDao = UserDao()
        user = userDao.search(username)
        if (user is None):
            return jsonify(common.falseReturn('', '找不到用户'))
        else:
            if (user.password == password):
                login_time = int(time.time())
                token = self.encode_auth_token(user.username, login_time)
                return jsonify(common.trueReturn(token.decode(), '登陆成功'))
            else:
                return jsonify(common.falseReturn('', '密码不正确'))

    def identify(self, request):
        """
        用户鉴权
        :return: list
        """
        auth_header = request.headers.get('Authorization')
        if (auth_header):
            auth_tokenArr = auth_header.split(" ")
            if (not auth_tokenArr or auth_tokenArr[0]!= 'jwt' or len(auth_tokenArr) != 2 ):
                result = common.falseReturn('','请传递正确的验证头信息')
            else:
                auth_token = auth_tokenArr[1]
                payload = self.decode_auth_token(auth_token)
                if not isinstance(payload, str):
                    userDao = UserDao()
                    user = userDao.search(payload['data']['id'])
                    if (user is None):
                        result = common.falseReturn('', '找不到该用户信息')
                    else:
                        result = common.trueReturn('', '请求成功')
                else:
                    result = common.falseReturn('', payload)
        else:
            result = common.falseReturn('','没有提供认证token')
        return result

代码说明:

authenticate: 根据用户名/密码,到DB中进行校验,如果是合法的用户名/密码,调用encode_auth_token生成token返回;username加入到token的payload中。
encode_auth_token:  生成token的函数,payload可以存储一些不敏感的信息,比如用户名等,但是不能存密码;还有指定签名算法和秘钥。
identify: 用户的请求需要携带token信息,这个函数对request进行校验,调用decode_auth_token完成的校验。
decode_auth_token: 调用jwt.decode进行token校验(要指定秘钥,秘钥和生成token的秘钥一样)

3: 拦截所有的请求,都进行token校验
 @app.before_request
 def before_request():
     Auth.identify(Auth,request )


发表评论:

Powered By Z-BlogPHP 1.5.2 Zero

Copyright www.liugongrui.com.All Rights Reserved.