Add option verify_ssl to switch SSL/TLS verification.

This commit is contained in:
geekerzp
2015-08-25 17:58:14 +08:00
parent 392e5172b5
commit b3d28f07f1
10 changed files with 180 additions and 240 deletions

View File

@@ -18,6 +18,7 @@ Copyright 2015 SmartBear Software
Credit: this file (rest.py) is modified based on rest.py in Dropbox Python SDK:
https://www.dropbox.com/developers/core/sdks/python
"""
from __future__ import absolute_import
import sys
import io
@@ -29,6 +30,8 @@ import logging
# python 2 and python 3 compatibility library
from six import iteritems
from .configuration import Configuration
try:
import urllib3
except ImportError:
@@ -69,31 +72,24 @@ class RESTResponse(io.IOBase):
class RESTClientObject(object):
def __init__(self, pools_size=4):
# http pool manager
self.pool_manager = urllib3.PoolManager(
num_pools=pools_size
)
if Configuration().verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
if Configuration().ssl_ca_cert:
ca_certs = Configuration().ssl_ca_cert
else:
# if not set certificate file, use Mozilla's root certificates.
ca_certs = certifi.where()
# https pool manager
# certificates validated using Mozillas root certificates
self.ssl_pool_manager = urllib3.PoolManager(
self.pool_manager = urllib3.PoolManager(
num_pools=pools_size,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=certifi.where()
cert_reqs=cert_reqs,
ca_certs=ca_certs
)
def agent(self, url):
"""
Use `urllib3.util.parse_url` for backward compatibility.
Return proper pool manager for the http/https schemes.
"""
url = urllib3.util.parse_url(url)
scheme = url.scheme
if scheme == 'https':
return self.ssl_pool_manager
else:
return self.pool_manager
def request(self, method, url, query_params=None, headers=None,
body=None, post_params=None):
"""
@@ -120,32 +116,37 @@ class RESTClientObject(object):
if 'Content-Type' not in headers:
headers['Content-Type'] = 'application/json'
# For `POST`, `PUT`, `PATCH`
if method in ['POST', 'PUT', 'PATCH']:
if query_params:
url += '?' + urlencode(query_params)
if headers['Content-Type'] == 'application/json':
r = self.agent(url).request(method, url,
body=json.dumps(body),
headers=headers)
if headers['Content-Type'] == 'application/x-www-form-urlencoded':
r = self.agent(url).request(method, url,
fields=post_params,
encode_multipart=False,
headers=headers)
if headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct Content-Type
# which generated by urllib3 will be overwritten.
del headers['Content-Type']
r = self.agent(url).request(method, url,
fields=post_params,
encode_multipart=True,
headers=headers)
# For `GET`, `HEAD`, `DELETE`
else:
r = self.agent(url).request(method, url,
fields=query_params,
headers=headers)
try:
# For `POST`, `PUT`, `PATCH`
if method in ['POST', 'PUT', 'PATCH']:
if query_params:
url += '?' + urlencode(query_params)
if headers['Content-Type'] == 'application/json':
r = self.pool_manager.request(method, url,
body=json.dumps(body),
headers=headers)
if headers['Content-Type'] == 'application/x-www-form-urlencoded':
r = self.pool_manager.request(method, url,
fields=post_params,
encode_multipart=False,
headers=headers)
if headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct Content-Type
# which generated by urllib3 will be overwritten.
del headers['Content-Type']
r = self.pool_manager.request(method, url,
fields=post_params,
encode_multipart=True,
headers=headers)
# For `GET`, `HEAD`, `DELETE`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
r = RESTResponse(r)
# In the python 3, the response.data is bytes.
@@ -227,58 +228,20 @@ class ApiException(Exception):
return error_message
class RESTClient(object):
"""
A class with all class methods to perform JSON requests.
"""
IMPL = RESTClientObject()
@classmethod
def request(cls, *n, **kw):
"""
Perform a REST request and parse the response.
"""
return cls.IMPL.request(*n, **kw)
@classmethod
def GET(cls, *n, **kw):
"""
Perform a GET request using `RESTClient.request()`.
"""
return cls.IMPL.GET(*n, **kw)
@classmethod
def HEAD(cls, *n, **kw):
"""
Perform a HEAD request using `RESTClient.request()`.
"""
return cls.IMPL.GET(*n, **kw)
@classmethod
def POST(cls, *n, **kw):
"""
Perform a POST request using `RESTClient.request()`
"""
return cls.IMPL.POST(*n, **kw)
@classmethod
def PUT(cls, *n, **kw):
"""
Perform a PUT request using `RESTClient.request()`
"""
return cls.IMPL.PUT(*n, **kw)
@classmethod
def PATCH(cls, *n, **kw):
"""
Perform a PATCH request using `RESTClient.request()`
"""
return cls.IMPL.PATCH(*n, **kw)
@classmethod
def DELETE(cls, *n, **kw):
"""
Perform a DELETE request using `RESTClient.request()`
"""
return cls.IMPL.DELETE(*n, **kw)