forked from loafle/openapi-generator-original
add no_proxy support to python client (#10648)
* add no_proxy support to python client * add unittest for no_proxy supporting, python client * update samples for no_proxy supporting, python client * fix input parameter in samples/openapi3/.../tests_manual/test_extra_pool_config_options.py * re-implement no_proxy support to python client according to PR conversation #10648 * re-update samples for no_proxy supporting, python client
This commit is contained in:
@@ -278,6 +278,9 @@ conf = {{{packageName}}}.Configuration(
|
||||
self.proxy = None
|
||||
"""Proxy URL
|
||||
"""
|
||||
self.no_proxy = None
|
||||
"""bypass proxy for host in the no_proxy list.
|
||||
"""
|
||||
self.proxy_headers = None
|
||||
"""Proxy headers
|
||||
"""
|
||||
|
||||
@@ -6,8 +6,10 @@ import logging
|
||||
import re
|
||||
import ssl
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import proxy_bypass_environment
|
||||
import urllib3
|
||||
import ipaddress
|
||||
|
||||
from {{packageName}}.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
|
||||
|
||||
@@ -64,7 +66,7 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy:
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@@ -282,3 +284,55 @@ class RESTClientObject(object):
|
||||
_preload_content=_preload_content,
|
||||
_request_timeout=_request_timeout,
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
try:
|
||||
chk = ipaddress.IPv4Address(target)
|
||||
return True
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
try:
|
||||
nw = ipaddress.IPv4Network(net)
|
||||
ip = ipaddress.IPv4Address(target)
|
||||
if ip in nw:
|
||||
return True
|
||||
return False
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
"""
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# special cases
|
||||
if parsed.hostname in [None, '']:
|
||||
return True
|
||||
|
||||
# special cases
|
||||
if no_proxy in [None , '']:
|
||||
return False
|
||||
if no_proxy == '*':
|
||||
return True
|
||||
|
||||
no_proxy = no_proxy.lower().replace(' ','');
|
||||
entries = (
|
||||
host for host in no_proxy.split(',') if host
|
||||
)
|
||||
|
||||
if is_ipv4(parsed.hostname):
|
||||
for item in entries:
|
||||
if in_ipv4net(parsed.hostname, item):
|
||||
return True
|
||||
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy} )
|
||||
|
||||
@@ -29,6 +29,7 @@ import io.swagger.v3.parser.util.SchemaTypeUtil;
|
||||
import java.io.File;
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -464,4 +465,26 @@ public class PythonClientTest {
|
||||
|
||||
}
|
||||
|
||||
@Test(description = "tests NoProxyPyClient")
|
||||
public void testNoProxyPyClient() throws Exception {
|
||||
|
||||
final String gen = "python";
|
||||
final String spec = "src/test/resources/3_0/petstore.yaml";
|
||||
|
||||
File output = Files.createTempDirectory("test").toFile();
|
||||
final CodegenConfigurator configurator = new CodegenConfigurator()
|
||||
.setGeneratorName(gen)
|
||||
.setInputSpec(spec)
|
||||
.setOutputDir(output.getAbsolutePath().replace("\\", "/"));
|
||||
final ClientOptInput clientOptInput = configurator.toClientOptInput();
|
||||
DefaultGenerator generator = new DefaultGenerator();
|
||||
List<File> files = generator.opts(clientOptInput).generate();
|
||||
|
||||
for (String f : new String[] { "openapi_client/configuration.py", "openapi_client/rest.py" } ) {
|
||||
TestUtils.ensureContainsFile(files, output, f);
|
||||
Path p = output.toPath().resolve(f);
|
||||
TestUtils.assertFileContains(p, "no_proxy");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -215,6 +215,9 @@ conf = petstore_api.Configuration(
|
||||
self.proxy = None
|
||||
"""Proxy URL
|
||||
"""
|
||||
self.no_proxy = None
|
||||
"""bypass proxy for host in the no_proxy list.
|
||||
"""
|
||||
self.proxy_headers = None
|
||||
"""Proxy headers
|
||||
"""
|
||||
|
||||
@@ -14,8 +14,10 @@ import logging
|
||||
import re
|
||||
import ssl
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import proxy_bypass_environment
|
||||
import urllib3
|
||||
import ipaddress
|
||||
|
||||
from petstore_api.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
|
||||
|
||||
@@ -72,7 +74,7 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy:
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@@ -290,3 +292,55 @@ class RESTClientObject(object):
|
||||
_preload_content=_preload_content,
|
||||
_request_timeout=_request_timeout,
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
try:
|
||||
chk = ipaddress.IPv4Address(target)
|
||||
return True
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
try:
|
||||
nw = ipaddress.IPv4Network(net)
|
||||
ip = ipaddress.IPv4Address(target)
|
||||
if ip in nw:
|
||||
return True
|
||||
return False
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
"""
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# special cases
|
||||
if parsed.hostname in [None, '']:
|
||||
return True
|
||||
|
||||
# special cases
|
||||
if no_proxy in [None , '']:
|
||||
return False
|
||||
if no_proxy == '*':
|
||||
return True
|
||||
|
||||
no_proxy = no_proxy.lower().replace(' ','');
|
||||
entries = (
|
||||
host for host in no_proxy.split(',') if host
|
||||
)
|
||||
|
||||
if is_ipv4(parsed.hostname):
|
||||
for item in entries:
|
||||
if in_ipv4net(parsed.hostname, item):
|
||||
return True
|
||||
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy} )
|
||||
|
||||
@@ -215,6 +215,9 @@ conf = petstore_api.Configuration(
|
||||
self.proxy = None
|
||||
"""Proxy URL
|
||||
"""
|
||||
self.no_proxy = None
|
||||
"""bypass proxy for host in the no_proxy list.
|
||||
"""
|
||||
self.proxy_headers = None
|
||||
"""Proxy headers
|
||||
"""
|
||||
|
||||
@@ -14,8 +14,10 @@ import logging
|
||||
import re
|
||||
import ssl
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import proxy_bypass_environment
|
||||
import urllib3
|
||||
import ipaddress
|
||||
|
||||
from petstore_api.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
|
||||
|
||||
@@ -72,7 +74,7 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy:
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@@ -290,3 +292,55 @@ class RESTClientObject(object):
|
||||
_preload_content=_preload_content,
|
||||
_request_timeout=_request_timeout,
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
try:
|
||||
chk = ipaddress.IPv4Address(target)
|
||||
return True
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
try:
|
||||
nw = ipaddress.IPv4Network(net)
|
||||
ip = ipaddress.IPv4Address(target)
|
||||
if ip in nw:
|
||||
return True
|
||||
return False
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
"""
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# special cases
|
||||
if parsed.hostname in [None, '']:
|
||||
return True
|
||||
|
||||
# special cases
|
||||
if no_proxy in [None , '']:
|
||||
return False
|
||||
if no_proxy == '*':
|
||||
return True
|
||||
|
||||
no_proxy = no_proxy.lower().replace(' ','');
|
||||
entries = (
|
||||
host for host in no_proxy.split(',') if host
|
||||
)
|
||||
|
||||
if is_ipv4(parsed.hostname):
|
||||
for item in entries:
|
||||
if in_ipv4net(parsed.hostname, item):
|
||||
return True
|
||||
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy} )
|
||||
|
||||
@@ -199,6 +199,9 @@ conf = x_auth_id_alias.Configuration(
|
||||
self.proxy = None
|
||||
"""Proxy URL
|
||||
"""
|
||||
self.no_proxy = None
|
||||
"""bypass proxy for host in the no_proxy list.
|
||||
"""
|
||||
self.proxy_headers = None
|
||||
"""Proxy headers
|
||||
"""
|
||||
|
||||
@@ -14,8 +14,10 @@ import logging
|
||||
import re
|
||||
import ssl
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import proxy_bypass_environment
|
||||
import urllib3
|
||||
import ipaddress
|
||||
|
||||
from x_auth_id_alias.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
|
||||
|
||||
@@ -72,7 +74,7 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy:
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@@ -290,3 +292,55 @@ class RESTClientObject(object):
|
||||
_preload_content=_preload_content,
|
||||
_request_timeout=_request_timeout,
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
try:
|
||||
chk = ipaddress.IPv4Address(target)
|
||||
return True
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
try:
|
||||
nw = ipaddress.IPv4Network(net)
|
||||
ip = ipaddress.IPv4Address(target)
|
||||
if ip in nw:
|
||||
return True
|
||||
return False
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
"""
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# special cases
|
||||
if parsed.hostname in [None, '']:
|
||||
return True
|
||||
|
||||
# special cases
|
||||
if no_proxy in [None , '']:
|
||||
return False
|
||||
if no_proxy == '*':
|
||||
return True
|
||||
|
||||
no_proxy = no_proxy.lower().replace(' ','');
|
||||
entries = (
|
||||
host for host in no_proxy.split(',') if host
|
||||
)
|
||||
|
||||
if is_ipv4(parsed.hostname):
|
||||
for item in entries:
|
||||
if in_ipv4net(parsed.hostname, item):
|
||||
return True
|
||||
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy} )
|
||||
|
||||
@@ -179,6 +179,9 @@ class Configuration(object):
|
||||
self.proxy = None
|
||||
"""Proxy URL
|
||||
"""
|
||||
self.no_proxy = None
|
||||
"""bypass proxy for host in the no_proxy list.
|
||||
"""
|
||||
self.proxy_headers = None
|
||||
"""Proxy headers
|
||||
"""
|
||||
|
||||
@@ -14,8 +14,10 @@ import logging
|
||||
import re
|
||||
import ssl
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import proxy_bypass_environment
|
||||
import urllib3
|
||||
import ipaddress
|
||||
|
||||
from dynamic_servers.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
|
||||
|
||||
@@ -72,7 +74,7 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy:
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@@ -290,3 +292,55 @@ class RESTClientObject(object):
|
||||
_preload_content=_preload_content,
|
||||
_request_timeout=_request_timeout,
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
try:
|
||||
chk = ipaddress.IPv4Address(target)
|
||||
return True
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
try:
|
||||
nw = ipaddress.IPv4Network(net)
|
||||
ip = ipaddress.IPv4Address(target)
|
||||
if ip in nw:
|
||||
return True
|
||||
return False
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
"""
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# special cases
|
||||
if parsed.hostname in [None, '']:
|
||||
return True
|
||||
|
||||
# special cases
|
||||
if no_proxy in [None , '']:
|
||||
return False
|
||||
if no_proxy == '*':
|
||||
return True
|
||||
|
||||
no_proxy = no_proxy.lower().replace(' ','');
|
||||
entries = (
|
||||
host for host in no_proxy.split(',') if host
|
||||
)
|
||||
|
||||
if is_ipv4(parsed.hostname):
|
||||
for item in entries:
|
||||
if in_ipv4net(parsed.hostname, item):
|
||||
return True
|
||||
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy} )
|
||||
|
||||
@@ -262,6 +262,9 @@ conf = petstore_api.Configuration(
|
||||
self.proxy = None
|
||||
"""Proxy URL
|
||||
"""
|
||||
self.no_proxy = None
|
||||
"""bypass proxy for host in the no_proxy list.
|
||||
"""
|
||||
self.proxy_headers = None
|
||||
"""Proxy headers
|
||||
"""
|
||||
|
||||
@@ -14,8 +14,10 @@ import logging
|
||||
import re
|
||||
import ssl
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import proxy_bypass_environment
|
||||
import urllib3
|
||||
import ipaddress
|
||||
|
||||
from petstore_api.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
|
||||
|
||||
@@ -72,7 +74,7 @@ class RESTClientObject(object):
|
||||
maxsize = 4
|
||||
|
||||
# https pool manager
|
||||
if configuration.proxy:
|
||||
if configuration.proxy and not should_bypass_proxies(configuration.host, no_proxy=configuration.no_proxy or ''):
|
||||
self.pool_manager = urllib3.ProxyManager(
|
||||
num_pools=pools_size,
|
||||
maxsize=maxsize,
|
||||
@@ -290,3 +292,55 @@ class RESTClientObject(object):
|
||||
_preload_content=_preload_content,
|
||||
_request_timeout=_request_timeout,
|
||||
body=body)
|
||||
|
||||
# end of class RESTClientObject
|
||||
def is_ipv4(target):
|
||||
""" Test if IPv4 address or not
|
||||
"""
|
||||
try:
|
||||
chk = ipaddress.IPv4Address(target)
|
||||
return True
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
def in_ipv4net(target, net):
|
||||
""" Test if target belongs to given IPv4 network
|
||||
"""
|
||||
try:
|
||||
nw = ipaddress.IPv4Network(net)
|
||||
ip = ipaddress.IPv4Address(target)
|
||||
if ip in nw:
|
||||
return True
|
||||
return False
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
except ipaddress.NetmaskValueError:
|
||||
return False
|
||||
|
||||
def should_bypass_proxies(url, no_proxy=None):
|
||||
""" Yet another requests.should_bypass_proxies
|
||||
Test if proxies should not be used for a particular url.
|
||||
"""
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# special cases
|
||||
if parsed.hostname in [None, '']:
|
||||
return True
|
||||
|
||||
# special cases
|
||||
if no_proxy in [None , '']:
|
||||
return False
|
||||
if no_proxy == '*':
|
||||
return True
|
||||
|
||||
no_proxy = no_proxy.lower().replace(' ','');
|
||||
entries = (
|
||||
host for host in no_proxy.split(',') if host
|
||||
)
|
||||
|
||||
if is_ipv4(parsed.hostname):
|
||||
for item in entries:
|
||||
if in_ipv4net(parsed.hostname, item):
|
||||
return True
|
||||
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy} )
|
||||
|
||||
@@ -48,9 +48,9 @@ class TestExtraOptionsForPools(unittest.TestCase):
|
||||
|
||||
socket_options = ["extra", "socket", "options"]
|
||||
|
||||
config = petstore_api.Configuration(host="HOST")
|
||||
config = petstore_api.Configuration(host="http://somehost.local:8080")
|
||||
config.socket_options = socket_options
|
||||
config.proxy = True
|
||||
config.proxy = "http://proxy.local:8080/"
|
||||
|
||||
with patch("petstore_api.rest.urllib3.ProxyManager", StubProxyManager):
|
||||
api_client = petstore_api.ApiClient(config)
|
||||
|
||||
Reference in New Issue
Block a user