[feature][python] Support aliasing of API keys (#6469)

* [python] Support aliasing of API keys

* Support for aliasing and prefix

* Make more realistic usage

* Regenerate

* Document alias in generated code

* Support override of aliased keys

* Use diferent id and name for api keys

* ensure up-to-date

* Simple example without x-auth-id-alias

* regenerate docs

* Regenerate

* Provide separate spec for x-auth-id-alias

* Apply suggestions from code review

* regenerated
This commit is contained in:
Jiri Kuncar
2020-06-11 18:19:57 +02:00
committed by GitHub
parent 0cb080d5f1
commit 046ba7de8e
38 changed files with 5152 additions and 30 deletions

View File

@@ -0,0 +1,6 @@
generatorName: python-experimental
outputDir: samples/openapi3/client/extensions/x-auth-id-alias/python-experimental/
inputSpec: modules/openapi-generator/src/test/resources/3_0/extensions/x-auth-id-alias.yaml
templateDir: modules/openapi-generator/src/main/resources/python
additionalProperties:
packageName: x_auth_id_alias

View File

@@ -423,15 +423,16 @@ conf = {{{packageName}}}.Configuration(
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
@@ -462,12 +463,15 @@ conf = {{{packageName}}}.Configuration(
auth = {}
{{#authMethods}}
{{#isApiKey}}
if '{{keyParamName}}' in self.api_key:
if '{{name}}' in self.api_key{{#vendorExtensions.x-auth-id-alias}} or '{{.}}' in self.api_key{{/vendorExtensions.x-auth-id-alias}}:
auth['{{name}}'] = {
'type': 'api_key',
'in': {{#isKeyInCookie}}'cookie'{{/isKeyInCookie}}{{#isKeyInHeader}}'header'{{/isKeyInHeader}}{{#isKeyInQuery}}'query'{{/isKeyInQuery}},
'key': '{{keyParamName}}',
'value': self.get_api_key_with_prefix('{{keyParamName}}')
'value': self.get_api_key_with_prefix(
'{{name}}',{{#vendorExtensions.x-auth-id-alias}}
alias='{{.}}',{{/vendorExtensions.x-auth-id-alias}}
),
}
{{/isApiKey}}
{{#isBasic}}

View File

@@ -95,11 +95,11 @@ configuration = {{{packageName}}}.Configuration(
configuration = {{{packageName}}}.Configuration(
host = "{{{basePath}}}",
api_key = {
'{{{keyParamName}}}': 'YOUR_API_KEY'
'{{name}}': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['{{{keyParamName}}}'] = 'Bearer'
# configuration.api_key_prefix['{{name}}'] = 'Bearer'
{{/isApiKey}}
{{#isOAuth}}

View File

@@ -0,0 +1,113 @@
openapi: 3.0.0
info:
description: This specification shows how to use x-auth-id-alias extension for API keys.
version: 1.0.0
title: OpenAPI Extension x-auth-id-alias
license:
name: Apache-2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0.html'
tags:
- name: usage
description: Show usage of x-auth-id-alias
components:
securitySchemes:
api_key:
type: apiKey
name: X-Api-Key
in: header
api_key_query:
type: apiKey
name: api_key
# Test key aliasing
x-auth-id-alias: api_key
in: query
paths:
/both:
get:
tags:
- usage
summary: Use both API keys
description: Use both API keys
operationId: bothKeys
security:
- api_key_query: []
api_key: []
responses:
'200':
description: successful operation
content:
application/json:
schema:
type: object
/any:
get:
tags:
- usage
summary: Use any API key
description: Use any API key
operationId: anyKey
security:
- api_key_query: []
- api_key: []
responses:
'200':
description: successful operation
content:
application/json:
schema:
type: object
/query:
get:
tags:
- usage
summary: Use API key in query
description: Use API key in query
operationId: keyInQuery
security:
- api_key_query: []
responses:
'200':
description: successful operation
content:
application/json:
schema:
type: object
/header:
get:
tags:
- usage
summary: Use API key in header
description: Use API key in header
operationId: keyInHeader
security:
- api_key: []
responses:
'200':
description: successful operation
content:
application/json:
schema:
type: object
servers:
- url: 'http://{server}.swagger.io:{port}/v2'
description: petstore server
variables:
server:
enum:
- 'petstore'
- 'qa-petstore'
- 'dev-petstore'
default: 'petstore'
port:
enum:
- 80
- 8080
default: 80
- url: https://localhost:8080/{version}
description: The local server
variables:
version:
enum:
- 'v1'
- 'v2'
default: 'v2'

View File

@@ -341,15 +341,16 @@ conf = petstore_api.Configuration(
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
@@ -383,14 +384,18 @@ conf = petstore_api.Configuration(
'type': 'api_key',
'in': 'header',
'key': 'api_key',
'value': self.get_api_key_with_prefix('api_key')
'value': self.get_api_key_with_prefix(
'api_key',
),
}
if 'api_key_query' in self.api_key:
auth['api_key_query'] = {
'type': 'api_key',
'in': 'query',
'key': 'api_key_query',
'value': self.get_api_key_with_prefix('api_key_query')
'value': self.get_api_key_with_prefix(
'api_key_query',
),
}
if self.username is not None and self.password is not None:
auth['http_basic_test'] = {

View File

@@ -345,15 +345,16 @@ conf = petstore_api.Configuration(
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
@@ -387,14 +388,18 @@ conf = petstore_api.Configuration(
'type': 'api_key',
'in': 'header',
'key': 'api_key',
'value': self.get_api_key_with_prefix('api_key')
'value': self.get_api_key_with_prefix(
'api_key',
),
}
if 'api_key_query' in self.api_key:
auth['api_key_query'] = {
'type': 'api_key',
'in': 'query',
'key': 'api_key_query',
'value': self.get_api_key_with_prefix('api_key_query')
'value': self.get_api_key_with_prefix(
'api_key_query',
),
}
if self.username is not None and self.password is not None:
auth['http_basic_test'] = {

View File

@@ -345,15 +345,16 @@ conf = petstore_api.Configuration(
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
@@ -387,14 +388,18 @@ conf = petstore_api.Configuration(
'type': 'api_key',
'in': 'header',
'key': 'api_key',
'value': self.get_api_key_with_prefix('api_key')
'value': self.get_api_key_with_prefix(
'api_key',
),
}
if 'api_key_query' in self.api_key:
auth['api_key_query'] = {
'type': 'api_key',
'in': 'query',
'key': 'api_key_query',
'value': self.get_api_key_with_prefix('api_key_query')
'value': self.get_api_key_with_prefix(
'api_key_query',
),
}
if self.username is not None and self.password is not None:
auth['http_basic_test'] = {

View File

@@ -345,15 +345,16 @@ conf = petstore_api.Configuration(
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
@@ -387,14 +388,18 @@ conf = petstore_api.Configuration(
'type': 'api_key',
'in': 'header',
'key': 'api_key',
'value': self.get_api_key_with_prefix('api_key')
'value': self.get_api_key_with_prefix(
'api_key',
),
}
if 'api_key_query' in self.api_key:
auth['api_key_query'] = {
'type': 'api_key',
'in': 'query',
'key': 'api_key_query',
'value': self.get_api_key_with_prefix('api_key_query')
'value': self.get_api_key_with_prefix(
'api_key_query',
),
}
if self.username is not None and self.password is not None:
auth['http_basic_test'] = {

View File

@@ -0,0 +1,66 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
venv/
.venv/
.python-version
.pytest_cache
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
#Ipython Notebook
.ipynb_checkpoints

View File

@@ -0,0 +1,33 @@
# ref: https://docs.gitlab.com/ee/ci/README.html
stages:
- test
.nosetest:
stage: test
script:
- pip install -r requirements.txt
- pip install -r test-requirements.txt
- pytest --cov=x_auth_id_alias
nosetest-2.7:
extends: .nosetest
image: python:2.7-alpine
nosetest-3.3:
extends: .nosetest
image: python:3.3-alpine
nosetest-3.4:
extends: .nosetest
image: python:3.4-alpine
nosetest-3.5:
extends: .nosetest
image: python:3.5-alpine
nosetest-3.6:
extends: .nosetest
image: python:3.6-alpine
nosetest-3.7:
extends: .nosetest
image: python:3.7-alpine
nosetest-3.8:
extends: .nosetest
image: python:3.8-alpine

View File

@@ -0,0 +1,23 @@
# OpenAPI Generator Ignore
# Generated by openapi-generator https://github.com/openapitools/openapi-generator
# Use this file to prevent files from being overwritten by the generator.
# The patterns follow closely to .gitignore or .dockerignore.
# As an example, the C# client generator defines ApiClient.cs.
# You can make changes and tell OpenAPI Generator to ignore just this file by uncommenting the following line:
#ApiClient.cs
# You can match any string of characters against a directory, file or extension with a single asterisk (*):
#foo/*/qux
# The above matches foo/bar/qux and foo/baz/qux, but not foo/bar/baz/qux
# You can recursively match patterns against a directory, file or extension with a double asterisk (**):
#foo/**/qux
# This matches foo/bar/qux, foo/baz/qux, and foo/bar/baz/qux
# You can also negate patterns with an exclamation (!).
# For example, you can ignore all files in a docs folder with the file extension .md:
#docs/*.md
# Then explicitly reverse the ignore rule for a single file:
#!docs/README.md

View File

@@ -0,0 +1,23 @@
.gitignore
.gitlab-ci.yml
.travis.yml
README.md
docs/UsageApi.md
git_push.sh
requirements.txt
setup.cfg
setup.py
test-requirements.txt
test/__init__.py
tox.ini
x_auth_id_alias/__init__.py
x_auth_id_alias/api/__init__.py
x_auth_id_alias/api/usage_api.py
x_auth_id_alias/api_client.py
x_auth_id_alias/apis/__init__.py
x_auth_id_alias/configuration.py
x_auth_id_alias/exceptions.py
x_auth_id_alias/model/__init__.py
x_auth_id_alias/model_utils.py
x_auth_id_alias/models/__init__.py
x_auth_id_alias/rest.py

View File

@@ -0,0 +1,17 @@
# ref: https://docs.travis-ci.com/user/languages/python
language: python
python:
- "2.7"
- "3.2"
- "3.3"
- "3.4"
- "3.5"
- "3.6"
- "3.7"
- "3.8"
# command to install dependencies
install:
- "pip install -r requirements.txt"
- "pip install -r test-requirements.txt"
# command to run tests
script: pytest --cov=x_auth_id_alias

View File

@@ -0,0 +1,155 @@
# x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys.
This Python package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project:
- API version: 1.0.0
- Package version: 1.0.0
- Build package: org.openapitools.codegen.languages.PythonClientExperimentalCodegen
## Requirements.
Python 2.7 and 3.4+
## Installation & Usage
### pip install
If the python package is hosted on a repository, you can install directly using:
```sh
pip install git+https://github.com/GIT_USER_ID/GIT_REPO_ID.git
```
(you may need to run `pip` with root permission: `sudo pip install git+https://github.com/GIT_USER_ID/GIT_REPO_ID.git`)
Then import the package:
```python
import x_auth_id_alias
```
### Setuptools
Install via [Setuptools](http://pypi.python.org/pypi/setuptools).
```sh
python setup.py install --user
```
(or `sudo python setup.py install` to install the package for all users)
Then import the package:
```python
import x_auth_id_alias
```
## Getting Started
Please follow the [installation procedure](#installation--usage) and then run the following:
```python
from __future__ import print_function
import time
import x_auth_id_alias
from pprint import pprint
from x_auth_id_alias.api import usage_api
# Defining the host is optional and defaults to http://petstore.swagger.io:80/v2
# See configuration.py for a list of all supported configuration parameters.
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2"
)
# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.
# Configure API key authorization: api_key
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key'] = 'Bearer'
# Configure API key authorization: api_key_query
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key_query': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key_query'] = 'Bearer'
# Enter a context with an instance of the API client
with x_auth_id_alias.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = usage_api.UsageApi(api_client)
try:
# Use any API key
api_response = api_instance.any_key()
pprint(api_response)
except x_auth_id_alias.ApiException as e:
print("Exception when calling UsageApi->any_key: %s\n" % e)
```
## Documentation for API Endpoints
All URIs are relative to *http://petstore.swagger.io:80/v2*
Class | Method | HTTP request | Description
------------ | ------------- | ------------- | -------------
*UsageApi* | [**any_key**](docs/UsageApi.md#any_key) | **GET** /any | Use any API key
*UsageApi* | [**both_keys**](docs/UsageApi.md#both_keys) | **GET** /both | Use both API keys
*UsageApi* | [**key_in_header**](docs/UsageApi.md#key_in_header) | **GET** /header | Use API key in header
*UsageApi* | [**key_in_query**](docs/UsageApi.md#key_in_query) | **GET** /query | Use API key in query
## Documentation For Models
## Documentation For Authorization
## api_key
- **Type**: API key
- **API key parameter name**: X-Api-Key
- **Location**: HTTP header
## api_key_query
- **Type**: API key
- **API key parameter name**: api_key
- **Location**: URL query string
## Author
## Notes for Large OpenAPI documents
If the OpenAPI document is large, imports in x_auth_id_alias.apis and x_auth_id_alias.models may fail with a
RecursionError indicating the maximum recursion limit has been exceeded. In that case, there are a couple of solutions:
Solution 1:
Use specific imports for apis and models like:
- `from x_auth_id_alias.api.default_api import DefaultApi`
- `from x_auth_id_alias.model.pet import Pet`
Solution 1:
Before importing the package, adjust the maximum recursion limit as shown below:
```
import sys
sys.setrecursionlimit(1500)
import x_auth_id_alias
from x_auth_id_alias.apis import *
from x_auth_id_alias.models import *
```

View File

@@ -0,0 +1,330 @@
# x_auth_id_alias.UsageApi
All URIs are relative to *http://petstore.swagger.io:80/v2*
Method | HTTP request | Description
------------- | ------------- | -------------
[**any_key**](UsageApi.md#any_key) | **GET** /any | Use any API key
[**both_keys**](UsageApi.md#both_keys) | **GET** /both | Use both API keys
[**key_in_header**](UsageApi.md#key_in_header) | **GET** /header | Use API key in header
[**key_in_query**](UsageApi.md#key_in_query) | **GET** /query | Use API key in query
# **any_key**
> {str: (bool, date, datetime, dict, float, int, list, str, none_type)} any_key()
Use any API key
Use any API key
### Example
* Api Key Authentication (api_key):
* Api Key Authentication (api_key_query):
```python
from __future__ import print_function
import time
import x_auth_id_alias
from x_auth_id_alias.api import usage_api
from pprint import pprint
# Defining the host is optional and defaults to http://petstore.swagger.io:80/v2
# See configuration.py for a list of all supported configuration parameters.
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2"
)
# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.
# Configure API key authorization: api_key
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key'] = 'Bearer'
# Configure API key authorization: api_key_query
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key_query': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key_query'] = 'Bearer'
# Enter a context with an instance of the API client
with x_auth_id_alias.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = usage_api.UsageApi(api_client)
# example, this endpoint has no required or optional parameters
try:
# Use any API key
api_response = api_instance.any_key()
pprint(api_response)
except x_auth_id_alias.ApiException as e:
print("Exception when calling UsageApi->any_key: %s\n" % e)
```
### Parameters
This endpoint does not need any parameter.
### Return type
**{str: (bool, date, datetime, dict, float, int, list, str, none_type)}**
### Authorization
[api_key](../README.md#api_key), [api_key_query](../README.md#api_key_query)
### HTTP request headers
- **Content-Type**: Not defined
- **Accept**: application/json
### HTTP response details
| Status code | Description | Response headers |
|-------------|-------------|------------------|
**200** | successful operation | - |
[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)
# **both_keys**
> {str: (bool, date, datetime, dict, float, int, list, str, none_type)} both_keys()
Use both API keys
Use both API keys
### Example
* Api Key Authentication (api_key):
* Api Key Authentication (api_key_query):
```python
from __future__ import print_function
import time
import x_auth_id_alias
from x_auth_id_alias.api import usage_api
from pprint import pprint
# Defining the host is optional and defaults to http://petstore.swagger.io:80/v2
# See configuration.py for a list of all supported configuration parameters.
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2"
)
# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.
# Configure API key authorization: api_key
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key'] = 'Bearer'
# Configure API key authorization: api_key_query
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key_query': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key_query'] = 'Bearer'
# Enter a context with an instance of the API client
with x_auth_id_alias.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = usage_api.UsageApi(api_client)
# example, this endpoint has no required or optional parameters
try:
# Use both API keys
api_response = api_instance.both_keys()
pprint(api_response)
except x_auth_id_alias.ApiException as e:
print("Exception when calling UsageApi->both_keys: %s\n" % e)
```
### Parameters
This endpoint does not need any parameter.
### Return type
**{str: (bool, date, datetime, dict, float, int, list, str, none_type)}**
### Authorization
[api_key](../README.md#api_key), [api_key_query](../README.md#api_key_query)
### HTTP request headers
- **Content-Type**: Not defined
- **Accept**: application/json
### HTTP response details
| Status code | Description | Response headers |
|-------------|-------------|------------------|
**200** | successful operation | - |
[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)
# **key_in_header**
> {str: (bool, date, datetime, dict, float, int, list, str, none_type)} key_in_header()
Use API key in header
Use API key in header
### Example
* Api Key Authentication (api_key):
```python
from __future__ import print_function
import time
import x_auth_id_alias
from x_auth_id_alias.api import usage_api
from pprint import pprint
# Defining the host is optional and defaults to http://petstore.swagger.io:80/v2
# See configuration.py for a list of all supported configuration parameters.
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2"
)
# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.
# Configure API key authorization: api_key
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key'] = 'Bearer'
# Enter a context with an instance of the API client
with x_auth_id_alias.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = usage_api.UsageApi(api_client)
# example, this endpoint has no required or optional parameters
try:
# Use API key in header
api_response = api_instance.key_in_header()
pprint(api_response)
except x_auth_id_alias.ApiException as e:
print("Exception when calling UsageApi->key_in_header: %s\n" % e)
```
### Parameters
This endpoint does not need any parameter.
### Return type
**{str: (bool, date, datetime, dict, float, int, list, str, none_type)}**
### Authorization
[api_key](../README.md#api_key)
### HTTP request headers
- **Content-Type**: Not defined
- **Accept**: application/json
### HTTP response details
| Status code | Description | Response headers |
|-------------|-------------|------------------|
**200** | successful operation | - |
[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)
# **key_in_query**
> {str: (bool, date, datetime, dict, float, int, list, str, none_type)} key_in_query()
Use API key in query
Use API key in query
### Example
* Api Key Authentication (api_key_query):
```python
from __future__ import print_function
import time
import x_auth_id_alias
from x_auth_id_alias.api import usage_api
from pprint import pprint
# Defining the host is optional and defaults to http://petstore.swagger.io:80/v2
# See configuration.py for a list of all supported configuration parameters.
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2"
)
# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.
# Configure API key authorization: api_key_query
configuration = x_auth_id_alias.Configuration(
host = "http://petstore.swagger.io:80/v2",
api_key = {
'api_key_query': 'YOUR_API_KEY'
}
)
# Uncomment below to setup prefix (e.g. Bearer) for API key, if needed
# configuration.api_key_prefix['api_key_query'] = 'Bearer'
# Enter a context with an instance of the API client
with x_auth_id_alias.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = usage_api.UsageApi(api_client)
# example, this endpoint has no required or optional parameters
try:
# Use API key in query
api_response = api_instance.key_in_query()
pprint(api_response)
except x_auth_id_alias.ApiException as e:
print("Exception when calling UsageApi->key_in_query: %s\n" % e)
```
### Parameters
This endpoint does not need any parameter.
### Return type
**{str: (bool, date, datetime, dict, float, int, list, str, none_type)}**
### Authorization
[api_key_query](../README.md#api_key_query)
### HTTP request headers
- **Content-Type**: Not defined
- **Accept**: application/json
### HTTP response details
| Status code | Description | Response headers |
|-------------|-------------|------------------|
**200** | successful operation | - |
[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)

View File

@@ -0,0 +1,58 @@
#!/bin/sh
# ref: https://help.github.com/articles/adding-an-existing-project-to-github-using-the-command-line/
#
# Usage example: /bin/sh ./git_push.sh wing328 openapi-pestore-perl "minor update" "gitlab.com"
git_user_id=$1
git_repo_id=$2
release_note=$3
git_host=$4
if [ "$git_host" = "" ]; then
git_host="github.com"
echo "[INFO] No command line input provided. Set \$git_host to $git_host"
fi
if [ "$git_user_id" = "" ]; then
git_user_id="GIT_USER_ID"
echo "[INFO] No command line input provided. Set \$git_user_id to $git_user_id"
fi
if [ "$git_repo_id" = "" ]; then
git_repo_id="GIT_REPO_ID"
echo "[INFO] No command line input provided. Set \$git_repo_id to $git_repo_id"
fi
if [ "$release_note" = "" ]; then
release_note="Minor update"
echo "[INFO] No command line input provided. Set \$release_note to $release_note"
fi
# Initialize the local directory as a Git repository
git init
# Adds the files in the local repository and stages them for commit.
git add .
# Commits the tracked changes and prepares them to be pushed to a remote repository.
git commit -m "$release_note"
# Sets the new remote
git_remote=`git remote`
if [ "$git_remote" = "" ]; then # git remote not defined
if [ "$GIT_TOKEN" = "" ]; then
echo "[INFO] \$GIT_TOKEN (environment variable) is not set. Using the git credential in your environment."
git remote add origin https://${git_host}/${git_user_id}/${git_repo_id}.git
else
git remote add origin https://${git_user_id}:${GIT_TOKEN}@${git_host}/${git_user_id}/${git_repo_id}.git
fi
fi
git pull origin master
# Pushes (Forces) the changes in the local repository up to the remote repository
echo "Git pushing to https://${git_host}/${git_user_id}/${git_repo_id}.git"
git push origin master 2>&1 | grep -v 'To https'

View File

@@ -0,0 +1,7 @@
nulltype
certifi >= 14.05.14
future; python_version<="2.7"
six >= 1.10
python_dateutil >= 2.5.3
setuptools >= 21.0.0
urllib3 >= 1.15.1

View File

@@ -0,0 +1,2 @@
[flake8]
max-line-length=99

View File

@@ -0,0 +1,49 @@
# coding: utf-8
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from setuptools import setup, find_packages # noqa: H301
NAME = "x-auth-id-alias"
VERSION = "1.0.0"
# To install the library, run the following
#
# python setup.py install
#
# prerequisite: setuptools
# http://pypi.python.org/pypi/setuptools
REQUIRES = [
"urllib3 >= 1.15",
"six >= 1.10",
"certifi",
"python-dateutil",
"nulltype",
]
EXTRAS = {':python_version <= "2.7"': ['future']}
setup(
name=NAME,
version=VERSION,
description="OpenAPI Extension x-auth-id-alias",
author="OpenAPI Generator community",
author_email="team@openapitools.org",
url="",
keywords=["OpenAPI", "OpenAPI-Generator", "OpenAPI Extension x-auth-id-alias"],
install_requires=REQUIRES,
extras_require=EXTRAS,
packages=find_packages(exclude=["test", "tests"]),
include_package_data=True,
license="Apache-2.0",
long_description="""\
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
"""
)

View File

@@ -0,0 +1,4 @@
pytest~=4.6.7 # needed for python 2.7+3.4
pytest-cov>=2.8.1
pytest-randomly==1.2.3 # needed for python 2.7+3.4
mock; python_version<="2.7"

View File

@@ -0,0 +1,60 @@
# coding: utf-8
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
import unittest
import x_auth_id_alias
from x_auth_id_alias.api.usage_api import UsageApi # noqa: E501
class TestUsageApi(unittest.TestCase):
"""UsageApi unit test stubs"""
def setUp(self):
self.api = UsageApi() # noqa: E501
def tearDown(self):
pass
def test_any_key(self):
"""Test case for any_key
Use any API key # noqa: E501
"""
pass
def test_both_keys(self):
"""Test case for both_keys
Use both API keys # noqa: E501
"""
pass
def test_key_in_header(self):
"""Test case for key_in_header
Use API key in header # noqa: E501
"""
pass
def test_key_in_query(self):
"""Test case for key_in_query
Use API key in query # noqa: E501
"""
pass
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,104 @@
# coding: utf-8
"""
Test usage of x-auth-id-alias.
"""
from __future__ import absolute_import
import unittest
import x_auth_id_alias
from x_auth_id_alias.api.usage_api import UsageApi
class TestUsageApi(unittest.TestCase):
"""UsageApi unit test stubs"""
def setUp(self):
self.configuration = x_auth_id_alias.Configuration(
host="http://localhost/",
api_key={"api_key": "SECRET_VALUE"},
api_key_prefix={"api_key": "PREFIX"},
)
self.client = x_auth_id_alias.ApiClient(self.configuration)
self.api = UsageApi(self.client)
def tearDown(self):
self.client.close()
def test_any_key(self):
"""Test case for any_key
Use any API key # noqa: E501
"""
def request(*args, **kwargs):
assert ("api_key", "SECRET_VALUE") in kwargs["query_params"]
assert "PREFIX SECRET_VALUE" == kwargs["headers"]["X-Api-Key"]
raise RuntimeError("passed")
self.client.request = request
try:
self.api.any_key()
except RuntimeError as e:
assert "passed" == str(e)
def test_both_keys(self):
"""Test case for both_keys
Use both API keys # noqa: E501
"""
def request(*args, **kwargs):
assert ("api_key", "SECRET_VALUE") in kwargs["query_params"]
assert "PREFIX SECRET_VALUE" == kwargs["headers"]["X-Api-Key"]
raise RuntimeError("passed")
self.client.request = request
try:
self.api.both_keys()
except RuntimeError as e:
assert "passed" == str(e)
def test_key_in_header(self):
"""Test case for key_in_header
Use API key in header # noqa: E501
"""
def request(*args, **kwargs):
assert ("api_key", "SECRET_VALUE") not in kwargs["query_params"]
assert "PREFIX SECRET_VALUE" == kwargs["headers"]["X-Api-Key"]
raise RuntimeError("passed")
self.client.request = request
try:
self.api.key_in_header()
except RuntimeError as e:
assert "passed" == str(e)
def test_key_in_query(self):
"""Test case for key_in_query
Use API key in query # noqa: E501
"""
def request(*args, **kwargs):
assert ("api_key", "SECRET_VALUE") in kwargs["query_params"]
assert "X-Api-Key" not in kwargs["headers"]
raise RuntimeError("passed")
self.client.request = request
try:
self.api.key_in_query()
except RuntimeError as e:
assert "passed" == str(e)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,9 @@
[tox]
envlist = py27, py3
[testenv]
deps=-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands=
pytest --cov=x_auth_id_alias

View File

@@ -0,0 +1,31 @@
# coding: utf-8
# flake8: noqa
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
__version__ = "1.0.0"
# import ApiClient
from x_auth_id_alias.api_client import ApiClient
# import Configuration
from x_auth_id_alias.configuration import Configuration
# import exceptions
from x_auth_id_alias.exceptions import OpenApiException
from x_auth_id_alias.exceptions import ApiAttributeError
from x_auth_id_alias.exceptions import ApiTypeError
from x_auth_id_alias.exceptions import ApiValueError
from x_auth_id_alias.exceptions import ApiKeyError
from x_auth_id_alias.exceptions import ApiException

View File

@@ -0,0 +1,6 @@
from __future__ import absolute_import
# flake8: noqa
# import apis into api package
from x_auth_id_alias.api.usage_api import UsageApi

View File

@@ -0,0 +1,699 @@
# coding: utf-8
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
import re # noqa: F401
import sys # noqa: F401
# python 2 and python 3 compatibility library
import six
from x_auth_id_alias.api_client import ApiClient
from x_auth_id_alias.exceptions import (
ApiTypeError,
ApiValueError
)
from x_auth_id_alias.model_utils import ( # noqa: F401
check_allowed_values,
check_validations,
date,
datetime,
file_type,
int,
none_type,
str,
validate_and_convert_types
)
class UsageApi(object):
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None):
if api_client is None:
api_client = ApiClient()
self.api_client = api_client
def __any_key(
self,
**kwargs
):
"""Use any API key # noqa: E501
Use any API key # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.any_key(async_req=True)
>>> result = thread.get()
Keyword Args:
_return_http_data_only (bool): response data without head status
code and headers. Default is True.
_preload_content (bool): if False, the urllib3.HTTPResponse object
will be returned without reading/decoding response data.
Default is True.
_request_timeout (float/tuple): timeout setting for this request. If one
number provided, it will be total request timeout. It can also
be a pair (tuple) of (connection, read) timeouts.
Default is None.
_check_input_type (bool): specifies if type checking
should be done one the data sent to the server.
Default is True.
_check_return_type (bool): specifies if type checking
should be done one the data received from the server.
Default is True.
_host_index (int): specifies the index of the server
that we want to use.
Default is 0.
async_req (bool): execute request asynchronously
Returns:
{str: (bool, date, datetime, dict, float, int, list, str, none_type)}
If the method is called asynchronously, returns the request
thread.
"""
kwargs['async_req'] = kwargs.get(
'async_req', False
)
kwargs['_return_http_data_only'] = kwargs.get(
'_return_http_data_only', True
)
kwargs['_preload_content'] = kwargs.get(
'_preload_content', True
)
kwargs['_request_timeout'] = kwargs.get(
'_request_timeout', None
)
kwargs['_check_input_type'] = kwargs.get(
'_check_input_type', True
)
kwargs['_check_return_type'] = kwargs.get(
'_check_return_type', True
)
kwargs['_host_index'] = kwargs.get('_host_index', 0)
return self.call_with_http_info(**kwargs)
self.any_key = Endpoint(
settings={
'response_type': ({str: (bool, date, datetime, dict, float, int, list, str, none_type)},),
'auth': [
'api_key',
'api_key_query'
],
'endpoint_path': '/any',
'operation_id': 'any_key',
'http_method': 'GET',
'servers': [],
},
params_map={
'all': [
],
'required': [],
'nullable': [
],
'enum': [
],
'validation': [
]
},
root_map={
'validations': {
},
'allowed_values': {
},
'openapi_types': {
},
'attribute_map': {
},
'location_map': {
},
'collection_format_map': {
}
},
headers_map={
'accept': [
'application/json'
],
'content_type': [],
},
api_client=api_client,
callable=__any_key
)
def __both_keys(
self,
**kwargs
):
"""Use both API keys # noqa: E501
Use both API keys # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.both_keys(async_req=True)
>>> result = thread.get()
Keyword Args:
_return_http_data_only (bool): response data without head status
code and headers. Default is True.
_preload_content (bool): if False, the urllib3.HTTPResponse object
will be returned without reading/decoding response data.
Default is True.
_request_timeout (float/tuple): timeout setting for this request. If one
number provided, it will be total request timeout. It can also
be a pair (tuple) of (connection, read) timeouts.
Default is None.
_check_input_type (bool): specifies if type checking
should be done one the data sent to the server.
Default is True.
_check_return_type (bool): specifies if type checking
should be done one the data received from the server.
Default is True.
_host_index (int): specifies the index of the server
that we want to use.
Default is 0.
async_req (bool): execute request asynchronously
Returns:
{str: (bool, date, datetime, dict, float, int, list, str, none_type)}
If the method is called asynchronously, returns the request
thread.
"""
kwargs['async_req'] = kwargs.get(
'async_req', False
)
kwargs['_return_http_data_only'] = kwargs.get(
'_return_http_data_only', True
)
kwargs['_preload_content'] = kwargs.get(
'_preload_content', True
)
kwargs['_request_timeout'] = kwargs.get(
'_request_timeout', None
)
kwargs['_check_input_type'] = kwargs.get(
'_check_input_type', True
)
kwargs['_check_return_type'] = kwargs.get(
'_check_return_type', True
)
kwargs['_host_index'] = kwargs.get('_host_index', 0)
return self.call_with_http_info(**kwargs)
self.both_keys = Endpoint(
settings={
'response_type': ({str: (bool, date, datetime, dict, float, int, list, str, none_type)},),
'auth': [
'api_key',
'api_key_query'
],
'endpoint_path': '/both',
'operation_id': 'both_keys',
'http_method': 'GET',
'servers': [],
},
params_map={
'all': [
],
'required': [],
'nullable': [
],
'enum': [
],
'validation': [
]
},
root_map={
'validations': {
},
'allowed_values': {
},
'openapi_types': {
},
'attribute_map': {
},
'location_map': {
},
'collection_format_map': {
}
},
headers_map={
'accept': [
'application/json'
],
'content_type': [],
},
api_client=api_client,
callable=__both_keys
)
def __key_in_header(
self,
**kwargs
):
"""Use API key in header # noqa: E501
Use API key in header # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.key_in_header(async_req=True)
>>> result = thread.get()
Keyword Args:
_return_http_data_only (bool): response data without head status
code and headers. Default is True.
_preload_content (bool): if False, the urllib3.HTTPResponse object
will be returned without reading/decoding response data.
Default is True.
_request_timeout (float/tuple): timeout setting for this request. If one
number provided, it will be total request timeout. It can also
be a pair (tuple) of (connection, read) timeouts.
Default is None.
_check_input_type (bool): specifies if type checking
should be done one the data sent to the server.
Default is True.
_check_return_type (bool): specifies if type checking
should be done one the data received from the server.
Default is True.
_host_index (int): specifies the index of the server
that we want to use.
Default is 0.
async_req (bool): execute request asynchronously
Returns:
{str: (bool, date, datetime, dict, float, int, list, str, none_type)}
If the method is called asynchronously, returns the request
thread.
"""
kwargs['async_req'] = kwargs.get(
'async_req', False
)
kwargs['_return_http_data_only'] = kwargs.get(
'_return_http_data_only', True
)
kwargs['_preload_content'] = kwargs.get(
'_preload_content', True
)
kwargs['_request_timeout'] = kwargs.get(
'_request_timeout', None
)
kwargs['_check_input_type'] = kwargs.get(
'_check_input_type', True
)
kwargs['_check_return_type'] = kwargs.get(
'_check_return_type', True
)
kwargs['_host_index'] = kwargs.get('_host_index', 0)
return self.call_with_http_info(**kwargs)
self.key_in_header = Endpoint(
settings={
'response_type': ({str: (bool, date, datetime, dict, float, int, list, str, none_type)},),
'auth': [
'api_key'
],
'endpoint_path': '/header',
'operation_id': 'key_in_header',
'http_method': 'GET',
'servers': [],
},
params_map={
'all': [
],
'required': [],
'nullable': [
],
'enum': [
],
'validation': [
]
},
root_map={
'validations': {
},
'allowed_values': {
},
'openapi_types': {
},
'attribute_map': {
},
'location_map': {
},
'collection_format_map': {
}
},
headers_map={
'accept': [
'application/json'
],
'content_type': [],
},
api_client=api_client,
callable=__key_in_header
)
def __key_in_query(
self,
**kwargs
):
"""Use API key in query # noqa: E501
Use API key in query # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.key_in_query(async_req=True)
>>> result = thread.get()
Keyword Args:
_return_http_data_only (bool): response data without head status
code and headers. Default is True.
_preload_content (bool): if False, the urllib3.HTTPResponse object
will be returned without reading/decoding response data.
Default is True.
_request_timeout (float/tuple): timeout setting for this request. If one
number provided, it will be total request timeout. It can also
be a pair (tuple) of (connection, read) timeouts.
Default is None.
_check_input_type (bool): specifies if type checking
should be done one the data sent to the server.
Default is True.
_check_return_type (bool): specifies if type checking
should be done one the data received from the server.
Default is True.
_host_index (int): specifies the index of the server
that we want to use.
Default is 0.
async_req (bool): execute request asynchronously
Returns:
{str: (bool, date, datetime, dict, float, int, list, str, none_type)}
If the method is called asynchronously, returns the request
thread.
"""
kwargs['async_req'] = kwargs.get(
'async_req', False
)
kwargs['_return_http_data_only'] = kwargs.get(
'_return_http_data_only', True
)
kwargs['_preload_content'] = kwargs.get(
'_preload_content', True
)
kwargs['_request_timeout'] = kwargs.get(
'_request_timeout', None
)
kwargs['_check_input_type'] = kwargs.get(
'_check_input_type', True
)
kwargs['_check_return_type'] = kwargs.get(
'_check_return_type', True
)
kwargs['_host_index'] = kwargs.get('_host_index', 0)
return self.call_with_http_info(**kwargs)
self.key_in_query = Endpoint(
settings={
'response_type': ({str: (bool, date, datetime, dict, float, int, list, str, none_type)},),
'auth': [
'api_key_query'
],
'endpoint_path': '/query',
'operation_id': 'key_in_query',
'http_method': 'GET',
'servers': [],
},
params_map={
'all': [
],
'required': [],
'nullable': [
],
'enum': [
],
'validation': [
]
},
root_map={
'validations': {
},
'allowed_values': {
},
'openapi_types': {
},
'attribute_map': {
},
'location_map': {
},
'collection_format_map': {
}
},
headers_map={
'accept': [
'application/json'
],
'content_type': [],
},
api_client=api_client,
callable=__key_in_query
)
class Endpoint(object):
def __init__(self, settings=None, params_map=None, root_map=None,
headers_map=None, api_client=None, callable=None):
"""Creates an endpoint
Args:
settings (dict): see below key value pairs
'response_type' (tuple/None): response type
'auth' (list): a list of auth type keys
'endpoint_path' (str): the endpoint path
'operation_id' (str): endpoint string identifier
'http_method' (str): POST/PUT/PATCH/GET etc
'servers' (list): list of str servers that this endpoint is at
params_map (dict): see below key value pairs
'all' (list): list of str endpoint parameter names
'required' (list): list of required parameter names
'nullable' (list): list of nullable parameter names
'enum' (list): list of parameters with enum values
'validation' (list): list of parameters with validations
root_map
'validations' (dict): the dict mapping endpoint parameter tuple
paths to their validation dictionaries
'allowed_values' (dict): the dict mapping endpoint parameter
tuple paths to their allowed_values (enum) dictionaries
'openapi_types' (dict): param_name to openapi type
'attribute_map' (dict): param_name to camelCase name
'location_map' (dict): param_name to 'body', 'file', 'form',
'header', 'path', 'query'
collection_format_map (dict): param_name to `csv` etc.
headers_map (dict): see below key value pairs
'accept' (list): list of Accept header strings
'content_type' (list): list of Content-Type header strings
api_client (ApiClient) api client instance
callable (function): the function which is invoked when the
Endpoint is called
"""
self.settings = settings
self.params_map = params_map
self.params_map['all'].extend([
'async_req',
'_host_index',
'_preload_content',
'_request_timeout',
'_return_http_data_only',
'_check_input_type',
'_check_return_type'
])
self.params_map['nullable'].extend(['_request_timeout'])
self.validations = root_map['validations']
self.allowed_values = root_map['allowed_values']
self.openapi_types = root_map['openapi_types']
extra_types = {
'async_req': (bool,),
'_host_index': (int,),
'_preload_content': (bool,),
'_request_timeout': (none_type, int, (int,), [int]),
'_return_http_data_only': (bool,),
'_check_input_type': (bool,),
'_check_return_type': (bool,)
}
self.openapi_types.update(extra_types)
self.attribute_map = root_map['attribute_map']
self.location_map = root_map['location_map']
self.collection_format_map = root_map['collection_format_map']
self.headers_map = headers_map
self.api_client = api_client
self.callable = callable
def __validate_inputs(self, kwargs):
for param in self.params_map['enum']:
if param in kwargs:
check_allowed_values(
self.allowed_values,
(param,),
kwargs[param]
)
for param in self.params_map['validation']:
if param in kwargs:
check_validations(
self.validations,
(param,),
kwargs[param],
configuration=self.api_client.configuration
)
if kwargs['_check_input_type'] is False:
return
for key, value in six.iteritems(kwargs):
fixed_val = validate_and_convert_types(
value,
self.openapi_types[key],
[key],
False,
kwargs['_check_input_type'],
configuration=self.api_client.configuration
)
kwargs[key] = fixed_val
def __gather_params(self, kwargs):
params = {
'body': None,
'collection_format': {},
'file': {},
'form': [],
'header': {},
'path': {},
'query': []
}
for param_name, param_value in six.iteritems(kwargs):
param_location = self.location_map.get(param_name)
if param_location is None:
continue
if param_location:
if param_location == 'body':
params['body'] = param_value
continue
base_name = self.attribute_map[param_name]
if (param_location == 'form' and
self.openapi_types[param_name] == (file_type,)):
params['file'][param_name] = [param_value]
elif (param_location == 'form' and
self.openapi_types[param_name] == ([file_type],)):
# param_value is already a list
params['file'][param_name] = param_value
elif param_location in {'form', 'query'}:
param_value_full = (base_name, param_value)
params[param_location].append(param_value_full)
if param_location not in {'form', 'query'}:
params[param_location][base_name] = param_value
collection_format = self.collection_format_map.get(param_name)
if collection_format:
params['collection_format'][base_name] = collection_format
return params
def __call__(self, *args, **kwargs):
""" This method is invoked when endpoints are called
Example:
pet_api = PetApi()
pet_api.add_pet # this is an instance of the class Endpoint
pet_api.add_pet() # this invokes pet_api.add_pet.__call__()
which then invokes the callable functions stored in that endpoint at
pet_api.add_pet.callable or self.callable in this class
"""
return self.callable(self, *args, **kwargs)
def call_with_http_info(self, **kwargs):
try:
_host = self.settings['servers'][kwargs['_host_index']]
except IndexError:
if self.settings['servers']:
raise ApiValueError(
"Invalid host index. Must be 0 <= index < %s" %
len(self.settings['servers'])
)
_host = None
for key, value in six.iteritems(kwargs):
if key not in self.params_map['all']:
raise ApiTypeError(
"Got an unexpected parameter '%s'"
" to method `%s`" %
(key, self.settings['operation_id'])
)
# only throw this nullable ApiValueError if _check_input_type
# is False, if _check_input_type==True we catch this case
# in self.__validate_inputs
if (key not in self.params_map['nullable'] and value is None
and kwargs['_check_input_type'] is False):
raise ApiValueError(
"Value may not be None for non-nullable parameter `%s`"
" when calling `%s`" %
(key, self.settings['operation_id'])
)
for key in self.params_map['required']:
if key not in kwargs.keys():
raise ApiValueError(
"Missing the required parameter `%s` when calling "
"`%s`" % (key, self.settings['operation_id'])
)
self.__validate_inputs(kwargs)
params = self.__gather_params(kwargs)
accept_headers_list = self.headers_map['accept']
if accept_headers_list:
params['header']['Accept'] = self.api_client.select_header_accept(
accept_headers_list)
content_type_headers_list = self.headers_map['content_type']
if content_type_headers_list:
header_list = self.api_client.select_header_content_type(
content_type_headers_list)
params['header']['Content-Type'] = header_list
return self.api_client.call_api(
self.settings['endpoint_path'], self.settings['http_method'],
params['path'],
params['query'],
params['header'],
body=params['body'],
post_params=params['form'],
files=params['file'],
response_type=self.settings['response_type'],
auth_settings=self.settings['auth'],
async_req=kwargs['async_req'],
_check_type=kwargs['_check_return_type'],
_return_http_data_only=kwargs['_return_http_data_only'],
_preload_content=kwargs['_preload_content'],
_request_timeout=kwargs['_request_timeout'],
_host=_host,
collection_formats=params['collection_format'])

View File

@@ -0,0 +1,578 @@
# coding: utf-8
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
import json
import atexit
import mimetypes
from multiprocessing.pool import ThreadPool
import os
import re
# python 2 and python 3 compatibility library
import six
from six.moves.urllib.parse import quote
from x_auth_id_alias import rest
from x_auth_id_alias.configuration import Configuration
from x_auth_id_alias.exceptions import ApiValueError, ApiException
from x_auth_id_alias.model_utils import (
ModelNormal,
ModelSimple,
ModelComposed,
date,
datetime,
deserialize_file,
file_type,
model_to_dict,
str,
validate_and_convert_types
)
class ApiClient(object):
"""Generic API client for OpenAPI client library builds.
OpenAPI generic API client. This client handles the client-
server communication, and is invariant across implementations. Specifics of
the methods and models for each application are generated from the OpenAPI
templates.
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
:param configuration: .Configuration object for this client
:param header_name: a header to pass when making calls to the API.
:param header_value: a header value to pass when making calls to
the API.
:param cookie: a cookie to include in the header when making calls
to the API
:param pool_threads: The number of threads to use for async requests
to the API. More threads means more concurrent API requests.
"""
# six.binary_type python2=str, python3=bytes
# six.text_type python2=unicode, python3=str
PRIMITIVE_TYPES = (
(float, bool, six.binary_type, six.text_type) + six.integer_types
)
_pool = None
def __init__(self, configuration=None, header_name=None, header_value=None,
cookie=None, pool_threads=1):
if configuration is None:
configuration = Configuration()
self.configuration = configuration
self.pool_threads = pool_threads
self.rest_client = rest.RESTClientObject(configuration)
self.default_headers = {}
if header_name is not None:
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = 'OpenAPI-Generator/1.0.0/python'
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def close(self):
if self._pool:
self._pool.close()
self._pool.join()
self._pool = None
if hasattr(atexit, 'unregister'):
atexit.unregister(self.close)
@property
def pool(self):
"""Create thread pool on first request
avoids instantiating unused threadpool for blocking clients.
"""
if self._pool is None:
atexit.register(self.close)
self._pool = ThreadPool(self.pool_threads)
return self._pool
@property
def user_agent(self):
"""User agent for this API client"""
return self.default_headers['User-Agent']
@user_agent.setter
def user_agent(self, value):
self.default_headers['User-Agent'] = value
def set_default_header(self, header_name, header_value):
self.default_headers[header_name] = header_value
def __call_api(
self, resource_path, method, path_params=None,
query_params=None, header_params=None, body=None, post_params=None,
files=None, response_type=None, auth_settings=None,
_return_http_data_only=None, collection_formats=None,
_preload_content=True, _request_timeout=None, _host=None,
_check_type=None):
config = self.configuration
# header parameters
header_params = header_params or {}
header_params.update(self.default_headers)
if self.cookie:
header_params['Cookie'] = self.cookie
if header_params:
header_params = self.sanitize_for_serialization(header_params)
header_params = dict(self.parameters_to_tuples(header_params,
collection_formats))
# path parameters
if path_params:
path_params = self.sanitize_for_serialization(path_params)
path_params = self.parameters_to_tuples(path_params,
collection_formats)
for k, v in path_params:
# specified safe chars, encode everything
resource_path = resource_path.replace(
'{%s}' % k,
quote(str(v), safe=config.safe_chars_for_path_param)
)
# query parameters
if query_params:
query_params = self.sanitize_for_serialization(query_params)
query_params = self.parameters_to_tuples(query_params,
collection_formats)
# post parameters
if post_params or files:
post_params = post_params if post_params else []
post_params = self.sanitize_for_serialization(post_params)
post_params = self.parameters_to_tuples(post_params,
collection_formats)
post_params.extend(self.files_parameters(files))
# body
if body:
body = self.sanitize_for_serialization(body)
# auth setting
self.update_params_for_auth(header_params, query_params,
auth_settings, resource_path, method, body)
# request url
if _host is None:
url = self.configuration.host + resource_path
else:
# use server/host defined in path or operation instead
url = _host + resource_path
try:
# perform request and return response
response_data = self.request(
method, url, query_params=query_params, headers=header_params,
post_params=post_params, body=body,
_preload_content=_preload_content,
_request_timeout=_request_timeout)
except ApiException as e:
e.body = e.body.decode('utf-8') if six.PY3 else e.body
raise e
content_type = response_data.getheader('content-type')
self.last_response = response_data
return_data = response_data
if not _preload_content:
return (return_data)
return return_data
if six.PY3 and response_type not in ["file", "bytes"]:
match = None
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s\;]?", content_type)
encoding = match.group(1) if match else "utf-8"
response_data.data = response_data.data.decode(encoding)
# deserialize response data
if response_type:
return_data = self.deserialize(
response_data,
response_type,
_check_type
)
else:
return_data = None
if _return_http_data_only:
return (return_data)
else:
return (return_data, response_data.status,
response_data.getheaders())
def sanitize_for_serialization(self, obj):
"""Builds a JSON POST object.
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is OpenAPI model, return the properties dict.
:param obj: The data to serialize.
:return: The serialized form of data.
"""
if obj is None:
return None
elif isinstance(obj, self.PRIMITIVE_TYPES):
return obj
elif isinstance(obj, list):
return [self.sanitize_for_serialization(sub_obj)
for sub_obj in obj]
elif isinstance(obj, tuple):
return tuple(self.sanitize_for_serialization(sub_obj)
for sub_obj in obj)
elif isinstance(obj, (datetime, date)):
return obj.isoformat()
if isinstance(obj, dict):
obj_dict = obj
elif isinstance(obj, ModelNormal) or isinstance(obj, ModelComposed):
# Convert model obj to dict
# Convert attribute name to json key in
# model definition for request
obj_dict = model_to_dict(obj, serialize=True)
elif isinstance(obj, ModelSimple):
return self.sanitize_for_serialization(obj.value)
return {key: self.sanitize_for_serialization(val)
for key, val in six.iteritems(obj_dict)}
def deserialize(self, response, response_type, _check_type):
"""Deserializes response into an object.
:param response: RESTResponse object to be deserialized.
:param response_type: For the response, a tuple containing:
valid classes
a list containing valid classes (for list schemas)
a dict containing a tuple of valid classes as the value
Example values:
(str,)
(Pet,)
(float, none_type)
([int, none_type],)
({str: (bool, str, int, float, date, datetime, str, none_type)},)
:param _check_type: boolean, whether to check the types of the data
received from the server
:type _check_type: bool
:return: deserialized object.
"""
# handle file downloading
# save response body into a tmp file and return the instance
if response_type == (file_type,):
content_disposition = response.getheader("Content-Disposition")
return deserialize_file(response.data, self.configuration,
content_disposition=content_disposition)
# fetch data from response object
try:
received_data = json.loads(response.data)
except ValueError:
received_data = response.data
# store our data under the key of 'received_data' so users have some
# context if they are deserializing a string and the data type is wrong
deserialized_data = validate_and_convert_types(
received_data,
response_type,
['received_data'],
True,
_check_type,
configuration=self.configuration
)
return deserialized_data
def call_api(self, resource_path, method,
path_params=None, query_params=None, header_params=None,
body=None, post_params=None, files=None,
response_type=None, auth_settings=None, async_req=None,
_return_http_data_only=None, collection_formats=None,
_preload_content=True, _request_timeout=None, _host=None,
_check_type=None):
"""Makes the HTTP request (synchronous) and returns deserialized data.
To make an async_req request, set the async_req parameter.
:param resource_path: Path to method endpoint.
:param method: Method to call.
:param path_params: Path parameters in the url.
:param query_params: Query parameters in the url.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param auth_settings list: Auth Settings names for the request.
:param response_type: For the response, a tuple containing:
valid classes
a list containing valid classes (for list schemas)
a dict containing a tuple of valid classes as the value
Example values:
(str,)
(Pet,)
(float, none_type)
([int, none_type],)
({str: (bool, str, int, float, date, datetime, str, none_type)},)
:param files: key -> field name, value -> a list of open file
objects for `multipart/form-data`.
:type files: dict
:param async_req bool: execute request asynchronously
:type async_req: bool, optional
:param _return_http_data_only: response data without head status code
and headers
:type _return_http_data_only: bool, optional
:param collection_formats: dict of collection formats for path, query,
header, and post parameters.
:type collection_formats: dict, optional
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:type _preload_content: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _check_type: boolean describing if the data back from the server
should have its type checked.
:type _check_type: bool, optional
:return:
If async_req parameter is True,
the request will be called asynchronously.
The method will return the request thread.
If parameter async_req is False or missing,
then the method will return the response directly.
"""
if not async_req:
return self.__call_api(resource_path, method,
path_params, query_params, header_params,
body, post_params, files,
response_type, auth_settings,
_return_http_data_only, collection_formats,
_preload_content, _request_timeout, _host,
_check_type)
return self.pool.apply_async(self.__call_api, (resource_path,
method, path_params,
query_params,
header_params, body,
post_params, files,
response_type,
auth_settings,
_return_http_data_only,
collection_formats,
_preload_content,
_request_timeout,
_host, _check_type))
def request(self, method, url, query_params=None, headers=None,
post_params=None, body=None, _preload_content=True,
_request_timeout=None):
"""Makes the HTTP request using RESTClient."""
if method == "GET":
return self.rest_client.GET(url,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
headers=headers)
elif method == "HEAD":
return self.rest_client.HEAD(url,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
headers=headers)
elif method == "OPTIONS":
return self.rest_client.OPTIONS(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "POST":
return self.rest_client.POST(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "PUT":
return self.rest_client.PUT(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "PATCH":
return self.rest_client.PATCH(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "DELETE":
return self.rest_client.DELETE(url,
query_params=query_params,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
else:
raise ApiValueError(
"http method must be `GET`, `HEAD`, `OPTIONS`,"
" `POST`, `PATCH`, `PUT` or `DELETE`."
)
def parameters_to_tuples(self, params, collection_formats):
"""Get parameters as list of tuples, formatting collections.
:param params: Parameters as dict or list of two-tuples
:param dict collection_formats: Parameter collection formats
:return: Parameters as list of tuples, collections formatted
"""
new_params = []
if collection_formats is None:
collection_formats = {}
for k, v in six.iteritems(params) if isinstance(params, dict) else params: # noqa: E501
if k in collection_formats:
collection_format = collection_formats[k]
if collection_format == 'multi':
new_params.extend((k, value) for value in v)
else:
if collection_format == 'ssv':
delimiter = ' '
elif collection_format == 'tsv':
delimiter = '\t'
elif collection_format == 'pipes':
delimiter = '|'
else: # csv is the default
delimiter = ','
new_params.append(
(k, delimiter.join(str(value) for value in v)))
else:
new_params.append((k, v))
return new_params
def files_parameters(self, files=None):
"""Builds form parameters.
:param files: None or a dict with key=param_name and
value is a list of open file objects
:return: List of tuples of form parameters with file data
"""
if files is None:
return []
params = []
for param_name, file_instances in six.iteritems(files):
if file_instances is None:
# if the file field is nullable, skip None values
continue
for file_instance in file_instances:
if file_instance is None:
# if the file field is nullable, skip None values
continue
if file_instance.closed is True:
raise ApiValueError(
"Cannot read a closed file. The passed in file_type "
"for %s must be open." % param_name
)
filename = os.path.basename(file_instance.name)
filedata = file_instance.read()
mimetype = (mimetypes.guess_type(filename)[0] or
'application/octet-stream')
params.append(
tuple([param_name, tuple([filename, filedata, mimetype])]))
file_instance.close()
return params
def select_header_accept(self, accepts):
"""Returns `Accept` based on an array of accepts provided.
:param accepts: List of headers.
:return: Accept (e.g. application/json).
"""
if not accepts:
return
accepts = [x.lower() for x in accepts]
if 'application/json' in accepts:
return 'application/json'
else:
return ', '.join(accepts)
def select_header_content_type(self, content_types):
"""Returns `Content-Type` based on an array of content_types provided.
:param content_types: List of content-types.
:return: Content-Type (e.g. application/json).
"""
if not content_types:
return 'application/json'
content_types = [x.lower() for x in content_types]
if 'application/json' in content_types or '*/*' in content_types:
return 'application/json'
else:
return content_types[0]
def update_params_for_auth(self, headers, querys, auth_settings,
resource_path, method, body):
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param querys: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
:param resource_path: A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method.
:param body: A object representing the body of the HTTP request.
The object type is the return value of sanitize_for_serialization().
"""
if not auth_settings:
return
for auth in auth_settings:
auth_setting = self.configuration.auth_settings().get(auth)
if auth_setting:
if auth_setting['in'] == 'cookie':
headers['Cookie'] = auth_setting['value']
elif auth_setting['in'] == 'header':
if auth_setting['type'] != 'http-signature':
headers[auth_setting['key']] = auth_setting['value']
elif auth_setting['in'] == 'query':
querys.append((auth_setting['key'], auth_setting['value']))
else:
raise ApiValueError(
'Authentication token must be in `query` or `header`'
)

View File

@@ -0,0 +1,18 @@
# coding: utf-8
# flake8: noqa
# Import all APIs into this package.
# If you have many APIs here with many many models used in each API this may
# raise a `RecursionError`.
# In order to avoid this, import only the API that you directly need like:
#
# from .api.pet_api import PetApi
#
# or import this package, but before doing it, use:
#
# import sys
# sys.setrecursionlimit(n)
# Import APIs into API package:
from x_auth_id_alias.api.usage_api import UsageApi

View File

@@ -0,0 +1,478 @@
# coding: utf-8
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
import copy
import logging
import multiprocessing
import sys
import urllib3
import six
from six.moves import http_client as httplib
from x_auth_id_alias.exceptions import ApiValueError
JSON_SCHEMA_VALIDATION_KEYWORDS = {
'multipleOf', 'maximum', 'exclusiveMaximum',
'minimum', 'exclusiveMinimum', 'maxLength',
'minLength', 'pattern', 'maxItems', 'minItems'
}
class Configuration(object):
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
:param host: Base url
:param api_key: Dict to store API key(s).
Each entry in the dict specifies an API key.
The dict key is the name of the security scheme in the OAS specification.
The dict value is the API key secret.
:param api_key_prefix: Dict to store API prefix (e.g. Bearer)
The dict key is the name of the security scheme in the OAS specification.
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication
:param password: Password for HTTP basic authentication
:param discard_unknown_keys: Boolean value indicating whether to discard
unknown properties. A server may send a response that includes additional
properties that are not known by the client in the following scenarios:
1. The OpenAPI document is incomplete, i.e. it does not match the server
implementation.
2. The client was generated using an older version of the OpenAPI document
and the server has been upgraded since then.
If a schema in the OpenAPI document defines the additionalProperties attribute,
then all undeclared properties received by the server are injected into the
additional properties map. In that case, there are undeclared properties, and
nothing to discard.
:param disabled_client_side_validations (string): Comma-separated list of
JSON schema validation keywords to disable JSON schema structural validation
rules. The following keywords may be specified: multipleOf, maximum,
exclusiveMaximum, minimum, exclusiveMinimum, maxLength, minLength, pattern,
maxItems, minItems.
By default, the validation is performed for data generated locally by the client
and data received from the server, independent of any validation performed by
the server side. If the input data does not satisfy the JSON schema validation
rules specified in the OpenAPI document, an exception is raised.
If disabled_client_side_validations is set, structural validation is
disabled. This can be useful to troubleshoot data validation problem, such as
when the OpenAPI document validation rules do not match the actual API data
received by the server.
:Example:
API Key Authentication Example.
Given the following security scheme in the OpenAPI specification:
components:
securitySchemes:
cookieAuth: # name for the security scheme
type: apiKey
in: cookie
name: JSESSIONID # cookie name
You can programmatically set the cookie:
conf = x_auth_id_alias.Configuration(
api_key={'cookieAuth': 'abc123'}
api_key_prefix={'cookieAuth': 'JSESSIONID'}
)
The following cookie will be added to the HTTP request:
Cookie: JSESSIONID abc123
"""
_default = None
def __init__(self, host="http://petstore.swagger.io:80/v2",
api_key=None, api_key_prefix=None,
username=None, password=None,
discard_unknown_keys=False,
disabled_client_side_validations="",
):
"""Constructor
"""
self.host = host
"""Default Base url
"""
self.temp_folder_path = None
"""Temp file folder for downloading files
"""
# Authentication Settings
self.api_key = {}
if api_key:
self.api_key = api_key
"""dict to store API key(s)
"""
self.api_key_prefix = {}
if api_key_prefix:
self.api_key_prefix = api_key_prefix
"""dict to store API prefix (e.g. Bearer)
"""
self.refresh_api_key_hook = None
"""function hook to refresh API key if expired
"""
self.username = username
"""Username for HTTP basic authentication
"""
self.password = password
"""Password for HTTP basic authentication
"""
self.discard_unknown_keys = discard_unknown_keys
self.disabled_client_side_validations = disabled_client_side_validations
self.logger = {}
"""Logging Settings
"""
self.logger["package_logger"] = logging.getLogger("x_auth_id_alias")
self.logger["urllib3_logger"] = logging.getLogger("urllib3")
self.logger_format = '%(asctime)s %(levelname)s %(message)s'
"""Log format
"""
self.logger_stream_handler = None
"""Log stream handler
"""
self.logger_file_handler = None
"""Log file handler
"""
self.logger_file = None
"""Debug file location
"""
self.debug = False
"""Debug switch
"""
self.verify_ssl = True
"""SSL/TLS verification
Set this to false to skip verifying SSL certificate when calling API
from https server.
"""
self.ssl_ca_cert = None
"""Set this to customize the certificate file to verify the peer.
"""
self.cert_file = None
"""client certificate file
"""
self.key_file = None
"""client key file
"""
self.assert_hostname = None
"""Set this to True/False to enable/disable SSL hostname verification.
"""
self.connection_pool_maxsize = multiprocessing.cpu_count() * 5
"""urllib3 connection pool's maximum number of connections saved
per pool. urllib3 uses 1 connection as default value, but this is
not the best value when you are making a lot of possibly parallel
requests to the same host, which is often the case here.
cpu_count * 5 is used as default value to increase performance.
"""
self.proxy = None
"""Proxy URL
"""
self.proxy_headers = None
"""Proxy headers
"""
self.safe_chars_for_path_param = ''
"""Safe chars for path_param
"""
self.retries = None
"""Adding retries to override urllib3 default value 3
"""
# Disable client side validation
self.client_side_validation = True
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
if k not in ('logger', 'logger_file_handler'):
setattr(result, k, copy.deepcopy(v, memo))
# shallow copy of loggers
result.logger = copy.copy(self.logger)
# use setters to configure loggers
result.logger_file = self.logger_file
result.debug = self.debug
return result
def __setattr__(self, name, value):
object.__setattr__(self, name, value)
if name == 'disabled_client_side_validations':
s = set(filter(None, value.split(',')))
for v in s:
if v not in JSON_SCHEMA_VALIDATION_KEYWORDS:
raise ApiValueError(
"Invalid keyword: '{0}''".format(v))
self._disabled_client_side_validations = s
@classmethod
def set_default(cls, default):
"""Set default instance of configuration.
It stores default configuration, which can be
returned by get_default_copy method.
:param default: object of Configuration
"""
cls._default = copy.deepcopy(default)
@classmethod
def get_default_copy(cls):
"""Return new instance of configuration.
This method returns newly created, based on default constructor,
object of Configuration class or returns a copy of default
configuration passed by the set_default method.
:return: The configuration object.
"""
if cls._default is not None:
return copy.deepcopy(cls._default)
return Configuration()
@property
def logger_file(self):
"""The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
"""
return self.__logger_file
@logger_file.setter
def logger_file(self, value):
"""The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
"""
self.__logger_file = value
if self.__logger_file:
# If set logging file,
# then add file handler and remove stream handler.
self.logger_file_handler = logging.FileHandler(self.__logger_file)
self.logger_file_handler.setFormatter(self.logger_formatter)
for _, logger in six.iteritems(self.logger):
logger.addHandler(self.logger_file_handler)
@property
def debug(self):
"""Debug status
:param value: The debug status, True or False.
:type: bool
"""
return self.__debug
@debug.setter
def debug(self, value):
"""Debug status
:param value: The debug status, True or False.
:type: bool
"""
self.__debug = value
if self.__debug:
# if debug status is True, turn on debug logging
for _, logger in six.iteritems(self.logger):
logger.setLevel(logging.DEBUG)
# turn on httplib debug
httplib.HTTPConnection.debuglevel = 1
else:
# if debug status is False, turn off debug logging,
# setting log level to default `logging.WARNING`
for _, logger in six.iteritems(self.logger):
logger.setLevel(logging.WARNING)
# turn off httplib debug
httplib.HTTPConnection.debuglevel = 0
@property
def logger_format(self):
"""The logger format.
The logger_formatter will be updated when sets logger_format.
:param value: The format string.
:type: str
"""
return self.__logger_format
@logger_format.setter
def logger_format(self, value):
"""The logger format.
The logger_formatter will be updated when sets logger_format.
:param value: The format string.
:type: str
"""
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
return "%s %s" % (prefix, key)
else:
return key
def get_basic_auth_token(self):
"""Gets HTTP basic authentication header (string).
:return: The token for basic HTTP authentication.
"""
username = ""
if self.username is not None:
username = self.username
password = ""
if self.password is not None:
password = self.password
return urllib3.util.make_headers(
basic_auth=username + ':' + password
).get('authorization')
def auth_settings(self):
"""Gets Auth Settings dict for api client.
:return: The Auth Settings information dict.
"""
auth = {}
if 'api_key' in self.api_key:
auth['api_key'] = {
'type': 'api_key',
'in': 'header',
'key': 'X-Api-Key',
'value': self.get_api_key_with_prefix(
'api_key',
),
}
if 'api_key_query' in self.api_key or 'api_key' in self.api_key:
auth['api_key_query'] = {
'type': 'api_key',
'in': 'query',
'key': 'api_key',
'value': self.get_api_key_with_prefix(
'api_key_query',
alias='api_key',
),
}
return auth
def to_debug_report(self):
"""Gets the essential information for debugging.
:return: The report for debugging.
"""
return "Python SDK Debug Report:\n"\
"OS: {env}\n"\
"Python Version: {pyversion}\n"\
"Version of the API: 1.0.0\n"\
"SDK Package Version: 1.0.0".\
format(env=sys.platform, pyversion=sys.version)
def get_host_settings(self):
"""Gets an array of host settings
:return: An array of host settings
"""
return [
{
'url': "http://{server}.swagger.io:{port}/v2",
'description': "petstore server",
'variables': {
'server': {
'description': "No description provided",
'default_value': "petstore",
'enum_values': [
"petstore",
"qa-petstore",
"dev-petstore"
]
},
'port': {
'description': "No description provided",
'default_value': "80",
'enum_values': [
"80",
"8080"
]
}
}
},
{
'url': "https://localhost:8080/{version}",
'description': "The local server",
'variables': {
'version': {
'description': "No description provided",
'default_value': "v2",
'enum_values': [
"v1",
"v2"
]
}
}
}
]
def get_host_from_settings(self, index, variables=None):
"""Gets host URL based on the index and variables
:param index: array index of the host settings
:param variables: hash of variable and the corresponding value
:return: URL based on host settings
"""
variables = {} if variables is None else variables
servers = self.get_host_settings()
try:
server = servers[index]
except IndexError:
raise ValueError(
"Invalid index {0} when selecting the host settings. "
"Must be less than {1}".format(index, len(servers)))
url = server['url']
# go through variables and replace placeholders
for variable_name, variable in server['variables'].items():
used_value = variables.get(
variable_name, variable['default_value'])
if 'enum_values' in variable \
and used_value not in variable['enum_values']:
raise ValueError(
"The variable `{0}` in the host URL has invalid value "
"{1}. Must be {2}.".format(
variable_name, variables[variable_name],
variable['enum_values']))
url = url.replace("{" + variable_name + "}", used_value)
return url

View File

@@ -0,0 +1,139 @@
# coding: utf-8
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
import six
class OpenApiException(Exception):
"""The base exception class for all OpenAPIExceptions"""
class ApiTypeError(OpenApiException, TypeError):
def __init__(self, msg, path_to_item=None, valid_classes=None,
key_type=None):
""" Raises an exception for TypeErrors
Args:
msg (str): the exception message
Keyword Args:
path_to_item (list): a list of keys an indices to get to the
current_item
None if unset
valid_classes (tuple): the primitive classes that current item
should be an instance of
None if unset
key_type (bool): False if our value is a value in a dict
True if it is a key in a dict
False if our item is an item in a list
None if unset
"""
self.path_to_item = path_to_item
self.valid_classes = valid_classes
self.key_type = key_type
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiTypeError, self).__init__(full_msg)
class ApiValueError(OpenApiException, ValueError):
def __init__(self, msg, path_to_item=None):
"""
Args:
msg (str): the exception message
Keyword Args:
path_to_item (list) the path to the exception in the
received_data dict. None if unset
"""
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiValueError, self).__init__(full_msg)
class ApiAttributeError(OpenApiException, AttributeError):
def __init__(self, msg, path_to_item=None):
"""
Raised when an attribute reference or assignment fails.
Args:
msg (str): the exception message
Keyword Args:
path_to_item (None/list) the path to the exception in the
received_data dict
"""
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiAttributeError, self).__init__(full_msg)
class ApiKeyError(OpenApiException, KeyError):
def __init__(self, msg, path_to_item=None):
"""
Args:
msg (str): the exception message
Keyword Args:
path_to_item (None/list) the path to the exception in the
received_data dict
"""
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiKeyError, self).__init__(full_msg)
class ApiException(OpenApiException):
def __init__(self, status=None, reason=None, http_resp=None):
if http_resp:
self.status = http_resp.status
self.reason = http_resp.reason
self.body = http_resp.data
self.headers = http_resp.getheaders()
else:
self.status = status
self.reason = reason
self.body = None
self.headers = None
def __str__(self):
"""Custom error messages for exception"""
error_message = "({0})\n"\
"Reason: {1}\n".format(self.status, self.reason)
if self.headers:
error_message += "HTTP response headers: {0}\n".format(
self.headers)
if self.body:
error_message += "HTTP response body: {0}\n".format(self.body)
return error_message
def render_path(path_to_item):
"""Returns a string representation of a path"""
result = ""
for pth in path_to_item:
if isinstance(pth, six.integer_types):
result += "[{0}]".format(pth)
else:
result += "['{0}']".format(pth)
return result

View File

@@ -0,0 +1,5 @@
# we can not import model classes here because that would create a circular
# reference which would not work in python2
# do not import all models into this module because that uses a lot of memory and stack frames
# if you need the ability to import all models from one package, import them with
# from {{packageName}.models import ModelA, ModelB

View File

@@ -0,0 +1,13 @@
# coding: utf-8
# flake8: noqa
# import all models into this package
# if you have many models here with many references from one model to another this may
# raise a RecursionError
# to avoid this, import only the models that you directly need like:
# from from x_auth_id_alias.model.pet import Pet
# or import this package, but before doing it, use:
# import sys
# sys.setrecursionlimit(n)

View File

@@ -0,0 +1,291 @@
# coding: utf-8
"""
OpenAPI Extension x-auth-id-alias
This specification shows how to use x-auth-id-alias extension for API keys. # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
import io
import json
import logging
import re
import ssl
import certifi
# python 2 and python 3 compatibility library
import six
from six.moves.urllib.parse import urlencode
import urllib3
from x_auth_id_alias.exceptions import ApiException, ApiValueError
logger = logging.getLogger(__name__)
class RESTResponse(io.IOBase):
def __init__(self, resp):
self.urllib3_response = resp
self.status = resp.status
self.reason = resp.reason
self.data = resp.data
def getheaders(self):
"""Returns a dictionary of the response headers."""
return self.urllib3_response.getheaders()
def getheader(self, name, default=None):
"""Returns a given response header."""
return self.urllib3_response.getheader(name, default)
class RESTClientObject(object):
def __init__(self, configuration, pools_size=4, maxsize=None):
# urllib3.PoolManager will pass all kw parameters to connectionpool
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501
# maxsize is the number of requests to host that are allowed in parallel # noqa: E501
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
# cert_reqs
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
# ca_certs
if configuration.ssl_ca_cert:
ca_certs = configuration.ssl_ca_cert
else:
# if not set certificate file, use Mozilla's root certificates.
ca_certs = certifi.where()
addition_pool_args = {}
if configuration.assert_hostname is not None:
addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501
if configuration.retries is not None:
addition_pool_args['retries'] = configuration.retries
if maxsize is None:
if configuration.connection_pool_maxsize is not None:
maxsize = configuration.connection_pool_maxsize
else:
maxsize = 4
# https pool manager
if configuration.proxy:
self.pool_manager = urllib3.ProxyManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
proxy_url=configuration.proxy,
proxy_headers=configuration.proxy_headers,
**addition_pool_args
)
else:
self.pool_manager = urllib3.PoolManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
**addition_pool_args
)
def request(self, method, url, query_params=None, headers=None,
body=None, post_params=None, _preload_content=True,
_request_timeout=None):
"""Perform requests.
:param method: http request method
:param url: http request url
:param query_params: query parameters in the url
:param headers: http request headers
:param body: request json body, for `application/json`
:param post_params: request post parameters,
`application/x-www-form-urlencoded`
and `multipart/form-data`
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
"""
method = method.upper()
assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
'PATCH', 'OPTIONS']
if post_params and body:
raise ApiValueError(
"body parameter cannot be used with post_params parameter."
)
post_params = post_params or {}
headers = headers or {}
timeout = None
if _request_timeout:
if isinstance(_request_timeout, (int, ) if six.PY3 else (int, long)): # noqa: E501,F821
timeout = urllib3.Timeout(total=_request_timeout)
elif (isinstance(_request_timeout, tuple) and
len(_request_timeout) == 2):
timeout = urllib3.Timeout(
connect=_request_timeout[0], read=_request_timeout[1])
if 'Content-Type' not in headers:
headers['Content-Type'] = 'application/json'
try:
# For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
if query_params:
url += '?' + urlencode(query_params)
if re.search('json', headers['Content-Type'], re.IGNORECASE):
request_body = None
if body is not None:
request_body = json.dumps(body)
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=False,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct
# Content-Type which generated by urllib3 will be
# overwritten.
del headers['Content-Type']
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=True,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
# Pass a `string` parameter directly in the body to support
# other content types than Json when `body` argument is
# provided in serialized form
elif isinstance(body, str) or isinstance(body, bytes):
request_body = body
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided
arguments. Please check that your arguments match
declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
raise ApiException(http_resp=r)
return r
def GET(self, url, headers=None, query_params=None, _preload_content=True,
_request_timeout=None):
return self.request("GET", url,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
query_params=query_params)
def HEAD(self, url, headers=None, query_params=None, _preload_content=True,
_request_timeout=None):
return self.request("HEAD", url,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
query_params=query_params)
def OPTIONS(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("OPTIONS", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def DELETE(self, url, headers=None, query_params=None, body=None,
_preload_content=True, _request_timeout=None):
return self.request("DELETE", url,
headers=headers,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def POST(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("POST", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def PUT(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("PUT", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def PATCH(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("PATCH", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)

View File

@@ -396,15 +396,16 @@ conf = petstore_api.Configuration(
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
@@ -438,14 +439,18 @@ conf = petstore_api.Configuration(
'type': 'api_key',
'in': 'header',
'key': 'api_key',
'value': self.get_api_key_with_prefix('api_key')
'value': self.get_api_key_with_prefix(
'api_key',
),
}
if 'api_key_query' in self.api_key:
auth['api_key_query'] = {
'type': 'api_key',
'in': 'query',
'key': 'api_key_query',
'value': self.get_api_key_with_prefix('api_key_query')
'value': self.get_api_key_with_prefix(
'api_key_query',
),
}
if self.access_token is not None:
auth['bearer_test'] = {

View File

@@ -396,15 +396,16 @@ conf = petstore_api.Configuration(
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
@@ -438,14 +439,18 @@ conf = petstore_api.Configuration(
'type': 'api_key',
'in': 'header',
'key': 'api_key',
'value': self.get_api_key_with_prefix('api_key')
'value': self.get_api_key_with_prefix(
'api_key',
),
}
if 'api_key_query' in self.api_key:
auth['api_key_query'] = {
'type': 'api_key',
'in': 'query',
'key': 'api_key_query',
'value': self.get_api_key_with_prefix('api_key_query')
'value': self.get_api_key_with_prefix(
'api_key_query',
),
}
if self.access_token is not None:
auth['bearer_test'] = {