[feature] Add AK/SK authorization method #640

Open
QingsiLiu opened this issue May 8, 2024 · 3 comments
@QingsiLiu

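I saw the author's post on Zhihu yesterday and installed the app to try it out as soon as I got to the office today, but I found that it supports relatively few authorization methods. We do a lot of overseas business, so I hope support for the AWS signature method can be added.
image
For reference, this is how Apifox handles it:
image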

@QingsiLiu QingsiLiu added the enhancement New feature or request label May 8, 2024
@MegatronKing
Contributor

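AWS authorization is not supported yet, but you can enable the scripting feature to generate the authorization dynamically. I wrote a script for you to try: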

from reqable import *
import hmac
import hashlib
import datetime

# TODO Replace with your config
access_key = 'YOUR_ACCESS_KEY'
secret_key = 'YOUR_SECRET_KEY'
region = 'us-east-1'
service = 's3'

def sign(key, msg):
  return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

def get_signature_key(key, date_stamp, region_name, service_name):
  sign_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
  sign_region = sign(sign_date, region_name)
  sign_service = sign(sign_region, service_name)
  return sign(sign_service, 'aws4_request')

def onRequest(context, request):
  method = request.method
  # Build canonical query parameters.
  canonical_uri = request.path
  sorted_queries = sorted(request.queries)
  canonical_query_string = CaptureHttpQueries(sorted_queries).concat()

  # Calculate body hash.
  if request.body.isNone:
    payload_hash = hashlib.sha256(''.encode('utf-8')).hexdigest()
  elif request.body.isText:
    payload_hash = hashlib.sha256(request.body.encode('utf-8')).hexdigest()
  elif request.body.isBinary:
    payload_hash = hashlib.sha256(request.body).hexdigest()
  else:
    raise Exception('Unsupported body type.')
  request.headers['X-Amz-Content-Sha256'] = payload_hash

  # Build datetime
  # utcnow() is deprecated since Python 3.12; use an aware UTC datetime instead.
  t = datetime.datetime.now(datetime.timezone.utc)
  amz_date = t.strftime('%Y%m%dT%H%M%SZ')
  request.headers['X-Amz-Date'] = amz_date

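  # Build canonical headers. SigV4 expects lowercase header names sorted
  # alphabetically, so collect everything (including host) before sorting.
  headers_to_sign = {'host': context.host}
  for header in request.headers:
    # Split on the first separator only, in case the value contains ': '.
    name, value = header.split(': ', 1)
    name = name.lower().strip()
    # Compare strings with ==, not 'is' (identity), which is unreliable here.
    if name.startswith('x-amz-') or name == 'content-type':
      headers_to_sign[name] = value.strip()
  sorted_names = sorted(headers_to_sign)
  canonical_headers = ''.join(f'{name}:{headers_to_sign[name]}\n' for name in sorted_names)
  # SignedHeaders is a ';'-separated list without a trailing separator.
  signed_headers = ';'.join(sorted_names)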

  # Build canonical request.
  canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"

  date_stamp = t.strftime('%Y%m%d')
  credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"

  algorithm = 'AWS4-HMAC-SHA256'
  string_to_sign = f"{algorithm}\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
  signing_key = get_signature_key(secret_key, date_stamp, region, service)
  signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest()

  request.headers['Authorization'] = f'{algorithm} Credential={access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}'
  return request

def onResponse(context, response):
  return response
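
The header canonicalization step is the easiest part of the script to get subtly wrong, so it may help to check it outside Reqable. The following is a minimal sketch, not part of the script above: `canonicalize_headers` is a hypothetical helper, and it assumes the headers arrive as a plain dict rather than in whatever shape Reqable passes to `onRequest`. It shows the two properties SigV4 expects: lowercase names sorted alphabetically (with `host` sorted in among them), and a `SignedHeaders` list joined by `;` with no trailing separator.

```python
def canonicalize_headers(headers, host):
    """Return (canonical_headers, signed_headers) for a SigV4 canonical request.

    `headers` is a plain dict here; that is an assumption of this sketch,
    not the shape Reqable hands to the script.
    """
    selected = {'host': host.strip()}
    for name, value in headers.items():
        key = name.lower().strip()
        # Same selection rule as the script: x-amz-* headers and content-type.
        if key.startswith('x-amz-') or key == 'content-type':
            # Trim the value and collapse internal runs of whitespace.
            selected[key] = ' '.join(value.split())
    names = sorted(selected)
    canonical = ''.join(f'{n}:{selected[n]}\n' for n in names)
    return canonical, ';'.join(names)


canonical, signed = canonicalize_headers(
    {
        'X-Amz-Date': '20240508T000000Z',
        'Content-Type': 'application/json',
        'Accept': '*/*',  # not selected, so it is left out of the signature
    },
    'examplebucket.s3.amazonaws.com',
)
print(signed)  # content-type;host;x-amz-date
```

Building the map first and sorting once avoids special-casing `host`, which would otherwise end up ahead of `content-type` in the signed list.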

@MegatronKing
Contributor

MegatronKing commented May 8, 2024

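The code above was implemented according to the official AWS documentation, but since I don't have any AWS services here, I couldn't test it against a real endpoint. If you run into problems while testing, feel free to report them to me, or submit a PR here: https://github.com/reqable/python-scripting-templates/blob/main/templates/aws_signature.py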
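
Without an AWS account, the signing logic can still be exercised offline by pinning the timestamp so the result is reproducible. The sketch below is one way to do that, under stated assumptions: `sigv4_authorization` is a hypothetical standalone function (it does not use the Reqable API), it signs only `host`, `x-amz-content-sha256`, and `x-amz-date`, it expects the path and query string to be already canonicalized, and the credentials are placeholders. It only confirms that the output is deterministic and well-formed; whether AWS accepts the signature still has to be checked against a real service.

```python
import datetime
import hashlib
import hmac


def _hmac(key, msg):
    # One step of the SigV4 key-derivation chain.
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()


def sigv4_authorization(method, host, path, query, body,
                        access_key, secret_key, region, service, now):
    """Build a SigV4 Authorization header value for a fixed timestamp `now`."""
    amz_date = now.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = now.strftime('%Y%m%d')
    payload_hash = hashlib.sha256(body).hexdigest()

    headers = {
        'host': host,
        'x-amz-content-sha256': payload_hash,
        'x-amz-date': amz_date,
    }
    names = sorted(headers)
    canonical_headers = ''.join(f'{n}:{headers[n]}\n' for n in names)
    signed_headers = ';'.join(names)

    # canonical_headers already ends with '\n', so joining with '\n'
    # yields the blank line that separates it from signed_headers.
    canonical_request = '\n'.join(
        [method, path, query, canonical_headers, signed_headers, payload_hash])

    scope = f'{date_stamp}/{region}/{service}/aws4_request'
    string_to_sign = '\n'.join([
        'AWS4-HMAC-SHA256',
        amz_date,
        scope,
        hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
    ])

    # Derive the signing key: date -> region -> service -> 'aws4_request'.
    key = ('AWS4' + secret_key).encode('utf-8')
    for part in (date_stamp, region, service, 'aws4_request'):
        key = _hmac(key, part)
    signature = hmac.new(key, string_to_sign.encode('utf-8'),
                         hashlib.sha256).hexdigest()

    return (f'AWS4-HMAC-SHA256 Credential={access_key}/{scope}, '
            f'SignedHeaders={signed_headers}, Signature={signature}')


# Placeholder credentials and a pinned timestamp, for reproducibility only.
fixed_now = datetime.datetime(2024, 5, 8, 12, 0, 0, tzinfo=datetime.timezone.utc)
auth = sigv4_authorization('GET', 'examplebucket.s3.amazonaws.com', '/', '', b'',
                           'YOUR_ACCESS_KEY', 'YOUR_SECRET_KEY',
                           'us-east-1', 's3', fixed_now)
print(auth)
```

Passing `now` in as a parameter, rather than reading the clock inside the function, is what makes the result repeatable and therefore comparable against another SigV4 implementation.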

@QingsiLiu
Author

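Thanks! I have my own implementation of this too, and it tests fine in my code, haha. It's just that the tool itself doesn't support it yet, so I came to make the suggestion. Thanks again!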

@MegatronKing MegatronKing added the p2 Priority second label May 9, 2024