网络安全-如何设计一个安全的API(安全角度)
目录
- API安全概述
- 设计一个安全的API
- 一个基本的API
- 主要代码
- 调用
- API的一些问题
- BasicAuth
- 认证流程
- 主要代码
- 问题
- API Key
- 流程
- 主要代码
- 问题
- Bearer auth/Token auth
- 流程
- Digest Auth
- 流程
- 主要代码
- 问题
- JWT Token
- 流程
- 代码
- 问题
- Hmac
- 流程
- 主要代码
- 问题
- OAuth
- 比较
- 自定义请求签名
- 身份认证&密钥加密
- 防重放
- 请求时效性
- 请求签名算法设计
- 代码
- 攻击与防御
- SQL注入
- 敏感信息泄露
- 越权攻击
- 重放攻击
- 全部代码
- 参考
API安全概述
利用API可进行以下常见的攻击:
- 注入攻击(SQL注入、命令注入、XSS等)
- DOS/DDOS攻击
- SSRF
- 未授权/水平(垂直)越权
- 敏感数据泄露
- 中间人攻击
- 更改请求方法调用
- 并发攻击
- 重放攻击
- 数据篡改和伪造
有以下常见的防御方式:
- 资源请求限制,通过限频等手段来解决DOS、DDOS攻击
- 线程加锁来解决并发攻击
- 权限控制,通过ABAC、RBAC等方式解决越权攻击
- 敏感信息防泄露,通过分类分级引擎,数据库加密存储等方式来解决敏感信息泄露
- 防重放,通过API认证解决重放攻击
- 加密,例如HTTPS来解决中间人攻击
- 安全产品,例如API网关、WAF等来解决大部分攻击
当然,有些还是需要API后端代码来进行防御,例如命令注入、SSRF等。
本文以API身份认证为主要内容,浅谈各种认证的使用场景与优缺点,同时穿插部分攻击与防御。
设计一个安全的API
一个基本的API
主要代码
import uuid
import re
import tracebackfrom flask import Flask, request, jsonify
from mysql import MysqlCli
from log import log
from setting import *app = Flask(__name__)
log.set_file()# 验证username
def validate_username(username:str)->bool:if len(username) > 20:return Falsereturn True# 验证手机号
def validate_phone_number(phone_number:str)->bool:# 使用正则表达式检查手机号格式pattern = re.compile(r'^1[3456789]\d{9}$')if re.match(pattern, phone_number):return Trueelse:return False# 管理员注册用户接口
@app.route('/api/v1.0/admin/add_user', methods=['POST'])
def add_user():resp = {"requestid": uuid.uuid4()}if request.is_json:data = request.get_json()username = data.get('username')if username is None:resp['error'] = 'username is required'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400phone_number = data.get('phone_number')if phone_number is None:resp['error'] = 'phone_number is required'log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")return jsonify(resp),400if not validate_username(username):resp['error'] = 'username length should not exceed 10'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400try:cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one("users",{"username": username,"phone_number": phone_number})cli.close()resp['message'] = f'success to add user:{username}!'log.logger.info(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp)except Exception:log.logger.info(f"url:{request.url},params:{username},resp:{resp}")resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"return jsonify(resp),500else:resp['error'] = "Invalid JSON format in request"return jsonify(resp),400# 用户获取信息接口
@app.route('/api/v1.0/get_user_info', methods=['POST'])
def get_user_info():resp = {"requestid": uuid.uuid4()}if request.is_json:data = request.get_json()username = data.get('username')if username is None:resp['error'] = 'username is required'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400if not validate_username(username):resp['error'] = 'username length should not exceed 10'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400try:cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql = f"select * from users where username = '{username}' limit 1"user = cli.select_all(sql)resp['message'] = f'success to get user:{user}.'log.logger.info(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp)except Exception:resp['error'] = f'failed to get user, error:{traceback.format_exc()}'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),500else:resp['error'] = "Invalid JSON format in request"return jsonify(resp),400if __name__ == '__main__':app.run()
可以看到共有两个API,管理员注册用户接口、用户获取信息接口,拥有以下功能或安全措施:
- 版本控制
- 日志记录
- 请求方法校验
- 请求数据校验
调用
API的一些问题
- 没有身份认证,只要有人知道api地址、方法、参数就能调用
- 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
- 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
- 没有对重放攻击的防御,抓包后可能重放包
- 越权问题,管理员接口任何人都可以调用
- SQL注入,没有做参数化查询等防止SQL操作,可能被拖库
BasicAuth
认证流程
- 客户端发送请求头Authorization为Basic username:password(base64编码)的数据包
- 服务端对请求头Authorization判断,解码后从数据库查询判断账号密码是否正确
主要代码
def basic_auth(f):@wraps(f)def decorated_function(*args, **kwargs):resp = {"requestid": uuid.uuid4()}auth_header = request.headers.get('Authorization')if not auth_header or not auth_header.startswith('Basic '):resp['error'] = 'basic auth is required'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp), 400else:try:encoded_credentials = auth_header.split(' ')[1]decoded_credentials = base64.b64decode(encoded_credentials).decode('utf-8')username, password = decoded_credentials.split(':')if not check_basic_auth(username, password):resp['error'] = 'basic auth failed,check your username or password is right'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp), 401except Exception:resp['error'] = f'basic auth failed,err: {traceback.format_exc()}'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp), 500return f(*args, **kwargs)return decorated_function
# 管理员注册用户接口 v2.0 增加密码
@app.route('/api/v2.0/admin/basic_auth/add_user', methods=['POST'])
@basic_auth
def add_user_basic_auth():resp = {"requestid": uuid.uuid4()}# 请求是json格式if request.is_json:data = request.get_json()username = data.get('username')# username检查if username is None:resp['error'] = 'username is required'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400if not validate_username(username):resp['error'] = 'username length should not exceed 10'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400# phone_number检查phone_number = data.get('phone_number')if phone_number is None:resp['error'] = 'phone_number is required'log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")return jsonify(resp),400# 插入数据库try:cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one("users",{"username": username,"phone_number": phone_number})pwd = generate_random_password(secrets.choice(range(8, 17)))cli.insert_one("passwords", {"username": username,"password": pwd})cli.close()resp['message'] = f'success to add user:{username},password {pwd},remember it!'log.logger.info(f"url:{request.url},params:{username},resp:{resp}")# 异常返回except Exception:log.logger.info(f"url:{request.url},params:{username},resp:{resp}")resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}",500return jsonify(resp)return jsonify(resp)# 请求不是json格式else:resp['error'] = "Invalid JSON format in request"return jsonify(resp),400
问题
- 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
- 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
- 没有对重放攻击的防御,抓包后可能重放包
- 越权问题,管理员接口任何人都可以调用
- SQL注入,没有做参数化查询等防止SQL操作,可能被拖库
引入问题:
- 请求的密码进行base64编码,容易获得并解码获得明文
- 数据库密码明文存储
- 数据库sql语句执行没有按事务处理,可能用户插入到数据库但密码没有入库
API Key
流程
- 客户端发送请求时通过query string、请求头(X-API-Key或其他自定义请求头)、Cookie中携带apikey
- 服务端根据约定的方式获取后查询数据库判断是否存在
主要代码
def api_key_auth(f):@wraps(f)def decorated_function(*args, **kwargs):resp = {"requestid": uuid.uuid4()}# api key检查if "api_key" not in request.headers:resp['error'] = 'api_key is required in headers'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp), 400if not check_api_key(request.headers["api_key"]):resp['error'] = f'api_key {request.headers["api_key"]} is invalid'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp), 401return f(*args, **kwargs)return decorated_function
# 管理员注册用户接口 v2.0 增加api token
@app.route('/api/v2.0/admin/api_key/add_user', methods=['POST'])
@api_key_auth
def add_user_api_key():resp = {"requestid": uuid.uuid4()}# request是jsonif request.is_json:data = request.get_json()username = data.get('username')if username is None:resp['error'] = 'username is required'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400phone_number = data.get('phone_number')if phone_number is None:resp['error'] = 'phone_number is required'log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")return jsonify(resp),400if not validate_username(username):resp['error'] = 'username length should not exceed 10'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400try:cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one("users",{"username": username,"phone_number": phone_number})key = secrets.token_hex(16)cli.insert_one("keys",{"username": username,"key": key})cli.close()resp['message'] =f'success to add user:{username},key {key},remember it!'log.logger.info(f"url:{request.url},params:{username},resp:{resp}")except Exception:log.logger.info(f"url:{request.url},params:{username},resp:{resp}")resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"return jsonify(resp),500return jsonify(resp)else:resp['error'] = "Invalid JSON format in request"return jsonify(resp),400
博主这里实现时,添加了自定义请求头api_key
问题
- 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
- 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
- 没有对重放攻击的防御,抓包后可能重放包
- 越权问题,管理员接口任何人都可以调用
- SQL注入,没有做参数化查询等防止SQL操作,可能被拖库
引入问题:
- API key明文发送,容器抓包获取
- API key明文存储
- 数据库sql语句执行没有按事务处理,可能用户插入到数据库但api key没有入库
Bearer auth/Token auth
流程
- 请求端通过某种认证机制(比如用户名密码登录、OAuth 认证等)获取令牌。 在发起 HTTP 请求时,客户端将这个令牌添加到Authorization 请求头中,格式为 “Bearer token”。
- 服务器接收到请求后,会验证这个令牌的有效性,如果有效则允许请求继续处理,否则拒绝访问。
使用 Bearer authentication 的优势在于令牌本身可以包含更多的信息、具有较长的有效期,并且不需要在服务器端保存会话状态,这样可以减轻服务器负担并提高安全性。
这里就不实现了,后面通过jwt token,算是实现其中的一种。
Digest Auth
流程
- 客户端发送一个未经认证的请求给服务器。 服务器返回一个 401 Unauthorized响应,有一个响应头WWW-Authenticate,其中包含一个随机数(nonce)和其他认证需要的信息。
- 客户端收到 401响应后,会向用户提示输入用户名和密码,然后根据特定的算法(通常是 MD5)对用户名、密码、随机数(nonce)、HTTP 方法和请求的URI 进行摘要计算。
- 客户端将计算出的摘要放在 Authorization 请求头中发送给服务器。
- 服务器收到请求后,会根据事先约定好的算法再次计算摘要,如果两个摘要匹配,则验证通过,否则拒绝访问。
涉及的几个常见参数如下:
- realm,必选。是一个保护空间的名称,用于向用户表明请求的资源属于哪个保护空间。它通常用于表示一组受保护的资源,用于构造摘要字符串。
- nonce,必选。是一个唯一的字符串,401时由服务器生成并发送给客户端。它用于防止重放攻击(replay attack)。每次认证请求都会使用一个新的 nonce 值,使得每次请求的摘要都是不同的,从而提高了安全性。
- qop(Quality of Protection) ,必选。可以是 “auth” 或 “auth-int”。auth 代表身份验证,而 auth-int 代表身份验证和消息完整性保护。Qop 的存在使得摘要认证更加灵活和安全。
- algorithm,可选,默认MD5。指定了用于计算摘要的哈希算法,通常是 MD5。服务器在 WWW-Authenticate 响应头中指定,客户端按照这个算法进行摘要计算。还有MD5-sess、SHA、SHA-256、SHA-512等。
- nc,可选,是一个计数器,用于跟踪特定 nonce 值的使用次数。每次客户端发送请求时,Nc 都会递增,帮助防止重放攻击。
- cnonce(Client Nonce) ,可选。是客户端生成的随机字符串,用于与服务器的 nonce 一起使用,以增加请求的独特性和安全性。
- opaque,可选。是服务器生成的字符串,客户端在后续请求中必须原样返回。它用来保持服务器状态或防止某些类型的攻击。
- charset,可选。默认utf-8,编码方式。
- userhash,可选。默认false。服务端返回的是否支持username哈希。
主要代码
# 将字符串保存到Redis中,并设置过期时间
def save_nonce_with_expiry(key, value, expiry_seconds):""":param key: 键:param value: 值:param expiry_seconds: 过期时间(秒)"""# 连接Redis数据库redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD, port=REDIS_PORT, db=REDIS_DB)redis_client.setex(key, expiry_seconds, value)def check_nonce(key):"""检查字符串是否存在于Redis中:param key: 键:return: 布尔值,表示键是否存在"""# 连接Redis数据库redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD ,port=REDIS_PORT, db=REDIS_DB)return redis_client.exists(key)# 校验username 返回密码
def check_username(username):try:cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql = f"select * from passwords where username = '{username}' limit 1"user = cli.select_one(sql)return user['password'] if user else ''except Exception:log.logger.error(f"check_basic_auth failed,error:{traceback.format_exc()}")return False# 校验response
def check_response(response,realm,username,password,method,uri,req_nonce,nc,cnonce,qop):try:log.logger.info(f"check_response ,response:{response},realm:{realm},username:{username},"f"password:{password},method:{method},uri:{uri},req_nonce:{req_nonce},nc:{nc},"f"cnonce:{cnonce},qop:{qop}")# 校验nonceif not check_nonce(req_nonce):log.logger.error(f"check_signature failed,error:{req_nonce} not exist!")return Falseha1=hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest()ha2=hashlib.md5(f"{method}:{uri}".encode()).hexdigest()return response == hashlib.md5(f"{ha1}:{req_nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest()except Exception:log.logger.error(f"check_responce failed,error:{traceback.format_exc()}")return False# 添加用户 用于添加用户时无需额外增加用户关联的密钥等信息时的接口
def add_user():resp = {"requestid": uuid.uuid4()}log.logger.info(request.headers)# request是jsonif request.is_json:data = request.get_json()username = data.get('username')if username is None:resp['error'] = 'username is required'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp)phone_number = data.get('phone_number')if phone_number is None:resp['error'] = 'phone_number is required'log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")return jsonify(resp)if not validate_username(username):resp['error'] = 'username length should not exceed 10'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp)try:cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)cli.insert_one("users",{"username": username,"phone_number": phone_number})cli.close()resp['message'] = f'success to add user:{username}!'log.logger.info(f"url:{request.url},params:{username},resp:{resp}")except Exception:log.logger.info(f"url:{request.url},params:{username},resp:{resp}")resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"return jsonify(resp)else:resp['error'] = "Invalid JSON format in request"return jsonify(resp)# 管理员注册用户接口 v2.0增加digest算法
@app.route('/api/v2.0/admin/digest_auth/add_user', methods=['POST'])
@digest_auth
def add_user_digest():return add_user()
输入账号密码后登录,发起请求
问题
- 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
- 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
- 没有对重放攻击的防御,抓包后可能重放包
- 越权问题,管理员接口任何人都可以调用
- SQL注入,没有做参数化查询等防止SQL操作,可能被拖库
优点:
- 可以防止重放攻击
引入问题:
- 发送两个请求,更加消耗资源
- 需要存储nonce,这里有设置过期时间
- 实现比较复杂
JWT Token
流程
通常由三个部分组成:header、payload 和 signature。
- header:包含两个部分:令牌类型(即 “JWT”)和所使用的签名算法(如 HMAC SHA256 或 RSA)。
- payload:包含声明(claims)。声明是关于实体(通常是用户)和其他数据的声明。
- iss(Issuer):该声明标识了 JWT 的发行者。
- sub(Subject):该声明标识了 JWT 的主题,即所描述的实体。
- aud(Audience):该声明标识了 JWT 的受众,即预期的接收者。
- exp(Expiration Time):该声明指定了 JWT 的过期时间,在此时间之后,JWT 将被认为是无效的。
- nbf(Not Before):该声明指定了 JWT 的生效时间,在此时间之前,JWT 将被认为是无效的。
- iat(Issued At):该声明指定了 JWT 的签发时间。
- jti(JWT ID):该声明为 JWT 提供了一个唯一标识符。
- signature:为了确保 JWT 未被篡改,需要对编码后的 header 和 payload 使用指定的签名算法和一个密钥进行签名。
代码
def jwt_auth(f):@wraps(f)def decorated_function(*args, **kwargs):resp = {"requestid": uuid.uuid4()}auth_header = request.headers.get('Authorization')if not auth_header or not auth_header.startswith("Bearer "):resp["message"] = "no Authorization header or invalid format"return jsonify(resp), 400try:token = auth_header.split(' ')[1]jwt.decode(token, JWT_SECRET, algorithms=['HS256'])except jwt.ExpiredSignatureError:resp["error"] = "token has expired"log.logger.error(f"url:{request.url},token:{token},resp:{resp}")return jsonify(resp), 401except jwt.InvalidTokenError:resp["message"] = f"invalid token {token}"log.logger.error(f"url:{request.url},token:{token},resp:{resp}")return jsonify(resp), 401except Exception as e:resp["message"] = f"interal error {e}"log.logger.error(f"url:{request.url},token:{token},resp:{resp}")return jsonify(resp), 500return f(*args, **kwargs)return decorated_function# 用户登录获取jwt token接口
@app.route('/api/v2.0/admin/jwt/login', methods=['POST'])
def get_jwt_token():resp = {"requestid": uuid.uuid4()}if request.is_json:data = request.get_json()username = data.get('username')password = data.get('password')if username is None or password is None:resp['error'] = 'username or password is required'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400if not validate_username(username):resp['error'] = 'username length should not exceed 10'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),400try:cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql = f"select * from passwords where username = '{username}' limit 1"user = cli.select_one(sql)if not user:resp['error'] = 'username not found'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp), 400if user['password'] != password:resp['error'] = 'password is not right'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp), 401payload = {"iss": "lady_killer9","exp": datetime.now() + timedelta(seconds=5*60),"jti": str(uuid.uuid4())}resp['message'] = f'success to login :{user},token:{jwt.encode(payload,JWT_SECRET,algorithm="HS256")}'log.logger.info(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp), 200except Exception:resp['error'] = f'failed to get user, error:{traceback.format_exc()}'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp),500else:resp['error'] = "Invalid JSON format in request"return jsonify(resp),400# 管理员注册用户接口 v2.0增加jwt sha256算法
@app.route('/api/v2.0/admin/jwt/add_user', methods=['POST'])
@jwt_auth
def add_user_jwt():return add_user()# 用户获取信息接口 v2.0 增加jwt sha256算法
@app.route('/api/v2.0/jwt/get_user_info', methods=['POST'])
@jwt_auth
def get_user_info_jwt():return get_user_info()
问题
- 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
- 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
- 没有对重放攻击的防御,抓包后可能重放包
- 越权问题,管理员接口任何人都可以调用
- SQL注入,没有做参数化查询等防止SQL操作,可能被拖库
优点:
- token可以设置时间限制,过期后不可调用
- payload的aud等参数可以用于鉴权
- 可添加自定义payload,方便做其他的功能
引入问题:
- jwt token容易破解
bejson jwt在线解密
Hmac
流程
和Digest Auth差不多,可以由客户端生成随机数,这样请求一次即可,随机数不可重复。
主要代码
# 验证摘要
def check_signature(signture:str,username:str,nonce:int,data:dict):try:cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql = f"select * from `secrets` where `username` = '{username}' limit 1"secret = cli.select_one(sql)cli.close()if secret:server_signature = hmac.new(str(secret['secret']).encode('utf-8'), json.dumps(data).encode('utf-8'), hashlib.sha256).hexdigest()log.logger.info(f"secret:{str(secret['secret'])},data:{data},server_signature:{server_signature}")return server_signature == signturelog.logger.error(f"check_signature failed,error:{username} {secret}")return Falseexcept Exception:log.logger.error(f"check_signature failed,error:{traceback.format_exc()}")return Falsedef hmac_auth(f):@wraps(f)def decorated_function(*args, **kwargs):resp = {"requestid": uuid.uuid4()}# query检查 nonce usernamenonce = request.args.get('nonce', type=int)username = request.args.get('username')if nonce is None or username is None:resp['error'] = 'username or nonce not found in query string'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp)# 随机数验证if check_nonce(nonce):log.logger.error(f"check_signature failed,error:{nonce} is in database")resp['error'] = f"check_signature failed,error:{nonce} is in database"log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp)save_nonce_with_expiry(nonce,1,MIN*60)# signature检查if "Signature" not in request.headers:resp['error'] = 'signature is required in headers'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp)# request是jsonif request.is_json:data = request.get_json()if not check_signature(request.headers['Signature'], username, nonce, data):resp['error'] = f'signature {request.headers["Signature"]} is invalid'log.logger.error(f"url:{request.url},resp:{resp}")return jsonify(resp),401else:resp['error'] = "Invalid JSON format in request"return jsonify(resp),400return f(*args, **kwargs)return decorated_function
# 管理员注册用户接口 v2.0增加hmac sha256算法
@app.route('/api/v2.0/admin/hmac/add_user', methods=['POST'])
@hmac_auth
def add_user_hmac():resp = {"requestid": uuid.uuid4()}# request是jsonif request.is_json:data = request.get_json()username = data.get('username')if username is None:resp['error'] = 'username is required'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp)phone_number = data.get('phone_number')if phone_number is None:resp['error'] = 'phone_number is required'log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")return jsonify(resp)if not validate_username(username):resp['error'] = 'username length should not exceed 10'log.logger.error(f"url:{request.url},params:{username},resp:{resp}")return jsonify(resp)try:cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one("users",{"username": username,"phone_number": phone_number})secret = secrets.token_hex(16)cli.insert_one("secrets",{"username": username,"secret": secret})cli.close()resp['message'] =f'success to add user:{username},secret {secret},remember it!'log.logger.info(f"url:{request.url},params:{username},resp:{resp}")except Exception:log.logger.info(f"url:{request.url},params:{username},resp:{resp}")resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"return jsonify(resp)else:resp['error'] = "Invalid JSON format in request"return jsonify(resp)# 用户获取信息接口 v2.0 增加hmac
@app.route('/api/v2.0/hmac/get_user_info', methods=['POST'])
@hmac_auth
def get_user_info_hmac():return get_user_info()
问题
- 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
- 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
- 没有对重放攻击的防御,抓包后可能重放包
- 越权问题,管理员接口任何人都可以调用
- SQL注入,没有做参数化查询等防止SQL操作,可能被拖库
引入问题:
- 需要存储nonce,这里有设置过期时间
OAuth
内容太多,留坑先不看了
比较
比较项 | Basic Auth | API key | Bearer Auth/Token auth | Digest Auth | Hmac Auth | JWT |
---|---|---|---|---|---|---|
身份认证 | √ | √ | √ | √ | √ | √ |
密钥加密 | × | × | × | √ | √ | × |
服务端存储 | √ | √ | √ | √ | √ | ×(token可以不存) |
防重放 | × | × | × | √ | √ | ×(token失效前可重放) |
时效性 | × | × | × | √ | √ | √ |
自定义 | × | × | √ | √ | √ | √ |
通过以上比较,如果设计一个签名具有以下优点会比较好:
- 防重放。通过随机数防止请求重放,随机数由客户端计算可以减轻服务端压力。
- 密钥加密。使用密钥计算签名。
- 服务端存储。存储随机数,设置过期时间。
- 时效性。添加时间戳,过期后请求失败。
- 自定义。自定义请求签名当然可以自定义一些东西,用于鉴权等。
自定义请求签名
身份认证&密钥加密
请求需要验证身份,就需要有账密,这里就用SecretId、SecretKey,其中SecretKey用于进行签名的计算。
防重放
为了防重放,生成一个随机数Nonce,Nonce唯一,这样服务端收到携带该Nonce的请求后,还发送带该Nonce的请求,就拒绝掉。因此Nonce需要服务器保存,同时为了防止篡改,签名时需要。
那么问题来了,Nonce需要服务器保存,不能一直保存吧,随着时间推移,存储成本会越来越高,因此需要时间限制。
请求时效性
请求应该具有时效性,这里使用unix时间戳Timestamp。规定在1分钟内请求有效,这样Nonce保存时间在1分钟即可。
请求签名算法设计
密钥加密选择SHA-256算法,当然算法可以当做参数,由客户端指定,就用Algorithm吧
url类似:xxx?SecretId=xxx&Nonce=xxx&Timestamp=xxx&Algorithm=xxx
添加一个自定义请求头Signature,放上签名,待签名字符串规定格式如下
{Nonce}:{Timestamp}:{Algorithm}:{HTTPMethod}:{base64(HTTPBody)}
- HTTPMethod:HTTP请求方法,例如POST
- HTTPBody:HTTP请求体,例如{“name”:“lady_killer9”}
当然,还可以添加更多到待签名字符串
客户端
- 生成一个随机字符串(不包含:)
- 对请求头和请求体做base64编码,并按照格式拼接
- 使用算法和SecretKey进行签名
- 生成时间戳,发送请求
服务端
- 从url获取时间戳Timestamp,校验是否在时间内
- 从url获取随机字符串Nonce,校验是否在数据库中
- 对请求头和请求体做base64编码
- 从url获取SecretId后从数据库查询对应的SecretKey
- 使用SecretKey和Algorithm算法对拼接同样格式字符串进行签名得到ServerSignature,比较Signature是否与请求头Signature的值一致
代码
def verify_signature(signature, nonce,timestamp,algorithm,method,body_base64):format_str = f"{nonce}:{timestamp}:{algorithm}:{method}:{body_base64}"server_signature = hashlib.sha256(format_str.encode()).hexdigest()log.logger.info(format_str)log.logger.info(server_signature)return signature == server_signaturedef require_signature(f):@wraps(f)def decorated_function(*args, **kwargs):resp = {"requestid": uuid.uuid4()}secret_id = request.args.get('SecretId', type=str)nonce = request.args.get('Nonce', type=str)timestamp = request.args.get('Timestamp', type=int)algorithm = request.args.get('Algorithm', type=str)if secret_id is None:resp['error'] = 'No SecretId in query string'return jsonify(resp),400if nonce is None:resp['error'] = 'No Nonce in query string'return jsonify(resp),400if timestamp is None:resp['error'] = 'No Timestamp in query string'return jsonify(resp),400if algorithm is None:resp['error'] = 'No Algorithm in query string'return jsonify(resp),400if algorithm not in ["sha256"]:resp['error'] = f'can not support {algorithm}'return jsonify(resp), 400if (datetime.now() - timedelta(minutes=MIN)).timestamp() > timestamp:resp['error'] = 'Request is send before 5 mins ago, check Timestamp'return jsonify(resp), 400if ':' in nonce:resp['error'] = 'can not contain : in Nonce, generate a new one'return jsonify(resp), 400cli = redis.Redis.from_url(REDIS_URL)if cli.exists(nonce):resp['error'] = 'can not request with same Nonce'return jsonify(resp), 400else:cli.setex(nonce, MIN*60, 1)signature = request.headers.get('Signature')if not signature:resp['error'] = 'no Signature in headers'return jsonify(resp), 400# 解析 Authorization header,验证签名body_base64 = bytes.decode(b64encode(json.dumps(request.get_json(),ensure_ascii=False).encode()))if not verify_signature(signature, nonce,timestamp,algorithm,request.method,body_base64):resp['error'] = 'invalid signature'return jsonify(resp), 401return f(*args, **kwargs)return decorated_function
攻击与防御
SQL注入
例如,在v1.0的get_user_info接口,存在将用户输入拼接到sql的漏洞,可以被SQL注入。
抓包如下:
防御方面可以通过预编译等方式来解决
v3.0已解决
敏感信息泄露
例如,在v1.0的get_user_info接口,用户手机号被完整返回,没有打码。
防御上可以通过加*打码或MFA等来解决
v3.0已解决
越权攻击
例如,在v2.0的api key认证接口,任意用户都能查询admin用户的信息,只需要知道username即可
防御方面可以通过添加RBAC等鉴权来解决
重放攻击
例如,v2.0的api key认证接口,设置burpsuite代理,放到重放器Reapter,发送多少次都可以。
v3.0通过自定义请求签名就解决了此类问题。
全部代码
Github-api_history
参考
API-Security Owasp top 10
API 鉴权都有哪些分类,这些重点不要错过
best-practices-for-authentication-and-authorization-for-rest-apis/
pyjwt
https://github.com/ticarpi/jwt_tool
rfc6750-The OAuth 2.0 Authorization Framework: Bearer Token Usage
rfc7616-HTTP Digest Access Authentication
rfc2617-HTTP Authentication: Basic and Digest Access Authentication
rfc7519-JSON web Token (JWT)
Github-jwt_tool
相关文章:
网络安全-如何设计一个安全的API(安全角度)
目录 API安全概述设计一个安全的API一个基本的API主要代码调用API的一些问题 BasicAuth认证流程主要代码问题 API Key流程主要代码问题 Bearer auth/Token auth流程 Digest Auth流程主要代码问题 JWT Token流程代码问题 Hmac流程主要代码问题 OAuth比较自定义请求签名身份认证&…...
微积分-导数1(导数与变化率)
切线 要求与曲线 C C C相切于 P ( a , f ( a ) ) P(a, f(a)) P(a,f(a))点的切线,我们可以在曲线上找到与之相近的一点 Q ( x , f ( x ) ) Q(x, f(x)) Q(x,f(x)),然后求出割线 P Q PQ PQ的斜率: m P Q f ( x ) − f ( a ) x − a m_{PQ} \…...
最新PHP仿猪八戒任务威客网整站源码/在线接任务网站源码
资源介绍 老规矩,截图为亲测,前后台显示正常,细节功能未测,有兴趣的自己下载。 PHP仿猪八戒整站源码下载,phpmysql环境。威客开源建站系统,其主要交易对象是以用户为主的技能、经验、时间和智慧型商品。经…...
Windows安装配置jdk和maven
他妈的远程连接不上公司电脑,只能在家重新配置一遍,在此记录一下后端环境全部配置 Windows安装配置JDK 1.8一、下载 JDK 1.8二、配置环境变量三、验证安装 Windows安装配置Maven 3.8.8一、下载安装 Maven并配置环境变量二、设置仓库镜像及本地仓库三、测…...
电子SOP实施(MQTT协议)
架构图 服务与程序 用docker启动mqtt broker(服务器) 访问:http://192.168.88.173:18083/#/dashboard/overview 用户名:admin 密码:*** 消息发布者(查找sop的url地址,发布出去) 修改url,重新发布消息 import ran…...
【Unity导航系统】Navigation组件的概念及其使用示例
Unity中的NavMeshObstacle组件是一个用于动态障碍物的组件,它可以实时地影响导航网格(NavMesh)。当游戏对象附加了NavMeshObstacle组件时,它可以在AI进行路径规划时被识别为障碍物,从而让AI避开这些动态变化的障碍。 …...
vue-cli 根据文字生成pdf格式文件 jsPDF
1.安装jspdf npm install jspdf --save 2.下载ttf格式文件 也可以用C:\Windows\Fonts下的字体文件,反正调一个需要的ttf字体文件就行,但有的字体存在部分字体乱码现象 微软雅黑ttf下载地址: FontsMarket.com - Download Microsoft YaHei …...
【嵌入式DIY实例】-Nokia 5110显示DS3231 RTC数据
Nokia 5110显示DS3231 RTC数据 文章目录 Nokia 5110显示DS3231 RTC数据1、硬件准备与接线2、代码实现本文将介绍如何使用 ESP8266 NodeMCU 板和 DS3231 RTC 模块制作一个简单的数字实时时钟,其中可以使用连接到 NodeMCU 的两个按钮设置时间和日期,并将它们打印在诺基亚 5110 …...
【十三】图解mybatis缓存模块之装饰器模式
图解mybatis缓存模块之装饰器模式 简介 之前有写过一篇博客介绍过mybatis的缓存模块设计【九】mybatis 缓存模块设计-CSDN博客 ,当时着重讲解的是mybatis种一级缓存和二级缓存,本次博客补充讲解一下装饰器模式的应用,本篇主要分两部分讲解&a…...
字节大神强推千页PDF学习笔记,弱化学历问题,已拿意向书字节提前批移动端!
主要问java,以及虚拟机,问了一点android 1.实习项目有关的介绍以及问题回答 2.反射与代理的区别,动态代理,静态代理,二者的区别,以及代理模式的UML图 3.字节码技术 4.虚拟机的双亲委派,以及好…...
Python爬虫-贝壳二手房“改进版”
前言 本文是该专栏的第31篇,后面会持续分享python爬虫干货知识,记得关注。 在本专栏之前的文章《Python爬虫-贝壳二手房》中,笔者有详细介绍,基于python爬虫采集对应城市的二手房数据。 而在本文,笔者将基于该项目案例的基础上,进行一个项目代码的“改进版”。 具体实…...
zookeeper学习、配置文件参数详解
zookeeper学习、配置文件参数详解 zookeeper 配置文件参数详解tickTime 、session 的过期时间、maxSessionTimeout 三者之间的关系initLimit,syncLimit什么区别minSessionTimeout 默认值,**他的单位是ms** zookeeper 配置文件参数详解 ZooKeeper 是一个分布式协调服…...
SVG 模糊效果
SVG 模糊效果 SVG(Scalable Vector Graphics,可缩放矢量图形)是一种基于XML的图像格式,用于描述二维图形。它是一种矢量图形格式,因此可以无限放大而不失真。SVG广泛应用于网页设计、动画制作和图形编辑等领域。本文将介绍SVG中一种特殊的效果——模糊效果,以及如何使用…...
Electron+vite+vuetify项目搭建
最近想用Electron来进行跨平台的桌面应用开发。同时想用vuetify作为组件,于是想搭建一个这样的开发环境。其中踩了不少坑,总是会出现各种的编译错误和问题,依赖的各种问题,搞了好久最终环境终于弄好可正常开发了。这里分享下快速搭…...
洛谷:P1085 [NOIP2004 普及组] 不高兴的津津
1. 题目链接 https://www.luogu.com.cn/problem/P1085 P1085 [NOIP2004 普及组] 不高兴的津津 2. 题目描述 题目描述:津津每天要上课还要上辅导班,每天学习超过8小时就不开心,帮忙检查下津津的下周日程安排,然后告诉我她哪天不高…...
Webpack4从入门到精通以及和webpack5对比_webpack现在用的是哪个版本
3.1 打包样式资源css-loader、style-loader… {// 匹配哪些文件test: /\.less$/,// 使用哪些loader进行处理use: [// use数组中loader执行顺序:从右到左,从下到上,依次执行(先执行css-loader)// style-loader:创建style标签&#…...
巴鲁夫MacroBuilder2.0.0.0软件巴鲁夫和使用手侧
巴鲁夫MacroBuilder2.0.0.0软件巴鲁夫和使用手侧...
分享:Javascript开源桌面环境-Puter
Puter这是一个运行在浏览器里的桌面操作系统,提供了笔记本、代码编辑器、终端、画图、相机、录音等应用和一些小游戏。该项目作者出于性能方面的考虑没有选择 Vue 和 React 技术栈,而是采用的 JavaScript 和 jQuery 构建,支持 Docker 一键部署…...
【idea-jdk1.8】使用Spring Initializr 创建 Spring Boot项目没有JDK8
信息差真可怕! 很久没创建springboot项目,今天使用idea的Spring Initializr 创建 Spring Boot项目时,发现java版本里,无法选择jdk1.8,只有17、21、22;前段时间也听说过,springboot将放弃java8&a…...
647. 回文子串(leetcode)
647. 回文子串(leetcode) 题目描述 给你一个字符串 s ,请你统计并返回这个字符串中回文子串的数目。 回文字符串 是正着读和倒过来读一样的字符串。 子字符串 是字符串中的由连续字符组成的一个序列。 示例1 输入:s “abc” 输出…...
【车载开发系列】汽车嵌入式开发常用工具介绍
【车载开发系列】汽车嵌入式开发常用工具介绍 【车载开发系列】汽车嵌入式开发常用工具介绍 【车载开发系列】汽车嵌入式开发常用工具介绍一. ChipON IDE For KungFu32二. ChipON PRO KF32三. GIT四. JLink五. S32DS六. parasoft ctest七. TCANLINPro八. vector Canoe 一. Chip…...
python脚本获取本机IP的方式
#方法一: #!/usr/bin/python import socket import fcntl import struct def get_ip_address(ifname): s socket.socket(socket.AF_INET,socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, struct.pack(256s,ifna…...
查看LabVIEW及各个模块和驱动的版本号
要方便地查看当前计算机上安装的LabVIEW版本以及各个模块和驱动的版本号,可以使用以下几种方法: 1. 使用NI MAX (Measurement & Automation Explorer) NI MAX 是一个强大的工具,可以帮助你管理National Instruments硬件、软件和驱动程序…...
LLM主流架构和模型
本文参考自https://github.com/HqWu-HITCS/Awesome-Chinese-LLM?tabreadme-ov-file和Huggingface中的ModelCard(https://huggingface.co/) LLM主要类别架构 LLM本身基于transformer架构。自2017年,attention is all you need诞生起&#x…...
为企业提供动力:用于大型组织的WordPress
可扩展且灵活的架构可通过主题、插件和集成进行定制内置 SEO 功能和营销功能内容管理和协作工具支持多站点安装托管解决方案和面向平台的提供商采用现代前端技术的 Headless CMS 功能 拥有强大、灵活且可扩展的内容管理系统 (CMS) 对于大型组织至关重要。作为最受欢迎和广泛使用…...
Django框架数据库ORM查询操作
Django框架在生成数据库的models模型文件后,旧可以在应用中通过ORM来操作数据库了。今天抽空试了下查询语句。以下是常用的查询语句。 以下查询需要引入django的Sum,Count,Q模块 from django.db.models import Sum,Count,Q 导入生成的mode…...
font-spider按需生成字体文件
font-spider可以全局安装,也可以单个项目内安装,使用npm run xxxx的形式 npm i font-spider "dev": "font-spider ./*.html" <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name&…...
双叒叕-一个-Android-MVVM-组件化架构框架?
LifecycleViewModelLiveDataViewBindingAndroid KTXOkHttp:网络请求Retrofit:网络请求MMKV:腾讯基于 mmap 内存映射的 key-value 本地存储组件Glide:快速高效的Android图片加载库ARoute:阿里用于帮助 Android App 进行组件化改造的框架 —— 支持模块间的路由、通信、解耦BaseR…...
STM32单片机BKP备份寄存器和RTC实时时钟详解
文章目录 1. Unix时间戳 2. UTC/GMT 3. 时间戳转换 4. BKP简介 5. BKP基本结构 6. RTC简介 7. RTC框架图 8. RTC基本结构 9. 代码示例 1. Unix时间戳 实时时钟,本质上是一个定时器,专门用来产生年月日时分秒。 Unix 时间戳(Unix T…...
vue3+ts 使用vue3-ace-editor实现Json编辑器
1、效果图 输入代码,点击格式化就出现以上效果,再点击压缩,是以下效果2、安装 npm i vue3-ace-editor 3、使用 新建aceConfig.js文件 // ace配置,使用动态加载来避免第一次加载开销 import ace from ace-builds// 导入不同的主…...
公司做网站怎么样/资源优化排名网站
智慧 毅力 无所不能 正确性、健壮性、可靠性、效率、易用性、可读性、可复用性、兼容性、可移植性... Lua和C交互详细总结 转自:http://cn.cocos2d-x.org/tutorial/show?id1474 一、Lua堆栈 要理解Lua和C交互,首先要理解Lua堆栈。 简单来说࿰…...
快速优化网站建设/海南百度推广公司电话
夫妻之间订立借款协议,能否成立借款关系? 一般而言,除法律另有规定或当事人另有约定外,婚后夫妻一方所得财产无论存于哪一方的名下账户,均属夫妻共同共有。夫妻之间钱款往来转账,仅改变其控制权࿰…...
wordpress 登录不上/广告软文代理平台
1、坚持开店很考验人的心态 现在开水果店的密集度已经非常高,很多地方开了关,关了又开了一家新店,这些水果对顾客都起着非常重要的教育作用。 一开始顾客不怎么吃水果,对水果不怎么了解,慢慢的,水果店多了…...
建立专业的官方网站/2022年列入传销组织最新骗法
点击上方蓝字关注星标★不迷路本文作者:朱莉来源:Excel 小超人(ID:Julie1391)本文编辑:小叮、竺兰通配符,顾名思义是指通用的字符,能够代替任意字符。在 Excel 中,通配符也有很多应用。学会了它…...
天津工程网站建设/百度app下载并安装最新版
ToString方法大概是.Net时被用得最多的方法了,所有类型都,引用的,值的,都传承了这个从祖先Object开始的光荣传统。调用一次ToString,相当于惊堂木“啪”一下,大喝“堂下案犯报上名来”,这家伙就…...
番禺做网站600元/b2b平台网站
in 判断元素是否存在于容器当中 list1 [1, 2, 3] tuple1 (1, 2, 3) set1 {1, 2, 3} print(3 in list1) # True print(3 in tuple1) # True print(3 in set1) # True 如果要判断是否在set当中,要注意被判断的元素必须可以保存在set当中,如果是列表,字典,集合,则不能判断 …...