forked from loafle/openapi-generator-original
better code format for python cilent (#12251)
This commit is contained in:
parent
37905e8bfa
commit
e0d8d0f5d6
@ -260,7 +260,8 @@ class ApiClient(object):
|
||||
if collection_types is None:
|
||||
collection_types = (dict)
|
||||
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
|
||||
if isinstance(v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
if isinstance(
|
||||
v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
|
||||
field = RequestField(k, v)
|
||||
field.make_multipart(content_type="application/json; charset=utf-8")
|
||||
@ -285,8 +286,10 @@ class ApiClient(object):
|
||||
"""
|
||||
if isinstance(obj, (ModelNormal, ModelComposed)):
|
||||
return {
|
||||
key: cls.sanitize_for_serialization(val) for key, val in model_to_dict(obj, serialize=True).items()
|
||||
}
|
||||
key: cls.sanitize_for_serialization(val) for key,
|
||||
val in model_to_dict(
|
||||
obj,
|
||||
serialize=True).items()}
|
||||
elif isinstance(obj, io.IOBase):
|
||||
return cls.get_file_data_and_close_file(obj)
|
||||
elif isinstance(obj, (str, int, float, none_type, bool)):
|
||||
@ -299,7 +302,9 @@ class ApiClient(object):
|
||||
return [cls.sanitize_for_serialization(item) for item in obj]
|
||||
if isinstance(obj, dict):
|
||||
return {key: cls.sanitize_for_serialization(val) for key, val in obj.items()}
|
||||
raise ApiValueError('Unable to prepare type {} for serialization'.format(obj.__class__.__name__))
|
||||
raise ApiValueError(
|
||||
'Unable to prepare type {} for serialization'.format(
|
||||
obj.__class__.__name__))
|
||||
|
||||
def deserialize(self, response, response_type, _check_type):
|
||||
"""Deserializes response into an object.
|
||||
@ -543,7 +548,9 @@ class ApiClient(object):
|
||||
file_instance.close()
|
||||
return file_data
|
||||
|
||||
def files_parameters(self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None):
|
||||
def files_parameters(self,
|
||||
files: typing.Optional[typing.Dict[str,
|
||||
typing.List[io.IOBase]]] = None):
|
||||
"""Builds form parameters.
|
||||
|
||||
:param files: None or a dict with key=param_name and
|
||||
@ -634,13 +641,15 @@ class ApiClient(object):
|
||||
|
||||
if request_auths:
|
||||
for auth_setting in request_auths:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
return
|
||||
|
||||
for auth in auth_settings:
|
||||
auth_setting = self.configuration.auth_settings().get(auth)
|
||||
if auth_setting:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
|
||||
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
|
||||
if auth_setting['in'] == 'cookie':
|
||||
|
@ -66,7 +66,8 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
if configuration.proxy and not should_bypass_proxies(
|
||||
configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@ -139,7 +140,8 @@ class RESTClientObject(object):
|
||||
headers['Content-Type'] = 'application/json'
|
||||
if query_params:
|
||||
url += '?' + urlencode(query_params)
|
||||
if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)):
|
||||
if ('Content-Type' not in headers) or (re.search('json',
|
||||
headers['Content-Type'], re.IGNORECASE)):
|
||||
request_body = None
|
||||
if body is not None:
|
||||
request_body = json.dumps(body)
|
||||
@ -286,6 +288,8 @@ class RESTClientObject(object):
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
|
||||
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
@ -295,6 +299,7 @@ def is_ipv4(target):
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
@ -309,6 +314,7 @@ def in_ipv4net(target, net):
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
|
@ -248,7 +248,8 @@ class ApiClient(object):
|
||||
if collection_types is None:
|
||||
collection_types = (dict)
|
||||
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
|
||||
if isinstance(v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
if isinstance(
|
||||
v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
|
||||
field = RequestField(k, v)
|
||||
field.make_multipart(content_type="application/json; charset=utf-8")
|
||||
@ -273,8 +274,10 @@ class ApiClient(object):
|
||||
"""
|
||||
if isinstance(obj, (ModelNormal, ModelComposed)):
|
||||
return {
|
||||
key: cls.sanitize_for_serialization(val) for key, val in model_to_dict(obj, serialize=True).items()
|
||||
}
|
||||
key: cls.sanitize_for_serialization(val) for key,
|
||||
val in model_to_dict(
|
||||
obj,
|
||||
serialize=True).items()}
|
||||
elif isinstance(obj, io.IOBase):
|
||||
return cls.get_file_data_and_close_file(obj)
|
||||
elif isinstance(obj, (str, int, float, none_type, bool)):
|
||||
@ -287,7 +290,9 @@ class ApiClient(object):
|
||||
return [cls.sanitize_for_serialization(item) for item in obj]
|
||||
if isinstance(obj, dict):
|
||||
return {key: cls.sanitize_for_serialization(val) for key, val in obj.items()}
|
||||
raise ApiValueError('Unable to prepare type {} for serialization'.format(obj.__class__.__name__))
|
||||
raise ApiValueError(
|
||||
'Unable to prepare type {} for serialization'.format(
|
||||
obj.__class__.__name__))
|
||||
|
||||
def deserialize(self, response, response_type, _check_type):
|
||||
"""Deserializes response into an object.
|
||||
@ -531,7 +536,9 @@ class ApiClient(object):
|
||||
file_instance.close()
|
||||
return file_data
|
||||
|
||||
def files_parameters(self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None):
|
||||
def files_parameters(self,
|
||||
files: typing.Optional[typing.Dict[str,
|
||||
typing.List[io.IOBase]]] = None):
|
||||
"""Builds form parameters.
|
||||
|
||||
:param files: None or a dict with key=param_name and
|
||||
@ -622,13 +629,15 @@ class ApiClient(object):
|
||||
|
||||
if request_auths:
|
||||
for auth_setting in request_auths:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
return
|
||||
|
||||
for auth in auth_settings:
|
||||
auth_setting = self.configuration.auth_settings().get(auth)
|
||||
if auth_setting:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
|
||||
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
|
||||
if auth_setting['in'] == 'cookie':
|
||||
|
@ -74,7 +74,8 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
if configuration.proxy and not should_bypass_proxies(
|
||||
configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@ -147,7 +148,8 @@ class RESTClientObject(object):
|
||||
headers['Content-Type'] = 'application/json'
|
||||
if query_params:
|
||||
url += '?' + urlencode(query_params)
|
||||
if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)):
|
||||
if ('Content-Type' not in headers) or (re.search('json',
|
||||
headers['Content-Type'], re.IGNORECASE)):
|
||||
request_body = None
|
||||
if body is not None:
|
||||
request_body = json.dumps(body)
|
||||
@ -294,6 +296,8 @@ class RESTClientObject(object):
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
|
||||
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
@ -303,6 +307,7 @@ def is_ipv4(target):
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
@ -317,6 +322,7 @@ def in_ipv4net(target, net):
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
|
@ -248,7 +248,8 @@ class ApiClient(object):
|
||||
if collection_types is None:
|
||||
collection_types = (dict)
|
||||
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
|
||||
if isinstance(v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
if isinstance(
|
||||
v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
|
||||
field = RequestField(k, v)
|
||||
field.make_multipart(content_type="application/json; charset=utf-8")
|
||||
@ -273,8 +274,10 @@ class ApiClient(object):
|
||||
"""
|
||||
if isinstance(obj, (ModelNormal, ModelComposed)):
|
||||
return {
|
||||
key: cls.sanitize_for_serialization(val) for key, val in model_to_dict(obj, serialize=True).items()
|
||||
}
|
||||
key: cls.sanitize_for_serialization(val) for key,
|
||||
val in model_to_dict(
|
||||
obj,
|
||||
serialize=True).items()}
|
||||
elif isinstance(obj, io.IOBase):
|
||||
return cls.get_file_data_and_close_file(obj)
|
||||
elif isinstance(obj, (str, int, float, none_type, bool)):
|
||||
@ -287,7 +290,9 @@ class ApiClient(object):
|
||||
return [cls.sanitize_for_serialization(item) for item in obj]
|
||||
if isinstance(obj, dict):
|
||||
return {key: cls.sanitize_for_serialization(val) for key, val in obj.items()}
|
||||
raise ApiValueError('Unable to prepare type {} for serialization'.format(obj.__class__.__name__))
|
||||
raise ApiValueError(
|
||||
'Unable to prepare type {} for serialization'.format(
|
||||
obj.__class__.__name__))
|
||||
|
||||
def deserialize(self, response, response_type, _check_type):
|
||||
"""Deserializes response into an object.
|
||||
@ -531,7 +536,9 @@ class ApiClient(object):
|
||||
file_instance.close()
|
||||
return file_data
|
||||
|
||||
def files_parameters(self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None):
|
||||
def files_parameters(self,
|
||||
files: typing.Optional[typing.Dict[str,
|
||||
typing.List[io.IOBase]]] = None):
|
||||
"""Builds form parameters.
|
||||
|
||||
:param files: None or a dict with key=param_name and
|
||||
@ -622,13 +629,15 @@ class ApiClient(object):
|
||||
|
||||
if request_auths:
|
||||
for auth_setting in request_auths:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
return
|
||||
|
||||
for auth in auth_settings:
|
||||
auth_setting = self.configuration.auth_settings().get(auth)
|
||||
if auth_setting:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
|
||||
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
|
||||
if auth_setting['in'] == 'cookie':
|
||||
|
@ -74,7 +74,8 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
if configuration.proxy and not should_bypass_proxies(
|
||||
configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@ -147,7 +148,8 @@ class RESTClientObject(object):
|
||||
headers['Content-Type'] = 'application/json'
|
||||
if query_params:
|
||||
url += '?' + urlencode(query_params)
|
||||
if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)):
|
||||
if ('Content-Type' not in headers) or (re.search('json',
|
||||
headers['Content-Type'], re.IGNORECASE)):
|
||||
request_body = None
|
||||
if body is not None:
|
||||
request_body = json.dumps(body)
|
||||
@ -294,6 +296,8 @@ class RESTClientObject(object):
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
|
||||
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
@ -303,6 +307,7 @@ def is_ipv4(target):
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
@ -317,6 +322,7 @@ def in_ipv4net(target, net):
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
|
@ -248,7 +248,8 @@ class ApiClient(object):
|
||||
if collection_types is None:
|
||||
collection_types = (dict)
|
||||
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
|
||||
if isinstance(v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
if isinstance(
|
||||
v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
|
||||
field = RequestField(k, v)
|
||||
field.make_multipart(content_type="application/json; charset=utf-8")
|
||||
@ -273,8 +274,10 @@ class ApiClient(object):
|
||||
"""
|
||||
if isinstance(obj, (ModelNormal, ModelComposed)):
|
||||
return {
|
||||
key: cls.sanitize_for_serialization(val) for key, val in model_to_dict(obj, serialize=True).items()
|
||||
}
|
||||
key: cls.sanitize_for_serialization(val) for key,
|
||||
val in model_to_dict(
|
||||
obj,
|
||||
serialize=True).items()}
|
||||
elif isinstance(obj, io.IOBase):
|
||||
return cls.get_file_data_and_close_file(obj)
|
||||
elif isinstance(obj, (str, int, float, none_type, bool)):
|
||||
@ -287,7 +290,9 @@ class ApiClient(object):
|
||||
return [cls.sanitize_for_serialization(item) for item in obj]
|
||||
if isinstance(obj, dict):
|
||||
return {key: cls.sanitize_for_serialization(val) for key, val in obj.items()}
|
||||
raise ApiValueError('Unable to prepare type {} for serialization'.format(obj.__class__.__name__))
|
||||
raise ApiValueError(
|
||||
'Unable to prepare type {} for serialization'.format(
|
||||
obj.__class__.__name__))
|
||||
|
||||
def deserialize(self, response, response_type, _check_type):
|
||||
"""Deserializes response into an object.
|
||||
@ -531,7 +536,9 @@ class ApiClient(object):
|
||||
file_instance.close()
|
||||
return file_data
|
||||
|
||||
def files_parameters(self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None):
|
||||
def files_parameters(self,
|
||||
files: typing.Optional[typing.Dict[str,
|
||||
typing.List[io.IOBase]]] = None):
|
||||
"""Builds form parameters.
|
||||
|
||||
:param files: None or a dict with key=param_name and
|
||||
@ -622,13 +629,15 @@ class ApiClient(object):
|
||||
|
||||
if request_auths:
|
||||
for auth_setting in request_auths:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
return
|
||||
|
||||
for auth in auth_settings:
|
||||
auth_setting = self.configuration.auth_settings().get(auth)
|
||||
if auth_setting:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
|
||||
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
|
||||
if auth_setting['in'] == 'cookie':
|
||||
|
@ -74,7 +74,8 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
if configuration.proxy and not should_bypass_proxies(
|
||||
configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@ -147,7 +148,8 @@ class RESTClientObject(object):
|
||||
headers['Content-Type'] = 'application/json'
|
||||
if query_params:
|
||||
url += '?' + urlencode(query_params)
|
||||
if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)):
|
||||
if ('Content-Type' not in headers) or (re.search('json',
|
||||
headers['Content-Type'], re.IGNORECASE)):
|
||||
request_body = None
|
||||
if body is not None:
|
||||
request_body = json.dumps(body)
|
||||
@ -294,6 +296,8 @@ class RESTClientObject(object):
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
|
||||
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
@ -303,6 +307,7 @@ def is_ipv4(target):
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
@ -317,6 +322,7 @@ def in_ipv4net(target, net):
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
|
@ -248,7 +248,8 @@ class ApiClient(object):
|
||||
if collection_types is None:
|
||||
collection_types = (dict)
|
||||
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
|
||||
if isinstance(v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
if isinstance(
|
||||
v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
|
||||
field = RequestField(k, v)
|
||||
field.make_multipart(content_type="application/json; charset=utf-8")
|
||||
@ -273,8 +274,10 @@ class ApiClient(object):
|
||||
"""
|
||||
if isinstance(obj, (ModelNormal, ModelComposed)):
|
||||
return {
|
||||
key: cls.sanitize_for_serialization(val) for key, val in model_to_dict(obj, serialize=True).items()
|
||||
}
|
||||
key: cls.sanitize_for_serialization(val) for key,
|
||||
val in model_to_dict(
|
||||
obj,
|
||||
serialize=True).items()}
|
||||
elif isinstance(obj, io.IOBase):
|
||||
return cls.get_file_data_and_close_file(obj)
|
||||
elif isinstance(obj, (str, int, float, none_type, bool)):
|
||||
@ -287,7 +290,9 @@ class ApiClient(object):
|
||||
return [cls.sanitize_for_serialization(item) for item in obj]
|
||||
if isinstance(obj, dict):
|
||||
return {key: cls.sanitize_for_serialization(val) for key, val in obj.items()}
|
||||
raise ApiValueError('Unable to prepare type {} for serialization'.format(obj.__class__.__name__))
|
||||
raise ApiValueError(
|
||||
'Unable to prepare type {} for serialization'.format(
|
||||
obj.__class__.__name__))
|
||||
|
||||
def deserialize(self, response, response_type, _check_type):
|
||||
"""Deserializes response into an object.
|
||||
@ -531,7 +536,9 @@ class ApiClient(object):
|
||||
file_instance.close()
|
||||
return file_data
|
||||
|
||||
def files_parameters(self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None):
|
||||
def files_parameters(self,
|
||||
files: typing.Optional[typing.Dict[str,
|
||||
typing.List[io.IOBase]]] = None):
|
||||
"""Builds form parameters.
|
||||
|
||||
:param files: None or a dict with key=param_name and
|
||||
@ -622,13 +629,15 @@ class ApiClient(object):
|
||||
|
||||
if request_auths:
|
||||
for auth_setting in request_auths:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
return
|
||||
|
||||
for auth in auth_settings:
|
||||
auth_setting = self.configuration.auth_settings().get(auth)
|
||||
if auth_setting:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
|
||||
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
|
||||
if auth_setting['in'] == 'cookie':
|
||||
|
@ -74,7 +74,8 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
if configuration.proxy and not should_bypass_proxies(
|
||||
configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@ -147,7 +148,8 @@ class RESTClientObject(object):
|
||||
headers['Content-Type'] = 'application/json'
|
||||
if query_params:
|
||||
url += '?' + urlencode(query_params)
|
||||
if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)):
|
||||
if ('Content-Type' not in headers) or (re.search('json',
|
||||
headers['Content-Type'], re.IGNORECASE)):
|
||||
request_body = None
|
||||
if body is not None:
|
||||
request_body = json.dumps(body)
|
||||
@ -294,6 +296,8 @@ class RESTClientObject(object):
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
|
||||
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
@ -303,6 +307,7 @@ def is_ipv4(target):
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
@ -317,6 +322,7 @@ def in_ipv4net(target, net):
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
|
@ -248,7 +248,8 @@ class ApiClient(object):
|
||||
if collection_types is None:
|
||||
collection_types = (dict)
|
||||
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
|
||||
if isinstance(v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
if isinstance(
|
||||
v, collection_types): # v is instance of collection_type, formatting as application/json
|
||||
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
|
||||
field = RequestField(k, v)
|
||||
field.make_multipart(content_type="application/json; charset=utf-8")
|
||||
@ -273,8 +274,10 @@ class ApiClient(object):
|
||||
"""
|
||||
if isinstance(obj, (ModelNormal, ModelComposed)):
|
||||
return {
|
||||
key: cls.sanitize_for_serialization(val) for key, val in model_to_dict(obj, serialize=True).items()
|
||||
}
|
||||
key: cls.sanitize_for_serialization(val) for key,
|
||||
val in model_to_dict(
|
||||
obj,
|
||||
serialize=True).items()}
|
||||
elif isinstance(obj, io.IOBase):
|
||||
return cls.get_file_data_and_close_file(obj)
|
||||
elif isinstance(obj, (str, int, float, none_type, bool)):
|
||||
@ -287,7 +290,9 @@ class ApiClient(object):
|
||||
return [cls.sanitize_for_serialization(item) for item in obj]
|
||||
if isinstance(obj, dict):
|
||||
return {key: cls.sanitize_for_serialization(val) for key, val in obj.items()}
|
||||
raise ApiValueError('Unable to prepare type {} for serialization'.format(obj.__class__.__name__))
|
||||
raise ApiValueError(
|
||||
'Unable to prepare type {} for serialization'.format(
|
||||
obj.__class__.__name__))
|
||||
|
||||
def deserialize(self, response, response_type, _check_type):
|
||||
"""Deserializes response into an object.
|
||||
@ -531,7 +536,9 @@ class ApiClient(object):
|
||||
file_instance.close()
|
||||
return file_data
|
||||
|
||||
def files_parameters(self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None):
|
||||
def files_parameters(self,
|
||||
files: typing.Optional[typing.Dict[str,
|
||||
typing.List[io.IOBase]]] = None):
|
||||
"""Builds form parameters.
|
||||
|
||||
:param files: None or a dict with key=param_name and
|
||||
@ -622,13 +629,15 @@ class ApiClient(object):
|
||||
|
||||
if request_auths:
|
||||
for auth_setting in request_auths:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
return
|
||||
|
||||
for auth in auth_settings:
|
||||
auth_setting = self.configuration.auth_settings().get(auth)
|
||||
if auth_setting:
|
||||
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
||||
self._apply_auth_params(
|
||||
headers, queries, resource_path, method, body, auth_setting)
|
||||
|
||||
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
|
||||
if auth_setting['in'] == 'cookie':
|
||||
|
@ -74,7 +74,8 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
if configuration.proxy and not should_bypass_proxies(
|
||||
configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@ -147,7 +148,8 @@ class RESTClientObject(object):
|
||||
headers['Content-Type'] = 'application/json'
|
||||
if query_params:
|
||||
url += '?' + urlencode(query_params)
|
||||
if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)):
|
||||
if ('Content-Type' not in headers) or (re.search('json',
|
||||
headers['Content-Type'], re.IGNORECASE)):
|
||||
request_body = None
|
||||
if body is not None:
|
||||
request_body = json.dumps(body)
|
||||
@ -294,6 +296,8 @@ class RESTClientObject(object):
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
|
||||
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
@ -303,6 +307,7 @@ def is_ipv4(target):
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
@ -317,6 +322,7 @@ def in_ipv4net(target, net):
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
|
Loading…
x
Reference in New Issue
Block a user