[Python] Support for HTTP signature (#4958)

* start implementation of HTTP signature

* add api key parameters for http message signature

* HTTP signature authentication

* start implementation of HTTP signature

* add api key parameters for http message signature

* HTTP signature authentication

* HTTP signature authentication

* start implementation of HTTP signature

* fix merge issues

* Address formatting issues

* Address formatting issues

* move python-experimental-openapiv3-sample to a separate PR

* Add support for HTTP signature

* Add code comments

* Add code comments

* Fix formatting issues

* Fix formatting issues

* Fix formatting issues

* add code comments

* add code comments

* fix python formatting issues

* Make PKCS1v15 string constant consistent between Python and Golang

* fix python formatting issues

* Add code comments in generated Python. Start adding unit tests for HTTP signature

* compliance with HTTP signature draft 12

* compliance with HTTP signature draft 12

* working on review comments

* working on review comments

* working on review comments

* working on review comments

* working on review comments

* working on review comments

* working on review comments

* working on review comments

* working on review comments

* fix python formatting issues

* fix trailing white space

* address PR comments

* address PR comments

* address PR comments

* Add suppport for '(expires)' signature parameter

* address PR comments

* address PR comments

* Fix python formatting issues

* Fix python formatting issues

* Starting to move code to dedicated file for HTTP signatures

* Continue to refactor code to dedicated file for HTTP signatures

* Continue to refactor code to dedicated file for HTTP signatures

* Continue to refactor code to dedicated file for HTTP signatures

* Continue to refactor code to dedicated file for HTTP signatures

* move method to ProcessUtils

* conditionally build signing.py

* move method to ProcessUtils

* Code reformatting

* externalize http signature configuration

* address PR review comments

* address PR review comments

* run samples scripts

* Address PR review comments

* Move 'private_key' field to signing module

* Move 'private_key' field to signing module

* code cleanup

* remove use of strftime('%s'), which is non portable

* code cleanup

* code cleanup

* code cleanup

* run sample scripts

* Address PR review comments.

* Add http-signature security scheme

* Run sample scripts for go

* Fix issue uncovered in integration branch

* Fix issue uncovered in integration branch

* Fix issue uncovered in integration branch

* Fix issue uncovered in integration branch

* Run samples scripts

* move http signature tests to separate file

* move http signature tests to separate file

* unit tests for HTTP signature

* continue implementation of unit tests

* add http_signature_test to security scheme

* add unit tests for http signature

* address review comments

* remove http signature from petapi

* Add separate OAS file with support for HTTP signature

* Add support for private key passphrase. Add more unit tests

* Add unit test to validate the signature against the public key

* remove http signature from petstore-with-fake-endpoints-models-for-testing.yaml

* fix unit test issues

* run scripts in bin directory

* Refact unit test with better variable names

* do not throw exception if security scheme is unrecognized

* change URL of apache license to use https

* sync from master

* fix usage of escape character in python regex. Fix generated python documentation

* write HTTP signed headers in user-specified order. Fix PEP8 formatting issues

* write HTTP signed headers in user-specified order. Fix PEP8 formatting issues

* http signature unit tests

* Fix PEP8 format issue

* spread out each requirement to a separate line

* run samples scripts

* run sample scripts

* remove encoding of '+' character
This commit is contained in:
Sebastien Rosset
2020-01-26 18:17:26 -08:00
committed by Justin Black
parent c0f7b47292
commit 4f350bc01c
30 changed files with 1827 additions and 44 deletions

View File

@@ -27,6 +27,8 @@ fi
# if you've executed sbt assembly previously it will use that instead.
export JAVA_OPTS="${JAVA_OPTS} -Xmx1024M -DloggerPath=conf/log4j.properties"
ags="generate -t modules/openapi-generator/src/main/resources/python -i modules/openapi-generator/src/test/resources/3_0/petstore-with-fake-endpoints-models-for-testing.yaml -g python-experimental -o samples/openapi3/client/petstore/python-experimental/ --additional-properties packageName=petstore_api $@"
#yaml="modules/openapi-generator/src/test/resources/3_0/petstore-with-fake-endpoints-models-for-testing.yaml"
yaml="modules/openapi-generator/src/test/resources/3_0/petstore-with-fake-endpoints-models-for-testing-with-http-signature.yaml"
ags="generate -t modules/openapi-generator/src/main/resources/python -i $yaml -g python-experimental -o samples/openapi3/client/petstore/python-experimental/ --additional-properties packageName=petstore_api $@"
java $JAVA_OPTS -jar $executable $ags

View File

@@ -25,12 +25,14 @@ import io.swagger.v3.oas.models.media.Schema;
import io.swagger.v3.oas.models.parameters.Parameter;
import io.swagger.v3.oas.models.parameters.RequestBody;
import io.swagger.v3.oas.models.responses.ApiResponse;
import io.swagger.v3.oas.models.security.SecurityScheme;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.openapitools.codegen.*;
import org.openapitools.codegen.examples.ExampleGenerator;
import org.openapitools.codegen.meta.features.*;
import org.openapitools.codegen.utils.ModelUtils;
import org.openapitools.codegen.utils.ProcessUtils;
import org.openapitools.codegen.meta.GeneratorMetadata;
import org.openapitools.codegen.meta.Stability;
import org.slf4j.Logger;
@@ -116,6 +118,14 @@ public class PythonClientExperimentalCodegen extends PythonClientCodegen {
supportingFiles.add(new SupportingFile("python-experimental/__init__package.mustache", packagePath(), "__init__.py"));
// Generate the 'signing.py' module, but only if the 'HTTP signature' security scheme is specified in the OAS.
Map<String, SecurityScheme> securitySchemeMap = openAPI != null ?
(openAPI.getComponents() != null ? openAPI.getComponents().getSecuritySchemes() : null) : null;
List<CodegenSecurity> authMethods = fromSecurity(securitySchemeMap);
if (ProcessUtils.hasHttpSignatureMethods(authMethods)) {
supportingFiles.add(new SupportingFile("python-experimental/signing.mustache", packagePath(), "signing.py"));
}
Boolean generateSourceCodeOnly = false;
if (additionalProperties.containsKey(CodegenConstants.SOURCECODEONLY_GENERATION)) {
generateSourceCodeOnly = Boolean.valueOf(additionalProperties.get(CodegenConstants.SOURCECODEONLY_GENERATION).toString());

View File

@@ -31,6 +31,8 @@ class Configuration(object):
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param signing_info: Configuration parameters for HTTP signature.
Must be an instance of {{{packageName}}}.signing.HttpSigningConfiguration
:Example:
@@ -49,11 +51,50 @@ class Configuration(object):
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
Configure API client with HTTP basic authentication:
conf = {{{packageName}}}.Configuration(
username='the-user',
password='the-password',
)
Configure API client with HTTP signature authentication. Use the 'hs2019' signature scheme,
sign the HTTP requests with the RSA-SSA-PSS signature algorithm, and set the expiration time
of the signature to 5 minutes after the signature has been created.
Note you can use the constants defined in the {{{packageName}}}.signing module, and you can
also specify arbitrary HTTP headers to be included in the HTTP signature, except for the
'Authorization' header, which is used to carry the signature.
One may be tempted to sign all headers by default, but in practice it rarely works.
This is beccause explicit proxies, transparent proxies, TLS termination endpoints or
load balancers may add/modify/remove headers. Include the HTTP headers that you know
are not going to be modified in transit.
conf = {{{packageName}}}.Configuration(
signing_info = {{{packageName}}}.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
)
"""
def __init__(self, host="{{{basePath}}}",
api_key=None, api_key_prefix=None,
username=None, password=None):
username=None, password=None,
signing_info=None):
"""Constructor
"""
self.host = host
@@ -82,6 +123,11 @@ class Configuration(object):
self.password = password
"""Password for HTTP basic authentication
"""
if signing_info is not None:
signing_info.host = host
self.signing_info = signing_info
"""The HTTP signing configuration
"""
{{#hasOAuthMethods}}
self.access_token = ""
"""access token for OAuth/Bearer
@@ -318,6 +364,15 @@ class Configuration(object):
'value': 'Bearer ' + self.access_token
}
{{/isBasicBearer}}
{{#isHttpSignature}}
if self.signing_info is not None:
auth['{{name}}'] = {
'type': 'http-signature',
'in': 'header',
'key': 'Authorization',
'value': None # Signature headers are calculated for every HTTP request
}
{{/isHttpSignature}}
{{/isBasic}}
{{#isOAuth}}
if self.access_token is not None:

View File

@@ -56,6 +56,9 @@ Class | Method | HTTP request | Description
{{#isBasicBearer}}
- **Type**: Bearer authentication{{#bearerFormat}} ({{{.}}}){{/bearerFormat}}
{{/isBasicBearer}}
{{#isHttpSignature}}
- **Type**: HTTP signature authentication
{{/isHttpSignature}}
{{/isBasic}}
{{#isOAuth}}
- **Type**: OAuth

View File

@@ -120,7 +120,7 @@ class {{classname}}(object):
{{#-first}}
'auth': [
{{/-first}}
'{{name}}'{{#hasMore}}, {{/hasMore}}
'{{name}}'{{#hasMore}},{{/hasMore}}
{{#-last}}
],
{{/-last}}

View File

@@ -150,13 +150,14 @@ class ApiClient(object):
collection_formats)
post_params.extend(self.files_parameters(files))
# auth setting
self.update_params_for_auth(header_params, query_params, auth_settings)
# body
if body:
body = self.sanitize_for_serialization(body)
# auth setting
self.update_params_for_auth(header_params, query_params,
auth_settings, resource_path, method, body)
# request url
if _host is None:
url = self.configuration.host + resource_path
@@ -517,12 +518,17 @@ class ApiClient(object):
else:
return content_types[0]
def update_params_for_auth(self, headers, querys, auth_settings):
def update_params_for_auth(self, headers, querys, auth_settings,
resource_path, method, body):
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param querys: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
:resource_path: A string representation of the HTTP request resource path.
:method: A string representation of the HTTP request method.
:body: A object representing the body of the HTTP request.
The object type is the return value of sanitize_for_serialization().
"""
if not auth_settings:
return
@@ -533,7 +539,17 @@ class ApiClient(object):
if auth_setting['in'] == 'cookie':
headers['Cookie'] = auth_setting['value']
elif auth_setting['in'] == 'header':
headers[auth_setting['key']] = auth_setting['value']
if auth_setting['type'] != 'http-signature':
headers[auth_setting['key']] = auth_setting['value']
{{#hasHttpSignatureMethods}}
else:
# The HTTP signature scheme requires multiple HTTP headers
# that are calculated dynamically.
signing_info = self.configuration.signing_info
auth_headers = signing_info.get_http_signature_headers(
resource_path, method, headers, body, querys)
headers.update(auth_headers)
{{/hasHttpSignatureMethods}}
elif auth_setting['in'] == 'query':
querys.append((auth_setting['key'], auth_setting['value']))
else:

View File

@@ -16,13 +16,22 @@ VERSION = "{{packageVersion}}"
# prerequisite: setuptools
# http://pypi.python.org/pypi/setuptools
REQUIRES = ["urllib3 >= 1.15", "six >= 1.10", "certifi", "python-dateutil"]
REQUIRES = [
"urllib3 >= 1.15",
"six >= 1.10",
"certifi",
"python-dateutil",
{{#asyncio}}
REQUIRES.append("aiohttp >= 3.0.0")
"aiohttp >= 3.0.0",
{{/asyncio}}
{{#tornado}}
REQUIRES.append("tornado>=4.2,<5")
"tornado>=4.2,<5",
{{/tornado}}
{{#hasHttpSignatureMethods}}
"pem>=19.3.0",
"pycryptodome>=3.9.0",
{{/hasHttpSignatureMethods}}
]
EXTRAS = {':python_version <= "2.7"': ['future']}
setup(

View File

@@ -0,0 +1,368 @@
# coding: utf-8
{{>partial_header}}
from __future__ import absolute_import
from base64 import b64encode
from Crypto.IO import PEM, PKCS8
from Crypto.Hash import SHA256, SHA512
from Crypto.PublicKey import RSA, ECC
from Crypto.Signature import PKCS1_v1_5, pss, DSS
from datetime import datetime
from email.utils import formatdate
import json
import os
import re
from six.moves.urllib.parse import urlencode, urlparse
from time import mktime
HEADER_REQUEST_TARGET = '(request-target)'
HEADER_CREATED = '(created)'
HEADER_EXPIRES = '(expires)'
HEADER_HOST = 'Host'
HEADER_DATE = 'Date'
HEADER_DIGEST = 'Digest' # RFC 3230, include digest of the HTTP request body.
HEADER_AUTHORIZATION = 'Authorization'
SCHEME_HS2019 = 'hs2019'
SCHEME_RSA_SHA256 = 'rsa-sha256'
SCHEME_RSA_SHA512 = 'rsa-sha512'
ALGORITHM_RSASSA_PSS = 'RSASSA-PSS'
ALGORITHM_RSASSA_PKCS1v15 = 'RSASSA-PKCS1-v1_5'
ALGORITHM_ECDSA_MODE_FIPS_186_3 = 'fips-186-3'
ALGORITHM_ECDSA_MODE_DETERMINISTIC_RFC6979 = 'deterministic-rfc6979'
ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS = {
ALGORITHM_ECDSA_MODE_FIPS_186_3,
ALGORITHM_ECDSA_MODE_DETERMINISTIC_RFC6979
}
class HttpSigningConfiguration(object):
"""The configuration parameters for the HTTP signature security scheme.
The HTTP signature security scheme is used to sign HTTP requests with a private key
which is in possession of the API client.
An 'Authorization' header is calculated by creating a hash of select headers,
and optionally the body of the HTTP request, then signing the hash value using
a private key. The 'Authorization' header is added to outbound HTTP requests.
NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
:param key_id: A string value specifying the identifier of the cryptographic key,
when signing HTTP requests.
:param signing_scheme: A string value specifying the signature scheme, when
signing HTTP requests.
Supported value are hs2019, rsa-sha256, rsa-sha512.
Avoid using rsa-sha256, rsa-sha512 as they are deprecated. These values are
available for server-side applications that only support the older
HTTP signature algorithms.
:param private_key_path: A string value specifying the path of the file containing
a private key. The private key is used to sign HTTP requests.
:param private_key_passphrase: A string value specifying the passphrase to decrypt
the private key.
:param signed_headers: A list of strings. Each value is the name of a HTTP header
that must be included in the HTTP signature calculation.
The two special signature headers '(request-target)' and '(created)' SHOULD be
included in SignedHeaders.
The '(created)' header expresses when the signature was created.
The '(request-target)' header is a concatenation of the lowercased :method, an
ASCII space, and the :path pseudo-headers.
When signed_headers is not specified, the client defaults to a single value,
'(created)', in the list of HTTP headers.
When SignedHeaders contains the 'Digest' value, the client performs the
following operations:
1. Calculate a digest of request body, as specified in RFC3230, section 4.3.2.
2. Set the 'Digest' header in the request body.
3. Include the 'Digest' header and value in the HTTP signature.
:param signing_algorithm: A string value specifying the signature algorithm, when
signing HTTP requests.
Supported values are:
1. For RSA keys: RSASSA-PSS, RSASSA-PKCS1-v1_5.
2. For ECDSA keys: fips-186-3, deterministic-rfc6979.
The default value is inferred from the private key.
The default value for RSA keys is RSASSA-PSS.
The default value for ECDSA keys is fips-186-3.
:param signature_max_validity: The signature max validity, expressed as
a datetime.timedelta value. It must be a positive value.
"""
def __init__(self, key_id, signing_scheme, private_key_path,
private_key_passphrase=None,
signed_headers=None,
signing_algorithm=None,
signature_max_validity=None):
self.key_id = key_id
if signing_scheme not in {SCHEME_HS2019, SCHEME_RSA_SHA256, SCHEME_RSA_SHA512}:
raise Exception("Unsupported security scheme: {0}".format(signing_scheme))
self.signing_scheme = signing_scheme
if not os.path.exists(private_key_path):
raise Exception("Private key file does not exist")
self.private_key_path = private_key_path
self.private_key_passphrase = private_key_passphrase
self.signing_algorithm = signing_algorithm
if signature_max_validity is not None and signature_max_validity.total_seconds() < 0:
raise Exception("The signature max validity must be a positive value")
self.signature_max_validity = signature_max_validity
# If the user has not provided any signed_headers, the default must be set to '(created)',
# as specified in the 'HTTP signature' standard.
if signed_headers is None or len(signed_headers) == 0:
signed_headers = [HEADER_CREATED]
if self.signature_max_validity is None and HEADER_EXPIRES in signed_headers:
raise Exception(
"Signature max validity must be set when "
"'(expires)' signature parameter is specified")
if len(signed_headers) != len(set(signed_headers)):
raise Exception("Cannot have duplicates in the signed_headers parameter")
if HEADER_AUTHORIZATION in signed_headers:
raise Exception("'Authorization' header cannot be included in signed headers")
self.signed_headers = signed_headers
self.private_key = None
"""The private key used to sign HTTP requests.
Initialized when the PEM-encoded private key is loaded from a file.
"""
self.host = None
"""The host name, optionally followed by a colon and TCP port number.
"""
self._load_private_key()
def get_http_signature_headers(self, resource_path, method, headers, body, query_params):
"""Create a cryptographic message signature for the HTTP request and add the signed headers.
:param resource_path : A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method, e.g. GET, POST.
:param headers: A dict containing the HTTP request headers.
:param body: The object representing the HTTP request body.
:param query_params: A string representing the HTTP request query parameters.
:return: A dict of HTTP headers that must be added to the outbound HTTP request.
"""
if method is None:
raise Exception("HTTP method must be set")
if resource_path is None:
raise Exception("Resource path must be set")
signed_headers_list, request_headers_dict = self._get_signed_header_info(
resource_path, method, headers, body, query_params)
header_items = [
"{0}: {1}".format(key.lower(), value) for key, value in signed_headers_list]
string_to_sign = "\n".join(header_items)
digest, digest_prefix = self._get_message_digest(string_to_sign.encode())
b64_signed_msg = self._sign_digest(digest)
request_headers_dict[HEADER_AUTHORIZATION] = self._get_authorization_header(
signed_headers_list, b64_signed_msg)
return request_headers_dict
def get_public_key(self):
"""Returns the public key object associated with the private key.
"""
pubkey = None
if isinstance(self.private_key, RSA.RsaKey):
pubkey = self.private_key.publickey()
elif isinstance(self.private_key, ECC.EccKey):
pubkey = self.private_key.public_key()
return pubkey
def _load_private_key(self):
"""Load the private key used to sign HTTP requests.
The private key is used to sign HTTP requests as defined in
https://datatracker.ietf.org/doc/draft-cavage-http-signatures/.
"""
if self.private_key is not None:
return
with open(self.private_key_path, 'r') as f:
pem_data = f.read()
# Verify PEM Pre-Encapsulation Boundary
r = re.compile(r"\s*-----BEGIN (.*)-----\s+")
m = r.match(pem_data)
if not m:
raise ValueError("Not a valid PEM pre boundary")
pem_header = m.group(1)
if pem_header == 'RSA PRIVATE KEY':
self.private_key = RSA.importKey(pem_data, self.private_key_passphrase)
elif pem_header == 'EC PRIVATE KEY':
self.private_key = ECC.import_key(pem_data, self.private_key_passphrase)
elif pem_header in {'PRIVATE KEY', 'ENCRYPTED PRIVATE KEY'}:
# Key is in PKCS8 format, which is capable of holding many different
# types of private keys, not just EC keys.
(key_binary, pem_header, is_encrypted) = \
PEM.decode(pem_data, self.private_key_passphrase)
(oid, privkey, params) = \
PKCS8.unwrap(key_binary, passphrase=self.private_key_passphrase)
if oid == '1.2.840.10045.2.1':
self.private_key = ECC.import_key(pem_data, self.private_key_passphrase)
else:
raise Exception("Unsupported key: {0}. OID: {1}".format(pem_header, oid))
else:
raise Exception("Unsupported key: {0}".format(pem_header))
# Validate the specified signature algorithm is compatible with the private key.
if self.signing_algorithm is not None:
supported_algs = None
if isinstance(self.private_key, RSA.RsaKey):
supported_algs = {ALGORITHM_RSASSA_PSS, ALGORITHM_RSASSA_PKCS1v15}
elif isinstance(self.private_key, ECC.EccKey):
supported_algs = ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS
if supported_algs is not None and self.signing_algorithm not in supported_algs:
raise Exception(
"Signing algorithm {0} is not compatible with private key".format(
self.signing_algorithm))
def _get_unix_time(self, ts):
"""Converts and returns a datetime object to UNIX time, the number of seconds
elapsed since January 1, 1970 UTC.
"""
return (ts - datetime(1970, 1, 1)).total_seconds()
def _get_signed_header_info(self, resource_path, method, headers, body, query_params):
"""Build the HTTP headers (name, value) that need to be included in
the HTTP signature scheme.
:param resource_path : A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method, e.g. GET, POST.
:param headers: A dict containing the HTTP request headers.
:param body: The object (e.g. a dict) representing the HTTP request body.
:param query_params: A string representing the HTTP request query parameters.
:return: A tuple containing two dict objects:
The first dict contains the HTTP headers that are used to calculate
the HTTP signature.
The second dict contains the HTTP headers that must be added to
the outbound HTTP request.
"""
if body is None:
body = ''
else:
body = json.dumps(body)
# Build the '(request-target)' HTTP signature parameter.
target_host = urlparse(self.host).netloc
target_path = urlparse(self.host).path
request_target = method.lower() + " " + target_path + resource_path
if query_params:
request_target += "?" + urlencode(query_params)
# Get current time and generate RFC 1123 (HTTP/1.1) date/time string.
now = datetime.now()
stamp = mktime(now.timetuple())
cdate = formatdate(timeval=stamp, localtime=False, usegmt=True)
# The '(created)' value MUST be a Unix timestamp integer value.
# Subsecond precision is not supported.
created = int(self._get_unix_time(now))
if self.signature_max_validity is not None:
expires = self._get_unix_time(now + self.signature_max_validity)
signed_headers_list = []
request_headers_dict = {}
for hdr_key in self.signed_headers:
hdr_key = hdr_key.lower()
if hdr_key == HEADER_REQUEST_TARGET:
value = request_target
elif hdr_key == HEADER_CREATED:
value = '{0}'.format(created)
elif hdr_key == HEADER_EXPIRES:
value = '{0}'.format(expires)
elif hdr_key == HEADER_DATE.lower():
value = cdate
request_headers_dict[HEADER_DATE] = '{0}'.format(cdate)
elif hdr_key == HEADER_DIGEST.lower():
request_body = body.encode()
body_digest, digest_prefix = self._get_message_digest(request_body)
b64_body_digest = b64encode(body_digest.digest())
value = digest_prefix + b64_body_digest.decode('ascii')
request_headers_dict[HEADER_DIGEST] = '{0}{1}'.format(
digest_prefix, b64_body_digest.decode('ascii'))
elif hdr_key == HEADER_HOST.lower():
value = target_host
request_headers_dict[HEADER_HOST] = '{0}'.format(target_host)
else:
value = next((v for k, v in headers.items() if k.lower() == hdr_key), None)
if value is None:
raise Exception(
"Cannot sign HTTP request. "
"Request does not contain the '{0}' header".format(hdr_key))
signed_headers_list.append((hdr_key, value))
return signed_headers_list, request_headers_dict
def _get_message_digest(self, data):
"""Calculates and returns a cryptographic digest of a specified HTTP request.
:param data: The string representation of the date to be hashed with a cryptographic hash.
:return: A tuple of (digest, prefix).
The digest is a hashing object that contains the cryptographic digest of
the HTTP request.
The prefix is a string that identifies the cryptographc hash. It is used
to generate the 'Digest' header as specified in RFC 3230.
"""
if self.signing_scheme in {SCHEME_RSA_SHA512, SCHEME_HS2019}:
digest = SHA512.new()
prefix = 'SHA-512='
elif self.signing_scheme == SCHEME_RSA_SHA256:
digest = SHA256.new()
prefix = 'SHA-256='
else:
raise Exception("Unsupported signing algorithm: {0}".format(self.signing_scheme))
digest.update(data)
return digest, prefix
def _sign_digest(self, digest):
"""Signs a message digest with a private key specified in the signing_info.
:param digest: A hashing object that contains the cryptographic digest of the HTTP request.
:return: A base-64 string representing the cryptographic signature of the input digest.
"""
sig_alg = self.signing_algorithm
if isinstance(self.private_key, RSA.RsaKey):
if sig_alg is None or sig_alg == ALGORITHM_RSASSA_PSS:
# RSASSA-PSS in Section 8.1 of RFC8017.
signature = pss.new(self.private_key).sign(digest)
elif sig_alg == ALGORITHM_RSASSA_PKCS1v15:
# RSASSA-PKCS1-v1_5 in Section 8.2 of RFC8017.
signature = PKCS1_v1_5.new(self.private_key).sign(digest)
else:
raise Exception("Unsupported signature algorithm: {0}".format(sig_alg))
elif isinstance(self.private_key, ECC.EccKey):
if sig_alg is None:
sig_alg = ALGORITHM_ECDSA_MODE_FIPS_186_3
if sig_alg in ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS:
signature = DSS.new(self.private_key, sig_alg).sign(digest)
else:
raise Exception("Unsupported signature algorithm: {0}".format(sig_alg))
else:
raise Exception("Unsupported private key: {0}".format(type(self.private_key)))
return b64encode(signature)
def _get_authorization_header(self, signed_headers, signed_msg):
"""Calculates and returns the value of the 'Authorization' header when signing HTTP requests.
:param signed_headers : A list of tuples. Each value is the name of a HTTP header that
must be included in the HTTP signature calculation.
:param signed_msg: A base-64 encoded string representation of the signature.
:return: The string value of the 'Authorization' header, representing the signature
of the HTTP request.
"""
created_ts = None
expires_ts = None
for k, v in signed_headers:
if k == HEADER_CREATED:
created_ts = v
elif k == HEADER_EXPIRES:
expires_ts = v
lower_keys = [k.lower() for k, v in signed_headers]
headers_value = " ".join(lower_keys)
auth_str = "Signature keyId=\"{0}\",algorithm=\"{1}\",".format(
self.key_id, self.signing_scheme)
if created_ts is not None:
auth_str = auth_str + "created={0},".format(created_ts)
if expires_ts is not None:
auth_str = auth_str + "expires={0},".format(expires_ts)
auth_str = auth_str + "headers=\"{0}\",signature=\"{1}\"".format(
headers_value, signed_msg.decode('ascii'))
print("AUTH: {0}".format(auth_str))
return auth_str

View File

@@ -10,4 +10,7 @@ pytest~=4.6.7 # needed for python 2.7+3.4
pytest-cov>=2.8.1
pytest-randomly==1.2.3 # needed for python 2.7+3.4
{{/useNose}}
{{#hasHttpSignatureMethods}}
pycryptodome>=3.9.0
{{/hasHttpSignatureMethods}}
mock; python_version<="2.7"

View File

@@ -11,6 +11,28 @@ configuration.password = 'YOUR_PASSWORD'
# Configure Bearer authorization{{#bearerFormat}} ({{{.}}}){{/bearerFormat}}: {{{name}}}
configuration.access_token = 'YOUR_BEARER_TOKEN'
{{/isBasicBearer}}
{{#isHttpSignature}}
# Configure HTTP signature authorization: {{{name}}}
# You can specify the signing key-id, private key path, signing scheme, signing algorithm,
# list of signed headers and signature max validity.
configuration.signing_info = {{{packageName}}}.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
{{/isHttpSignature}}
{{/isBasic}}
{{#isApiKey}}
# Configure API key authorization: {{{name}}}

View File

@@ -36,6 +36,8 @@ class Configuration(object):
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param signing_info: Configuration parameters for HTTP signature.
Must be an instance of petstore_api.signing.HttpSigningConfiguration
:Example:
@@ -54,11 +56,50 @@ class Configuration(object):
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
Configure API client with HTTP basic authentication:
conf = petstore_api.Configuration(
username='the-user',
password='the-password',
)
Configure API client with HTTP signature authentication. Use the 'hs2019' signature scheme,
sign the HTTP requests with the RSA-SSA-PSS signature algorithm, and set the expiration time
of the signature to 5 minutes after the signature has been created.
Note you can use the constants defined in the petstore_api.signing module, and you can
also specify arbitrary HTTP headers to be included in the HTTP signature, except for the
'Authorization' header, which is used to carry the signature.
One may be tempted to sign all headers by default, but in practice it rarely works.
This is beccause explicit proxies, transparent proxies, TLS termination endpoints or
load balancers may add/modify/remove headers. Include the HTTP headers that you know
are not going to be modified in transit.
conf = petstore_api.Configuration(
signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
)
"""
def __init__(self, host="http://petstore.swagger.io:80/v2",
api_key=None, api_key_prefix=None,
username=None, password=None):
username=None, password=None,
signing_info=None):
"""Constructor
"""
self.host = host
@@ -87,6 +128,11 @@ class Configuration(object):
self.password = password
"""Password for HTTP basic authentication
"""
if signing_info is not None:
signing_info.host = host
self.signing_info = signing_info
"""The HTTP signing configuration
"""
self.access_token = ""
"""access token for OAuth/Bearer
"""

View File

@@ -152,13 +152,14 @@ class ApiClient(object):
collection_formats)
post_params.extend(self.files_parameters(files))
# auth setting
self.update_params_for_auth(header_params, query_params, auth_settings)
# body
if body:
body = self.sanitize_for_serialization(body)
# auth setting
self.update_params_for_auth(header_params, query_params,
auth_settings, resource_path, method, body)
# request url
if _host is None:
url = self.configuration.host + resource_path
@@ -510,12 +511,17 @@ class ApiClient(object):
else:
return content_types[0]
def update_params_for_auth(self, headers, querys, auth_settings):
def update_params_for_auth(self, headers, querys, auth_settings,
resource_path, method, body):
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param querys: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
:resource_path: A string representation of the HTTP request resource path.
:method: A string representation of the HTTP request method.
:body: A object representing the body of the HTTP request.
The object type is the return value of sanitize_for_serialization().
"""
if not auth_settings:
return
@@ -526,7 +532,8 @@ class ApiClient(object):
if auth_setting['in'] == 'cookie':
headers['Cookie'] = auth_setting['value']
elif auth_setting['in'] == 'header':
headers[auth_setting['key']] = auth_setting['value']
if auth_setting['type'] != 'http-signature':
headers[auth_setting['key']] = auth_setting['value']
elif auth_setting['in'] == 'query':
querys.append((auth_setting['key'], auth_setting['value']))
else:

View File

@@ -37,6 +37,8 @@ class Configuration(object):
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param signing_info: Configuration parameters for HTTP signature.
Must be an instance of petstore_api.signing.HttpSigningConfiguration
:Example:
@@ -55,11 +57,50 @@ class Configuration(object):
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
Configure API client with HTTP basic authentication:
conf = petstore_api.Configuration(
username='the-user',
password='the-password',
)
Configure API client with HTTP signature authentication. Use the 'hs2019' signature scheme,
sign the HTTP requests with the RSA-SSA-PSS signature algorithm, and set the expiration time
of the signature to 5 minutes after the signature has been created.
Note you can use the constants defined in the petstore_api.signing module, and you can
also specify arbitrary HTTP headers to be included in the HTTP signature, except for the
'Authorization' header, which is used to carry the signature.
One may be tempted to sign all headers by default, but in practice it rarely works.
This is beccause explicit proxies, transparent proxies, TLS termination endpoints or
load balancers may add/modify/remove headers. Include the HTTP headers that you know
are not going to be modified in transit.
conf = petstore_api.Configuration(
signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
)
"""
def __init__(self, host="http://petstore.swagger.io:80/v2",
api_key=None, api_key_prefix=None,
username=None, password=None):
username=None, password=None,
signing_info=None):
"""Constructor
"""
self.host = host
@@ -88,6 +129,11 @@ class Configuration(object):
self.password = password
"""Password for HTTP basic authentication
"""
if signing_info is not None:
signing_info.host = host
self.signing_info = signing_info
"""The HTTP signing configuration
"""
self.access_token = ""
"""access token for OAuth/Bearer
"""

View File

@@ -21,7 +21,12 @@ VERSION = "1.0.0"
# prerequisite: setuptools
# http://pypi.python.org/pypi/setuptools
REQUIRES = ["urllib3 >= 1.15", "six >= 1.10", "certifi", "python-dateutil"]
REQUIRES = [
"urllib3 >= 1.15",
"six >= 1.10",
"certifi",
"python-dateutil",
]
EXTRAS = {':python_version <= "2.7"': ['future']}
setup(

View File

@@ -44,7 +44,7 @@ class ApiClientTests(unittest.TestCase):
self.assertEqual('PREFIX', client.configuration.api_key_prefix['api_key'])
# update parameters based on auth setting
client.update_params_for_auth(header_params, query_params, auth_settings)
client.update_params_for_auth(header_params, query_params, auth_settings, resource_path=None, method=None, body=None)
# test api key auth
self.assertEqual(header_params['test1'], 'value1')
@@ -59,14 +59,14 @@ class ApiClientTests(unittest.TestCase):
config.api_key['api_key'] = '123456'
config.api_key_prefix['api_key'] = None
# update parameters based on auth setting
client.update_params_for_auth(header_params, query_params, auth_settings)
client.update_params_for_auth(header_params, query_params, auth_settings, resource_path=None, method=None, body=None)
self.assertEqual(header_params['api_key'], '123456')
# test api key with empty prefix
config.api_key['api_key'] = '123456'
config.api_key_prefix['api_key'] = ''
# update parameters based on auth setting
client.update_params_for_auth(header_params, query_params, auth_settings)
client.update_params_for_auth(header_params, query_params, auth_settings, resource_path=None, method=None, body=None)
self.assertEqual(header_params['api_key'], '123456')
# test api key with prefix specified in the api_key, useful when the prefix
@@ -74,7 +74,7 @@ class ApiClientTests(unittest.TestCase):
config.api_key['api_key'] = 'PREFIX=123456'
config.api_key_prefix['api_key'] = None
# update parameters based on auth setting
client.update_params_for_auth(header_params, query_params, auth_settings)
client.update_params_for_auth(header_params, query_params, auth_settings, resource_path=None, method=None, body=None)
self.assertEqual(header_params['api_key'], 'PREFIX=123456')

View File

@@ -119,21 +119,21 @@ class PetApiTests(unittest.TestCase):
def test_config(self):
config = Configuration(host=HOST)
self.assertIsNotNone(config.get_host_settings())
self.assertEquals(config.get_basic_auth_token(),
self.assertEqual(config.get_basic_auth_token(),
urllib3.util.make_headers(basic_auth=":").get('authorization'))
self.assertEquals(len(config.auth_settings()), 1)
self.assertEqual(len(config.auth_settings()), 1)
self.assertIn("petstore_auth", config.auth_settings().keys())
config.username = "user"
config.password = "password"
self.assertEquals(
self.assertEqual(
config.get_basic_auth_token(),
urllib3.util.make_headers(basic_auth="user:password").get('authorization'))
self.assertEquals(len(config.auth_settings()), 2)
self.assertEqual(len(config.auth_settings()), 2)
self.assertIn("petstore_auth", config.auth_settings().keys())
self.assertIn("http_basic_test", config.auth_settings().keys())
config.username = None
config.password = None
self.assertEquals(len(config.auth_settings()), 1)
self.assertEqual(len(config.auth_settings()), 1)
self.assertIn("petstore_auth", config.auth_settings().keys())
def test_timeout(self):
@@ -193,7 +193,7 @@ class PetApiTests(unittest.TestCase):
response = thread.get()
response2 = thread2.get()
self.assertEquals(response.id, self.pet.id)
self.assertEqual(response.id, self.pet.id)
self.assertIsNotNone(response2.id, self.pet.id)
def test_async_with_http_info(self):
@@ -204,7 +204,7 @@ class PetApiTests(unittest.TestCase):
data, status, headers = thread.get()
self.assertIsInstance(data, petstore_api.Pet)
self.assertEquals(status, 200)
self.assertEqual(status, 200)
def test_async_exception(self):
self.pet_api.add_pet(self.pet)

View File

@@ -37,6 +37,8 @@ class Configuration(object):
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param signing_info: Configuration parameters for HTTP signature.
Must be an instance of petstore_api.signing.HttpSigningConfiguration
:Example:
@@ -55,11 +57,50 @@ class Configuration(object):
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
Configure API client with HTTP basic authentication:
conf = petstore_api.Configuration(
username='the-user',
password='the-password',
)
Configure API client with HTTP signature authentication. Use the 'hs2019' signature scheme,
sign the HTTP requests with the RSA-SSA-PSS signature algorithm, and set the expiration time
of the signature to 5 minutes after the signature has been created.
Note you can use the constants defined in the petstore_api.signing module, and you can
also specify arbitrary HTTP headers to be included in the HTTP signature, except for the
'Authorization' header, which is used to carry the signature.
One may be tempted to sign all headers by default, but in practice it rarely works.
This is beccause explicit proxies, transparent proxies, TLS termination endpoints or
load balancers may add/modify/remove headers. Include the HTTP headers that you know
are not going to be modified in transit.
conf = petstore_api.Configuration(
signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
)
"""
def __init__(self, host="http://petstore.swagger.io:80/v2",
api_key=None, api_key_prefix=None,
username=None, password=None):
username=None, password=None,
signing_info=None):
"""Constructor
"""
self.host = host
@@ -88,6 +129,11 @@ class Configuration(object):
self.password = password
"""Password for HTTP basic authentication
"""
if signing_info is not None:
signing_info.host = host
self.signing_info = signing_info
"""The HTTP signing configuration
"""
self.access_token = ""
"""access token for OAuth/Bearer
"""

View File

@@ -37,6 +37,8 @@ class Configuration(object):
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param signing_info: Configuration parameters for HTTP signature.
Must be an instance of petstore_api.signing.HttpSigningConfiguration
:Example:
@@ -55,11 +57,50 @@ class Configuration(object):
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
Configure API client with HTTP basic authentication:
conf = petstore_api.Configuration(
username='the-user',
password='the-password',
)
Configure API client with HTTP signature authentication. Use the 'hs2019' signature scheme,
sign the HTTP requests with the RSA-SSA-PSS signature algorithm, and set the expiration time
of the signature to 5 minutes after the signature has been created.
Note you can use the constants defined in the petstore_api.signing module, and you can
also specify arbitrary HTTP headers to be included in the HTTP signature, except for the
'Authorization' header, which is used to carry the signature.
One may be tempted to sign all headers by default, but in practice it rarely works.
This is beccause explicit proxies, transparent proxies, TLS termination endpoints or
load balancers may add/modify/remove headers. Include the HTTP headers that you know
are not going to be modified in transit.
conf = petstore_api.Configuration(
signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
)
"""
def __init__(self, host="http://petstore.swagger.io:80/v2",
api_key=None, api_key_prefix=None,
username=None, password=None):
username=None, password=None,
signing_info=None):
"""Constructor
"""
self.host = host
@@ -88,6 +129,11 @@ class Configuration(object):
self.password = password
"""Password for HTTP basic authentication
"""
if signing_info is not None:
signing_info.host = host
self.signing_info = signing_info
"""The HTTP signing configuration
"""
self.access_token = ""
"""access token for OAuth/Bearer
"""

View File

@@ -193,6 +193,11 @@ Class | Method | HTTP request | Description
- **Type**: HTTP basic authentication
## http_signature_test
- **Type**: HTTP signature authentication
## petstore_auth
- **Type**: OAuth

View File

@@ -29,6 +29,27 @@ import time
import petstore_api
from pprint import pprint
configuration = petstore_api.Configuration()
# Configure HTTP signature authorization: http_signature_test
# You can specify the signing key-id, private key path, signing scheme, signing algorithm,
# list of signed headers and signature max validity.
configuration.signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
configuration = petstore_api.Configuration()
# Configure OAuth2 access token for authorization: petstore_auth
configuration.access_token = 'YOUR_ACCESS_TOKEN'
@@ -58,7 +79,7 @@ void (empty response body)
### Authorization
[petstore_auth](../README.md#petstore_auth)
[http_signature_test](../README.md#http_signature_test), [petstore_auth](../README.md#petstore_auth)
### HTTP request headers
@@ -155,6 +176,27 @@ import time
import petstore_api
from pprint import pprint
configuration = petstore_api.Configuration()
# Configure HTTP signature authorization: http_signature_test
# You can specify the signing key-id, private key path, signing scheme, signing algorithm,
# list of signed headers and signature max validity.
configuration.signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
configuration = petstore_api.Configuration()
# Configure OAuth2 access token for authorization: petstore_auth
configuration.access_token = 'YOUR_ACCESS_TOKEN'
@@ -185,7 +227,7 @@ Name | Type | Description | Notes
### Authorization
[petstore_auth](../README.md#petstore_auth)
[http_signature_test](../README.md#http_signature_test), [petstore_auth](../README.md#petstore_auth)
### HTTP request headers
@@ -216,6 +258,27 @@ import time
import petstore_api
from pprint import pprint
configuration = petstore_api.Configuration()
# Configure HTTP signature authorization: http_signature_test
# You can specify the signing key-id, private key path, signing scheme, signing algorithm,
# list of signed headers and signature max validity.
configuration.signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
configuration = petstore_api.Configuration()
# Configure OAuth2 access token for authorization: petstore_auth
configuration.access_token = 'YOUR_ACCESS_TOKEN'
@@ -246,7 +309,7 @@ Name | Type | Description | Notes
### Authorization
[petstore_auth](../README.md#petstore_auth)
[http_signature_test](../README.md#http_signature_test), [petstore_auth](../README.md#petstore_auth)
### HTTP request headers
@@ -339,6 +402,27 @@ import time
import petstore_api
from pprint import pprint
configuration = petstore_api.Configuration()
# Configure HTTP signature authorization: http_signature_test
# You can specify the signing key-id, private key path, signing scheme, signing algorithm,
# list of signed headers and signature max validity.
configuration.signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
configuration = petstore_api.Configuration()
# Configure OAuth2 access token for authorization: petstore_auth
configuration.access_token = 'YOUR_ACCESS_TOKEN'
@@ -368,7 +452,7 @@ void (empty response body)
### Authorization
[petstore_auth](../README.md#petstore_auth)
[http_signature_test](../README.md#http_signature_test), [petstore_auth](../README.md#petstore_auth)
### HTTP request headers

View File

@@ -110,6 +110,7 @@ class PetApi(object):
settings={
'response_type': None,
'auth': [
'http_signature_test',
'petstore_auth'
],
'endpoint_path': '/pet',
@@ -336,6 +337,7 @@ class PetApi(object):
settings={
'response_type': ([pet.Pet],),
'auth': [
'http_signature_test',
'petstore_auth'
],
'endpoint_path': '/pet/findByStatus',
@@ -455,6 +457,7 @@ class PetApi(object):
settings={
'response_type': ([pet.Pet],),
'auth': [
'http_signature_test',
'petstore_auth'
],
'endpoint_path': '/pet/findByTags',
@@ -677,6 +680,7 @@ class PetApi(object):
settings={
'response_type': None,
'auth': [
'http_signature_test',
'petstore_auth'
],
'endpoint_path': '/pet',

View File

@@ -152,13 +152,14 @@ class ApiClient(object):
collection_formats)
post_params.extend(self.files_parameters(files))
# auth setting
self.update_params_for_auth(header_params, query_params, auth_settings)
# body
if body:
body = self.sanitize_for_serialization(body)
# auth setting
self.update_params_for_auth(header_params, query_params,
auth_settings, resource_path, method, body)
# request url
if _host is None:
url = self.configuration.host + resource_path
@@ -510,12 +511,17 @@ class ApiClient(object):
else:
return content_types[0]
def update_params_for_auth(self, headers, querys, auth_settings):
def update_params_for_auth(self, headers, querys, auth_settings,
resource_path, method, body):
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param querys: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
:resource_path: A string representation of the HTTP request resource path.
:method: A string representation of the HTTP request method.
:body: A object representing the body of the HTTP request.
The object type is the return value of sanitize_for_serialization().
"""
if not auth_settings:
return
@@ -526,7 +532,15 @@ class ApiClient(object):
if auth_setting['in'] == 'cookie':
headers['Cookie'] = auth_setting['value']
elif auth_setting['in'] == 'header':
headers[auth_setting['key']] = auth_setting['value']
if auth_setting['type'] != 'http-signature':
headers[auth_setting['key']] = auth_setting['value']
else:
# The HTTP signature scheme requires multiple HTTP headers
# that are calculated dynamically.
signing_info = self.configuration.signing_info
auth_headers = signing_info.get_http_signature_headers(
resource_path, method, headers, body, querys)
headers.update(auth_headers)
elif auth_setting['in'] == 'query':
querys.append((auth_setting['key'], auth_setting['value']))
else:

View File

@@ -37,6 +37,8 @@ class Configuration(object):
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param signing_info: Configuration parameters for HTTP signature.
Must be an instance of petstore_api.signing.HttpSigningConfiguration
:Example:
@@ -55,11 +57,50 @@ class Configuration(object):
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
Configure API client with HTTP basic authentication:
conf = petstore_api.Configuration(
username='the-user',
password='the-password',
)
Configure API client with HTTP signature authentication. Use the 'hs2019' signature scheme,
sign the HTTP requests with the RSA-SSA-PSS signature algorithm, and set the expiration time
of the signature to 5 minutes after the signature has been created.
Note you can use the constants defined in the petstore_api.signing module, and you can
also specify arbitrary HTTP headers to be included in the HTTP signature, except for the
'Authorization' header, which is used to carry the signature.
One may be tempted to sign all headers by default, but in practice it rarely works.
This is beccause explicit proxies, transparent proxies, TLS termination endpoints or
load balancers may add/modify/remove headers. Include the HTTP headers that you know
are not going to be modified in transit.
conf = petstore_api.Configuration(
signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
)
"""
def __init__(self, host="http://petstore.swagger.io:80/v2",
api_key=None, api_key_prefix=None,
username=None, password=None):
username=None, password=None,
signing_info=None):
"""Constructor
"""
self.host = host
@@ -88,6 +129,11 @@ class Configuration(object):
self.password = password
"""Password for HTTP basic authentication
"""
if signing_info is not None:
signing_info.host = host
self.signing_info = signing_info
"""The HTTP signing configuration
"""
self.access_token = ""
"""access token for OAuth/Bearer
"""
@@ -304,6 +350,13 @@ class Configuration(object):
'key': 'Authorization',
'value': self.get_basic_auth_token()
}
if self.signing_info is not None:
auth['http_signature_test'] = {
'type': 'http-signature',
'in': 'header',
'key': 'Authorization',
'value': None # Signature headers are calculated for every HTTP request
}
if self.access_token is not None:
auth['petstore_auth'] = {
'type': 'oauth2',

View File

@@ -0,0 +1,376 @@
# coding: utf-8
"""
OpenAPI Petstore
This spec is mainly for testing Petstore server and contains fake endpoints, models. Please do not use this for any other purpose. Special characters: \" \\ # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
from base64 import b64encode
from Crypto.IO import PEM, PKCS8
from Crypto.Hash import SHA256, SHA512
from Crypto.PublicKey import RSA, ECC
from Crypto.Signature import PKCS1_v1_5, pss, DSS
from datetime import datetime
from email.utils import formatdate
import json
import os
import re
from six.moves.urllib.parse import urlencode, urlparse
from time import mktime
HEADER_REQUEST_TARGET = '(request-target)'
HEADER_CREATED = '(created)'
HEADER_EXPIRES = '(expires)'
HEADER_HOST = 'Host'
HEADER_DATE = 'Date'
HEADER_DIGEST = 'Digest' # RFC 3230, include digest of the HTTP request body.
HEADER_AUTHORIZATION = 'Authorization'
SCHEME_HS2019 = 'hs2019'
SCHEME_RSA_SHA256 = 'rsa-sha256'
SCHEME_RSA_SHA512 = 'rsa-sha512'
ALGORITHM_RSASSA_PSS = 'RSASSA-PSS'
ALGORITHM_RSASSA_PKCS1v15 = 'RSASSA-PKCS1-v1_5'
ALGORITHM_ECDSA_MODE_FIPS_186_3 = 'fips-186-3'
ALGORITHM_ECDSA_MODE_DETERMINISTIC_RFC6979 = 'deterministic-rfc6979'
ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS = {
ALGORITHM_ECDSA_MODE_FIPS_186_3,
ALGORITHM_ECDSA_MODE_DETERMINISTIC_RFC6979
}
class HttpSigningConfiguration(object):
"""The configuration parameters for the HTTP signature security scheme.
The HTTP signature security scheme is used to sign HTTP requests with a private key
which is in possession of the API client.
An 'Authorization' header is calculated by creating a hash of select headers,
and optionally the body of the HTTP request, then signing the hash value using
a private key. The 'Authorization' header is added to outbound HTTP requests.
NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
:param key_id: A string value specifying the identifier of the cryptographic key,
when signing HTTP requests.
:param signing_scheme: A string value specifying the signature scheme, when
signing HTTP requests.
Supported value are hs2019, rsa-sha256, rsa-sha512.
Avoid using rsa-sha256, rsa-sha512 as they are deprecated. These values are
available for server-side applications that only support the older
HTTP signature algorithms.
:param private_key_path: A string value specifying the path of the file containing
a private key. The private key is used to sign HTTP requests.
:param private_key_passphrase: A string value specifying the passphrase to decrypt
the private key.
:param signed_headers: A list of strings. Each value is the name of a HTTP header
that must be included in the HTTP signature calculation.
The two special signature headers '(request-target)' and '(created)' SHOULD be
included in SignedHeaders.
The '(created)' header expresses when the signature was created.
The '(request-target)' header is a concatenation of the lowercased :method, an
ASCII space, and the :path pseudo-headers.
When signed_headers is not specified, the client defaults to a single value,
'(created)', in the list of HTTP headers.
When SignedHeaders contains the 'Digest' value, the client performs the
following operations:
1. Calculate a digest of request body, as specified in RFC3230, section 4.3.2.
2. Set the 'Digest' header in the request body.
3. Include the 'Digest' header and value in the HTTP signature.
:param signing_algorithm: A string value specifying the signature algorithm, when
signing HTTP requests.
Supported values are:
1. For RSA keys: RSASSA-PSS, RSASSA-PKCS1-v1_5.
2. For ECDSA keys: fips-186-3, deterministic-rfc6979.
The default value is inferred from the private key.
The default value for RSA keys is RSASSA-PSS.
The default value for ECDSA keys is fips-186-3.
:param signature_max_validity: The signature max validity, expressed as
a datetime.timedelta value. It must be a positive value.
"""
def __init__(self, key_id, signing_scheme, private_key_path,
private_key_passphrase=None,
signed_headers=None,
signing_algorithm=None,
signature_max_validity=None):
self.key_id = key_id
if signing_scheme not in {SCHEME_HS2019, SCHEME_RSA_SHA256, SCHEME_RSA_SHA512}:
raise Exception("Unsupported security scheme: {0}".format(signing_scheme))
self.signing_scheme = signing_scheme
if not os.path.exists(private_key_path):
raise Exception("Private key file does not exist")
self.private_key_path = private_key_path
self.private_key_passphrase = private_key_passphrase
self.signing_algorithm = signing_algorithm
if signature_max_validity is not None and signature_max_validity.total_seconds() < 0:
raise Exception("The signature max validity must be a positive value")
self.signature_max_validity = signature_max_validity
# If the user has not provided any signed_headers, the default must be set to '(created)',
# as specified in the 'HTTP signature' standard.
if signed_headers is None or len(signed_headers) == 0:
signed_headers = [HEADER_CREATED]
if self.signature_max_validity is None and HEADER_EXPIRES in signed_headers:
raise Exception(
"Signature max validity must be set when "
"'(expires)' signature parameter is specified")
if len(signed_headers) != len(set(signed_headers)):
raise Exception("Cannot have duplicates in the signed_headers parameter")
if HEADER_AUTHORIZATION in signed_headers:
raise Exception("'Authorization' header cannot be included in signed headers")
self.signed_headers = signed_headers
self.private_key = None
"""The private key used to sign HTTP requests.
Initialized when the PEM-encoded private key is loaded from a file.
"""
self.host = None
"""The host name, optionally followed by a colon and TCP port number.
"""
self._load_private_key()
def get_http_signature_headers(self, resource_path, method, headers, body, query_params):
"""Create a cryptographic message signature for the HTTP request and add the signed headers.
:param resource_path : A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method, e.g. GET, POST.
:param headers: A dict containing the HTTP request headers.
:param body: The object representing the HTTP request body.
:param query_params: A string representing the HTTP request query parameters.
:return: A dict of HTTP headers that must be added to the outbound HTTP request.
"""
if method is None:
raise Exception("HTTP method must be set")
if resource_path is None:
raise Exception("Resource path must be set")
signed_headers_list, request_headers_dict = self._get_signed_header_info(
resource_path, method, headers, body, query_params)
header_items = [
"{0}: {1}".format(key.lower(), value) for key, value in signed_headers_list]
string_to_sign = "\n".join(header_items)
digest, digest_prefix = self._get_message_digest(string_to_sign.encode())
b64_signed_msg = self._sign_digest(digest)
request_headers_dict[HEADER_AUTHORIZATION] = self._get_authorization_header(
signed_headers_list, b64_signed_msg)
return request_headers_dict
def get_public_key(self):
"""Returns the public key object associated with the private key.
"""
pubkey = None
if isinstance(self.private_key, RSA.RsaKey):
pubkey = self.private_key.publickey()
elif isinstance(self.private_key, ECC.EccKey):
pubkey = self.private_key.public_key()
return pubkey
def _load_private_key(self):
"""Load the private key used to sign HTTP requests.
The private key is used to sign HTTP requests as defined in
https://datatracker.ietf.org/doc/draft-cavage-http-signatures/.
"""
if self.private_key is not None:
return
with open(self.private_key_path, 'r') as f:
pem_data = f.read()
# Verify PEM Pre-Encapsulation Boundary
r = re.compile(r"\s*-----BEGIN (.*)-----\s+")
m = r.match(pem_data)
if not m:
raise ValueError("Not a valid PEM pre boundary")
pem_header = m.group(1)
if pem_header == 'RSA PRIVATE KEY':
self.private_key = RSA.importKey(pem_data, self.private_key_passphrase)
elif pem_header == 'EC PRIVATE KEY':
self.private_key = ECC.import_key(pem_data, self.private_key_passphrase)
elif pem_header in {'PRIVATE KEY', 'ENCRYPTED PRIVATE KEY'}:
# Key is in PKCS8 format, which is capable of holding many different
# types of private keys, not just EC keys.
(key_binary, pem_header, is_encrypted) = \
PEM.decode(pem_data, self.private_key_passphrase)
(oid, privkey, params) = \
PKCS8.unwrap(key_binary, passphrase=self.private_key_passphrase)
if oid == '1.2.840.10045.2.1':
self.private_key = ECC.import_key(pem_data, self.private_key_passphrase)
else:
raise Exception("Unsupported key: {0}. OID: {1}".format(pem_header, oid))
else:
raise Exception("Unsupported key: {0}".format(pem_header))
# Validate the specified signature algorithm is compatible with the private key.
if self.signing_algorithm is not None:
supported_algs = None
if isinstance(self.private_key, RSA.RsaKey):
supported_algs = {ALGORITHM_RSASSA_PSS, ALGORITHM_RSASSA_PKCS1v15}
elif isinstance(self.private_key, ECC.EccKey):
supported_algs = ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS
if supported_algs is not None and self.signing_algorithm not in supported_algs:
raise Exception(
"Signing algorithm {0} is not compatible with private key".format(
self.signing_algorithm))
def _get_unix_time(self, ts):
"""Converts and returns a datetime object to UNIX time, the number of seconds
elapsed since January 1, 1970 UTC.
"""
return (ts - datetime(1970, 1, 1)).total_seconds()
def _get_signed_header_info(self, resource_path, method, headers, body, query_params):
"""Build the HTTP headers (name, value) that need to be included in
the HTTP signature scheme.
:param resource_path : A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method, e.g. GET, POST.
:param headers: A dict containing the HTTP request headers.
:param body: The object (e.g. a dict) representing the HTTP request body.
:param query_params: A string representing the HTTP request query parameters.
:return: A tuple containing two dict objects:
The first dict contains the HTTP headers that are used to calculate
the HTTP signature.
The second dict contains the HTTP headers that must be added to
the outbound HTTP request.
"""
if body is None:
body = ''
else:
body = json.dumps(body)
# Build the '(request-target)' HTTP signature parameter.
target_host = urlparse(self.host).netloc
target_path = urlparse(self.host).path
request_target = method.lower() + " " + target_path + resource_path
if query_params:
request_target += "?" + urlencode(query_params)
# Get current time and generate RFC 1123 (HTTP/1.1) date/time string.
now = datetime.now()
stamp = mktime(now.timetuple())
cdate = formatdate(timeval=stamp, localtime=False, usegmt=True)
# The '(created)' value MUST be a Unix timestamp integer value.
# Subsecond precision is not supported.
created = int(self._get_unix_time(now))
if self.signature_max_validity is not None:
expires = self._get_unix_time(now + self.signature_max_validity)
signed_headers_list = []
request_headers_dict = {}
for hdr_key in self.signed_headers:
hdr_key = hdr_key.lower()
if hdr_key == HEADER_REQUEST_TARGET:
value = request_target
elif hdr_key == HEADER_CREATED:
value = '{0}'.format(created)
elif hdr_key == HEADER_EXPIRES:
value = '{0}'.format(expires)
elif hdr_key == HEADER_DATE.lower():
value = cdate
request_headers_dict[HEADER_DATE] = '{0}'.format(cdate)
elif hdr_key == HEADER_DIGEST.lower():
request_body = body.encode()
body_digest, digest_prefix = self._get_message_digest(request_body)
b64_body_digest = b64encode(body_digest.digest())
value = digest_prefix + b64_body_digest.decode('ascii')
request_headers_dict[HEADER_DIGEST] = '{0}{1}'.format(
digest_prefix, b64_body_digest.decode('ascii'))
elif hdr_key == HEADER_HOST.lower():
value = target_host
request_headers_dict[HEADER_HOST] = '{0}'.format(target_host)
else:
value = next((v for k, v in headers.items() if k.lower() == hdr_key), None)
if value is None:
raise Exception(
"Cannot sign HTTP request. "
"Request does not contain the '{0}' header".format(hdr_key))
signed_headers_list.append((hdr_key, value))
return signed_headers_list, request_headers_dict
def _get_message_digest(self, data):
"""Calculates and returns a cryptographic digest of a specified HTTP request.
:param data: The string representation of the date to be hashed with a cryptographic hash.
:return: A tuple of (digest, prefix).
The digest is a hashing object that contains the cryptographic digest of
the HTTP request.
The prefix is a string that identifies the cryptographc hash. It is used
to generate the 'Digest' header as specified in RFC 3230.
"""
if self.signing_scheme in {SCHEME_RSA_SHA512, SCHEME_HS2019}:
digest = SHA512.new()
prefix = 'SHA-512='
elif self.signing_scheme == SCHEME_RSA_SHA256:
digest = SHA256.new()
prefix = 'SHA-256='
else:
raise Exception("Unsupported signing algorithm: {0}".format(self.signing_scheme))
digest.update(data)
return digest, prefix
def _sign_digest(self, digest):
"""Signs a message digest with a private key specified in the signing_info.
:param digest: A hashing object that contains the cryptographic digest of the HTTP request.
:return: A base-64 string representing the cryptographic signature of the input digest.
"""
sig_alg = self.signing_algorithm
if isinstance(self.private_key, RSA.RsaKey):
if sig_alg is None or sig_alg == ALGORITHM_RSASSA_PSS:
# RSASSA-PSS in Section 8.1 of RFC8017.
signature = pss.new(self.private_key).sign(digest)
elif sig_alg == ALGORITHM_RSASSA_PKCS1v15:
# RSASSA-PKCS1-v1_5 in Section 8.2 of RFC8017.
signature = PKCS1_v1_5.new(self.private_key).sign(digest)
else:
raise Exception("Unsupported signature algorithm: {0}".format(sig_alg))
elif isinstance(self.private_key, ECC.EccKey):
if sig_alg is None:
sig_alg = ALGORITHM_ECDSA_MODE_FIPS_186_3
if sig_alg in ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS:
signature = DSS.new(self.private_key, sig_alg).sign(digest)
else:
raise Exception("Unsupported signature algorithm: {0}".format(sig_alg))
else:
raise Exception("Unsupported private key: {0}".format(type(self.private_key)))
return b64encode(signature)
def _get_authorization_header(self, signed_headers, signed_msg):
"""Calculates and returns the value of the 'Authorization' header when signing HTTP requests.
:param signed_headers : A list of tuples. Each value is the name of a HTTP header that
must be included in the HTTP signature calculation.
:param signed_msg: A base-64 encoded string representation of the signature.
:return: The string value of the 'Authorization' header, representing the signature
of the HTTP request.
"""
created_ts = None
expires_ts = None
for k, v in signed_headers:
if k == HEADER_CREATED:
created_ts = v
elif k == HEADER_EXPIRES:
expires_ts = v
lower_keys = [k.lower() for k, v in signed_headers]
headers_value = " ".join(lower_keys)
auth_str = "Signature keyId=\"{0}\",algorithm=\"{1}\",".format(
self.key_id, self.signing_scheme)
if created_ts is not None:
auth_str = auth_str + "created={0},".format(created_ts)
if expires_ts is not None:
auth_str = auth_str + "expires={0},".format(expires_ts)
auth_str = auth_str + "headers=\"{0}\",signature=\"{1}\"".format(
headers_value, signed_msg.decode('ascii'))
print("AUTH: {0}".format(auth_str))
return auth_str

View File

@@ -21,7 +21,14 @@ VERSION = "1.0.0"
# prerequisite: setuptools
# http://pypi.python.org/pypi/setuptools
REQUIRES = ["urllib3 >= 1.15", "six >= 1.10", "certifi", "python-dateutil"]
REQUIRES = [
"urllib3 >= 1.15",
"six >= 1.10",
"certifi",
"python-dateutil",
"pem>=19.3.0",
"pycryptodome>=3.9.0",
]
EXTRAS = {':python_version <= "2.7"': ['future']}
setup(

View File

@@ -1,4 +1,5 @@
pytest~=4.6.7 # needed for python 2.7+3.4
pytest-cov>=2.8.1
pytest-randomly==1.2.3 # needed for python 2.7+3.4
pycryptodome>=3.9.0
mock; python_version<="2.7"

View File

@@ -0,0 +1,501 @@
# coding: utf-8
# flake8: noqa
"""
Run the tests.
$ docker pull swaggerapi/petstore
$ docker run -d -e SWAGGER_HOST=http://petstore.swagger.io -e SWAGGER_BASE_PATH=/v2 -p 80:8080 swaggerapi/petstore
$ pip install nose (optional)
$ cd petstore_api-python
$ nosetests -v
"""
from collections import namedtuple
from datetime import datetime, timedelta
import base64
import json
import os
import re
import shutil
import unittest
from Crypto.Hash import SHA256, SHA512
from Crypto.PublicKey import ECC, RSA
from Crypto.Signature import pkcs1_15, pss, DSS
from six.moves.urllib.parse import urlencode, urlparse
import petstore_api
from petstore_api import Configuration, signing
from petstore_api.rest import (
RESTClientObject,
RESTResponse
)
import six
from petstore_api.exceptions import (
ApiException,
ApiValueError,
ApiTypeError,
)
from .util import id_gen
import urllib3
if six.PY3:
from unittest.mock import patch
else:
from mock import patch
HOST = 'http://localhost/v2'
# This test RSA private key below is published in Appendix C 'Test Values' of
# https://www.ietf.org/id/draft-cavage-http-signatures-12.txt
RSA_TEST_PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQDCFENGw33yGihy92pDjZQhl0C36rPJj+CvfSC8+q28hxA161QF
NUd13wuCTUcq0Qd2qsBe/2hFyc2DCJJg0h1L78+6Z4UMR7EOcpfdUE9Hf3m/hs+F
UR45uBJeDK1HSFHD8bHKD6kv8FPGfJTotc+2xjJwoYi+1hqp1fIekaxsyQIDAQAB
AoGBAJR8ZkCUvx5kzv+utdl7T5MnordT1TvoXXJGXK7ZZ+UuvMNUCdN2QPc4sBiA
QWvLw1cSKt5DsKZ8UETpYPy8pPYnnDEz2dDYiaew9+xEpubyeW2oH4Zx71wqBtOK
kqwrXa/pzdpiucRRjk6vE6YY7EBBs/g7uanVpGibOVAEsqH1AkEA7DkjVH28WDUg
f1nqvfn2Kj6CT7nIcE3jGJsZZ7zlZmBmHFDONMLUrXR/Zm3pR5m0tCmBqa5RK95u
412jt1dPIwJBANJT3v8pnkth48bQo/fKel6uEYyboRtA5/uHuHkZ6FQF7OUkGogc
mSJluOdc5t6hI1VsLn0QZEjQZMEOWr+wKSMCQQCC4kXJEsHAve77oP6HtG/IiEn7
kpyUXRNvFsDE0czpJJBvL/aRFUJxuRK91jhjC68sA7NsKMGg5OXb5I5Jj36xAkEA
gIT7aFOYBFwGgQAQkWNKLvySgKbAZRTeLBacpHMuQdl1DfdntvAyqpAZ0lY0RKmW
G6aFKaqQfOXKCyWoUiVknQJAXrlgySFci/2ueKlIE1QqIiLSZ8V8OlpFLRnb1pzI
7U1yQXnTAEFYM560yJlzUpOb1V4cScGd365tiSMvxLOvTA==
-----END RSA PRIVATE KEY-----"""
class TimeoutWithEqual(urllib3.Timeout):
def __init__(self, *arg, **kwargs):
super(TimeoutWithEqual, self).__init__(*arg, **kwargs)
def __eq__(self, other):
return self._read == other._read and self._connect == other._connect and self.total == other.total
class MockPoolManager(object):
def __init__(self, tc):
self._tc = tc
self._reqs = []
def expect_request(self, *args, **kwargs):
self._reqs.append((args, kwargs))
def set_signing_config(self, signing_cfg):
self.signing_cfg = signing_cfg
self._tc.assertIsNotNone(self.signing_cfg)
self.pubkey = self.signing_cfg.get_public_key()
self._tc.assertIsNotNone(self.pubkey)
def request(self, *actual_request_target, **actual_request_headers_and_body):
self._tc.assertTrue(len(self._reqs) > 0)
expected_results = self._reqs.pop(0)
self._tc.maxDiff = None
expected_request_target = expected_results[0] # The expected HTTP method and URL path.
expected_request_headers_and_body = expected_results[1] # dict that contains the expected body, headers
self._tc.assertEqual(expected_request_target, actual_request_target)
# actual_request_headers_and_body is a dict that contains the actual body, headers
for k, expected in expected_request_headers_and_body.items():
self._tc.assertIn(k, actual_request_headers_and_body)
if k == 'body':
actual_body = actual_request_headers_and_body[k]
self._tc.assertEqual(expected, actual_body)
elif k == 'headers':
actual_headers = actual_request_headers_and_body[k]
for expected_header_name, expected_header_value in expected.items():
# Validate the generated request contains the expected header.
self._tc.assertIn(expected_header_name, actual_headers)
actual_header_value = actual_headers[expected_header_name]
# Compare the actual value of the header against the expected value.
pattern = re.compile(expected_header_value)
m = pattern.match(actual_header_value)
self._tc.assertTrue(m, msg="Expected:\n{0}\nActual:\n{1}".format(
expected_header_value,actual_header_value))
if expected_header_name == 'Authorization':
self._validate_authorization_header(
expected_request_target, actual_headers, actual_header_value)
elif k == 'timeout':
self._tc.assertEqual(expected, actual_request_headers_and_body[k])
return urllib3.HTTPResponse(status=200, body=b'test')
def _validate_authorization_header(self, request_target, actual_headers, authorization_header):
"""Validate the signature.
"""
# Extract (created)
r1 = re.compile(r'created=([0-9]+)')
m1 = r1.search(authorization_header)
self._tc.assertIsNotNone(m1)
created = m1.group(1)
# Extract list of signed headers
r1 = re.compile(r'headers="([^"]+)"')
m1 = r1.search(authorization_header)
self._tc.assertIsNotNone(m1)
headers = m1.group(1).split(' ')
signed_headers_list = []
for h in headers:
if h == '(created)':
signed_headers_list.append((h, created))
elif h == '(request-target)':
url = request_target[1]
target_path = urlparse(url).path
signed_headers_list.append((h, "{0} {1}".format(request_target[0].lower(), target_path)))
else:
value = next((v for k, v in actual_headers.items() if k.lower() == h), None)
self._tc.assertIsNotNone(value)
signed_headers_list.append((h, value))
header_items = [
"{0}: {1}".format(key.lower(), value) for key, value in signed_headers_list]
string_to_sign = "\n".join(header_items)
digest = None
if self.signing_cfg.signing_scheme in {signing.SCHEME_RSA_SHA512, signing.SCHEME_HS2019}:
digest = SHA512.new()
elif self.signing_cfg.signing_scheme == signing.SCHEME_RSA_SHA256:
digest = SHA256.new()
else:
self._tc.fail("Unsupported signature scheme: {0}".format(self.signing_cfg.signing_scheme))
digest.update(string_to_sign.encode())
b64_body_digest = base64.b64encode(digest.digest()).decode()
# Extract the signature
r2 = re.compile(r'signature="([^"]+)"')
m2 = r2.search(authorization_header)
self._tc.assertIsNotNone(m2)
b64_signature = m2.group(1)
signature = base64.b64decode(b64_signature)
# Build the message
signing_alg = self.signing_cfg.signing_algorithm
if signing_alg is None:
# Determine default
if isinstance(self.pubkey, RSA.RsaKey):
signing_alg = signing.ALGORITHM_RSASSA_PSS
elif isinstance(self.pubkey, ECC.EccKey):
signing_alg = signing.ALGORITHM_ECDSA_MODE_FIPS_186_3
else:
self._tc.fail("Unsupported key: {0}".format(type(self.pubkey)))
if signing_alg == signing.ALGORITHM_RSASSA_PKCS1v15:
pkcs1_15.new(self.pubkey).verify(digest, signature)
elif signing_alg == signing.ALGORITHM_RSASSA_PSS:
pss.new(self.pubkey).verify(digest, signature)
elif signing_alg == signing.ALGORITHM_ECDSA_MODE_FIPS_186_3:
verifier = DSS.new(self.pubkey, signing.ALGORITHM_ECDSA_MODE_FIPS_186_3)
verifier.verify(digest, signature)
elif signing_alg == signing.ALGORITHM_ECDSA_MODE_DETERMINISTIC_RFC6979:
verifier = DSS.new(self.pubkey, signing.ALGORITHM_ECDSA_MODE_DETERMINISTIC_RFC6979)
verifier.verify(digest, signature)
else:
self._tc.fail("Unsupported signing algorithm: {0}".format(signing_alg))
class PetApiTests(unittest.TestCase):
def setUp(self):
self.setUpModels()
self.setUpFiles()
def setUpModels(self):
self.category = petstore_api.Category()
self.category.id = id_gen()
self.category.name = "dog"
self.tag = petstore_api.Tag()
self.tag.id = id_gen()
self.tag.name = "python-pet-tag"
self.pet = petstore_api.Pet(name="hello kity", photo_urls=["http://foo.bar.com/1", "http://foo.bar.com/2"])
self.pet.id = id_gen()
self.pet.status = "sold"
self.pet.category = self.category
self.pet.tags = [self.tag]
def setUpFiles(self):
self.test_file_dir = os.path.join(os.path.dirname(__file__), "..", "testfiles")
self.test_file_dir = os.path.realpath(self.test_file_dir)
if not os.path.exists(self.test_file_dir):
os.mkdir(self.test_file_dir)
self.private_key_passphrase = 'test-passphrase'
self.rsa_key_path = os.path.join(self.test_file_dir, 'rsa.pem')
self.rsa4096_key_path = os.path.join(self.test_file_dir, 'rsa4096.pem')
self.ec_p521_key_path = os.path.join(self.test_file_dir, 'ecP521.pem')
if not os.path.exists(self.rsa_key_path):
with open(self.rsa_key_path, 'w') as f:
f.write(RSA_TEST_PRIVATE_KEY)
if not os.path.exists(self.rsa4096_key_path):
key = RSA.generate(4096)
private_key = key.export_key(
passphrase=self.private_key_passphrase,
protection='PEM'
)
with open(self.rsa4096_key_path, "wb") as f:
f.write(private_key)
if not os.path.exists(self.ec_p521_key_path):
key = ECC.generate(curve='P-521')
private_key = key.export_key(
format='PEM',
passphrase=self.private_key_passphrase,
use_pkcs8=True,
protection='PBKDF2WithHMAC-SHA1AndAES128-CBC'
)
with open(self.ec_p521_key_path, "wt") as f:
f.write(private_key)
def test_valid_http_signature(self):
privkey_path = self.rsa_key_path
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=privkey_path,
private_key_passphrase=self.private_key_passphrase,
signing_algorithm=signing.ALGORITHM_RSASSA_PKCS1v15,
signed_headers=[
signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type'
]
)
config = Configuration(host=HOST, signing_info=signing_cfg)
# Set the OAuth2 acces_token to None. Here we are interested in testing
# the HTTP signature scheme.
config.access_token = None
api_client = petstore_api.ApiClient(config)
pet_api = petstore_api.PetApi(api_client)
mock_pool = MockPoolManager(self)
api_client.rest_client.pool_manager = mock_pool
mock_pool.set_signing_config(signing_cfg)
mock_pool.expect_request('POST', 'http://petstore.swagger.io/v2/pet',
body=json.dumps(api_client.sanitize_for_serialization(self.pet)),
headers={'Content-Type': r'application/json',
'Authorization': r'Signature keyId="my-key-id",algorithm="hs2019",created=[0-9]+,'
r'headers="\(request-target\) \(created\) host date digest content-type",'
r'signature="[a-zA-Z0-9+/]+="',
'User-Agent': r'OpenAPI-Generator/1.0.0/python'},
preload_content=True, timeout=None)
pet_api.add_pet(self.pet)
def test_valid_http_signature_with_defaults(self):
privkey_path = self.rsa4096_key_path
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=privkey_path,
private_key_passphrase=self.private_key_passphrase,
)
config = Configuration(host=HOST, signing_info=signing_cfg)
# Set the OAuth2 acces_token to None. Here we are interested in testing
# the HTTP signature scheme.
config.access_token = None
api_client = petstore_api.ApiClient(config)
pet_api = petstore_api.PetApi(api_client)
mock_pool = MockPoolManager(self)
api_client.rest_client.pool_manager = mock_pool
mock_pool.set_signing_config(signing_cfg)
mock_pool.expect_request('POST', 'http://petstore.swagger.io/v2/pet',
body=json.dumps(api_client.sanitize_for_serialization(self.pet)),
headers={'Content-Type': r'application/json',
'Authorization': r'Signature keyId="my-key-id",algorithm="hs2019",created=[0-9]+,'
r'headers="\(created\)",'
r'signature="[a-zA-Z0-9+/]+="',
'User-Agent': r'OpenAPI-Generator/1.0.0/python'},
preload_content=True, timeout=None)
pet_api.add_pet(self.pet)
def test_valid_http_signature_rsassa_pkcs1v15(self):
privkey_path = self.rsa4096_key_path
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=privkey_path,
private_key_passphrase=self.private_key_passphrase,
signing_algorithm=signing.ALGORITHM_RSASSA_PKCS1v15,
signed_headers=[
signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
]
)
config = Configuration(host=HOST, signing_info=signing_cfg)
# Set the OAuth2 acces_token to None. Here we are interested in testing
# the HTTP signature scheme.
config.access_token = None
api_client = petstore_api.ApiClient(config)
pet_api = petstore_api.PetApi(api_client)
mock_pool = MockPoolManager(self)
api_client.rest_client.pool_manager = mock_pool
mock_pool.set_signing_config(signing_cfg)
mock_pool.expect_request('POST', 'http://petstore.swagger.io/v2/pet',
body=json.dumps(api_client.sanitize_for_serialization(self.pet)),
headers={'Content-Type': r'application/json',
'Authorization': r'Signature keyId="my-key-id",algorithm="hs2019",created=[0-9]+,'
r'headers="\(request-target\) \(created\)",'
r'signature="[a-zA-Z0-9+/]+="',
'User-Agent': r'OpenAPI-Generator/1.0.0/python'},
preload_content=True, timeout=None)
pet_api.add_pet(self.pet)
def test_valid_http_signature_rsassa_pss(self):
privkey_path = self.rsa4096_key_path
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=privkey_path,
private_key_passphrase=self.private_key_passphrase,
signing_algorithm=signing.ALGORITHM_RSASSA_PSS,
signed_headers=[
signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
]
)
config = Configuration(host=HOST, signing_info=signing_cfg)
# Set the OAuth2 acces_token to None. Here we are interested in testing
# the HTTP signature scheme.
config.access_token = None
api_client = petstore_api.ApiClient(config)
pet_api = petstore_api.PetApi(api_client)
mock_pool = MockPoolManager(self)
api_client.rest_client.pool_manager = mock_pool
mock_pool.set_signing_config(signing_cfg)
mock_pool.expect_request('POST', 'http://petstore.swagger.io/v2/pet',
body=json.dumps(api_client.sanitize_for_serialization(self.pet)),
headers={'Content-Type': r'application/json',
'Authorization': r'Signature keyId="my-key-id",algorithm="hs2019",created=[0-9]+,'
r'headers="\(request-target\) \(created\)",'
r'signature="[a-zA-Z0-9+/]+="',
'User-Agent': r'OpenAPI-Generator/1.0.0/python'},
preload_content=True, timeout=None)
pet_api.add_pet(self.pet)
def test_valid_http_signature_ec_p521(self):
privkey_path = self.ec_p521_key_path
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=privkey_path,
private_key_passphrase=self.private_key_passphrase,
signed_headers=[
signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
]
)
config = Configuration(host=HOST, signing_info=signing_cfg)
# Set the OAuth2 acces_token to None. Here we are interested in testing
# the HTTP signature scheme.
config.access_token = None
api_client = petstore_api.ApiClient(config)
pet_api = petstore_api.PetApi(api_client)
mock_pool = MockPoolManager(self)
api_client.rest_client.pool_manager = mock_pool
mock_pool.set_signing_config(signing_cfg)
mock_pool.expect_request('POST', 'http://petstore.swagger.io/v2/pet',
body=json.dumps(api_client.sanitize_for_serialization(self.pet)),
headers={'Content-Type': r'application/json',
'Authorization': r'Signature keyId="my-key-id",algorithm="hs2019",created=[0-9]+,'
r'headers="\(request-target\) \(created\)",'
r'signature="[a-zA-Z0-9+/]+"',
'User-Agent': r'OpenAPI-Generator/1.0.0/python'},
preload_content=True, timeout=None)
pet_api.add_pet(self.pet)
def test_invalid_configuration(self):
# Signing scheme must be valid.
with self.assertRaises(Exception) as cm:
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme='foo',
private_key_path=self.ec_p521_key_path
)
self.assertTrue(re.match('Unsupported security scheme', str(cm.exception)),
'Exception message: {0}'.format(str(cm.exception)))
# Signing scheme must be specified.
with self.assertRaises(Exception) as cm:
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
private_key_path=self.ec_p521_key_path,
signing_scheme=None
)
self.assertTrue(re.match('Unsupported security scheme', str(cm.exception)),
'Exception message: {0}'.format(str(cm.exception)))
# Private key passphrase is missing but key is encrypted.
with self.assertRaises(Exception) as cm:
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=self.ec_p521_key_path,
)
self.assertTrue(re.match('Not a valid clear PKCS#8 structure', str(cm.exception)),
'Exception message: {0}'.format(str(cm.exception)))
# File containing private key must exist.
with self.assertRaises(Exception) as cm:
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path='foobar',
)
self.assertTrue(re.match('Private key file does not exist', str(cm.exception)),
'Exception message: {0}'.format(str(cm.exception)))
# The max validity must be a positive value.
with self.assertRaises(Exception) as cm:
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=self.ec_p521_key_path,
signature_max_validity=timedelta(hours=-1)
)
self.assertTrue(re.match('The signature max validity must be a positive value',
str(cm.exception)),
'Exception message: {0}'.format(str(cm.exception)))
# Cannot include the 'Authorization' header.
with self.assertRaises(Exception) as cm:
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=self.ec_p521_key_path,
signed_headers=['Authorization']
)
self.assertTrue(re.match("'Authorization' header cannot be included", str(cm.exception)),
'Exception message: {0}'.format(str(cm.exception)))
# Cannot specify duplicate headers.
with self.assertRaises(Exception) as cm:
signing_cfg = signing.HttpSigningConfiguration(
key_id="my-key-id",
signing_scheme=signing.SCHEME_HS2019,
private_key_path=self.ec_p521_key_path,
signed_headers=['Host', 'Date', 'Host']
)
self.assertTrue(re.match('Cannot have duplicates in the signed_headers parameter',
str(cm.exception)),
'Exception message: {0}'.format(str(cm.exception)))

View File

@@ -0,0 +1,8 @@
# flake8: noqa
import random
def id_gen(bits=32):
""" Returns a n-bit randomly generated int """
return int(random.getrandbits(bits))

View File

@@ -37,6 +37,8 @@ class Configuration(object):
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param signing_info: Configuration parameters for HTTP signature.
Must be an instance of petstore_api.signing.HttpSigningConfiguration
:Example:
@@ -55,11 +57,50 @@ class Configuration(object):
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
Configure API client with HTTP basic authentication:
conf = petstore_api.Configuration(
username='the-user',
password='the-password',
)
Configure API client with HTTP signature authentication. Use the 'hs2019' signature scheme,
sign the HTTP requests with the RSA-SSA-PSS signature algorithm, and set the expiration time
of the signature to 5 minutes after the signature has been created.
Note you can use the constants defined in the petstore_api.signing module, and you can
also specify arbitrary HTTP headers to be included in the HTTP signature, except for the
'Authorization' header, which is used to carry the signature.
One may be tempted to sign all headers by default, but in practice it rarely works.
This is beccause explicit proxies, transparent proxies, TLS termination endpoints or
load balancers may add/modify/remove headers. Include the HTTP headers that you know
are not going to be modified in transit.
conf = petstore_api.Configuration(
signing_info = petstore_api.signing.HttpSigningConfiguration(
key_id = 'my-key-id',
private_key_path = 'rsa.pem',
signing_scheme = signing.SCHEME_HS2019,
signing_algorithm = signing.ALGORITHM_RSASSA_PSS,
signed_headers = [signing.HEADER_REQUEST_TARGET,
signing.HEADER_CREATED,
signing.HEADER_EXPIRES,
signing.HEADER_HOST,
signing.HEADER_DATE,
signing.HEADER_DIGEST,
'Content-Type',
'Content-Length',
'User-Agent'
],
signature_max_validity = datetime.timedelta(minutes=5)
)
)
"""
def __init__(self, host="http://petstore.swagger.io:80/v2",
api_key=None, api_key_prefix=None,
username=None, password=None):
username=None, password=None,
signing_info=None):
"""Constructor
"""
self.host = host
@@ -88,6 +129,11 @@ class Configuration(object):
self.password = password
"""Password for HTTP basic authentication
"""
if signing_info is not None:
signing_info.host = host
self.signing_info = signing_info
"""The HTTP signing configuration
"""
self.access_token = ""
"""access token for OAuth/Bearer
"""