Python AIOHTTP server generator (#1470)

* Astract factory for generators based on connexion

* Add aiohttp server generator

* Fix flask tests

* Normalize python-flask folder names
This commit is contained in:
Jyhess
2019-01-11 16:35:21 +01:00
committed by William Cheng
parent 4652023b7c
commit 80ca67cfda
160 changed files with 5404 additions and 815 deletions

View File

@@ -0,0 +1,16 @@
import logging
import connexion
from flask_testing import TestCase
from openapi_server.encoder import JSONEncoder
class BaseTestCase(TestCase):
def create_app(self):
logging.getLogger('connexion.operation').setLevel('ERROR')
app = connexion.App(__name__, specification_dir='../openapi/')
app.app.json_encoder = JSONEncoder
app.add_api('openapi.yaml')
return app.app

View File

@@ -0,0 +1,202 @@
# coding: utf-8
from __future__ import absolute_import
import unittest
from flask import json
from six import BytesIO
from openapi_server.models.api_response import ApiResponse # noqa: E501
from openapi_server.models.pet import Pet # noqa: E501
from openapi_server.test import BaseTestCase
class TestPetController(BaseTestCase):
"""PetController integration test stubs"""
@unittest.skip("Connexion does not support multiple consummes. See https://github.com/zalando/connexion/pull/760")
def test_add_pet(self):
"""Test case for add_pet
Add a new pet to the store
"""
body = {
"photoUrls" : [ "photoUrls", "photoUrls" ],
"name" : "doggie",
"id" : 0,
"category" : {
"name" : "name",
"id" : 6
},
"tags" : [ {
"name" : "name",
"id" : 1
}, {
"name" : "name",
"id" : 1
} ],
"status" : "available"
}
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer special-key',
}
response = self.client.open(
'/v2/pet',
method='POST',
headers=headers,
data=json.dumps(body),
content_type='application/json')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_delete_pet(self):
"""Test case for delete_pet
Deletes a pet
"""
headers = {
'api_key': 'api_key_example',
'Authorization': 'Bearer special-key',
}
response = self.client.open(
'/v2/pet/{pet_id}'.format(pet_id=789),
method='DELETE',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_find_pets_by_status(self):
"""Test case for find_pets_by_status
Finds Pets by status
"""
query_string = [('status', 'available')]
headers = {
'Accept': 'application/json',
'Authorization': 'Bearer special-key',
}
response = self.client.open(
'/v2/pet/findByStatus',
method='GET',
headers=headers,
query_string=query_string)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_find_pets_by_tags(self):
"""Test case for find_pets_by_tags
Finds Pets by tags
"""
query_string = [('tags', 'tags_example')]
headers = {
'Accept': 'application/json',
'Authorization': 'Bearer special-key',
}
response = self.client.open(
'/v2/pet/findByTags',
method='GET',
headers=headers,
query_string=query_string)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_get_pet_by_id(self):
"""Test case for get_pet_by_id
Find pet by ID
"""
headers = {
'Accept': 'application/json',
'api_key': 'special-key',
}
response = self.client.open(
'/v2/pet/{pet_id}'.format(pet_id=789),
method='GET',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
@unittest.skip("Connexion does not support multiple consummes. See https://github.com/zalando/connexion/pull/760")
def test_update_pet(self):
"""Test case for update_pet
Update an existing pet
"""
body = {
"photoUrls" : [ "photoUrls", "photoUrls" ],
"name" : "doggie",
"id" : 0,
"category" : {
"name" : "name",
"id" : 6
},
"tags" : [ {
"name" : "name",
"id" : 1
}, {
"name" : "name",
"id" : 1
} ],
"status" : "available"
}
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer special-key',
}
response = self.client.open(
'/v2/pet',
method='PUT',
headers=headers,
data=json.dumps(body),
content_type='application/json')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
@unittest.skip("application/x-www-form-urlencoded not supported by Connexion")
def test_update_pet_with_form(self):
"""Test case for update_pet_with_form
Updates a pet in the store with form data
"""
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Bearer special-key',
}
data = dict(name='name_example',
status='status_example')
response = self.client.open(
'/v2/pet/{pet_id}'.format(pet_id=789),
method='POST',
headers=headers,
data=data,
content_type='application/x-www-form-urlencoded')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
@unittest.skip("multipart/form-data not supported by Connexion")
def test_upload_file(self):
"""Test case for upload_file
uploads an image
"""
headers = {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer special-key',
}
data = dict(additional_metadata='additional_metadata_example',
file=(BytesIO(b'some file data'), 'file.txt'))
response = self.client.open(
'/v2/pet/{pet_id}/uploadImage'.format(pet_id=789),
method='POST',
headers=headers,
data=data,
content_type='multipart/form-data')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,83 @@
# coding: utf-8
from __future__ import absolute_import
import unittest
from flask import json
from six import BytesIO
from openapi_server.models.order import Order # noqa: E501
from openapi_server.test import BaseTestCase
class TestStoreController(BaseTestCase):
"""StoreController integration test stubs"""
def test_delete_order(self):
"""Test case for delete_order
Delete purchase order by ID
"""
headers = {
}
response = self.client.open(
'/v2/store/order/{order_id}'.format(order_id='order_id_example'),
method='DELETE',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_get_inventory(self):
"""Test case for get_inventory
Returns pet inventories by status
"""
headers = {
'Accept': 'application/json',
'api_key': 'special-key',
}
response = self.client.open(
'/v2/store/inventory',
method='GET',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_get_order_by_id(self):
"""Test case for get_order_by_id
Find purchase order by ID
"""
headers = {
'Accept': 'application/json',
}
response = self.client.open(
'/v2/store/order/{order_id}'.format(order_id=5),
method='GET',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
@unittest.skip("*/* not supported by Connexion. Use application/json instead. See https://github.com/zalando/connexion/pull/760")
def test_place_order(self):
"""Test case for place_order
Place an order for a pet
"""
body = {}
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
response = self.client.open(
'/v2/store/order',
method='POST',
headers=headers,
data=json.dumps(body),
content_type='application/json')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,155 @@
# coding: utf-8
from __future__ import absolute_import
import unittest
from flask import json
from six import BytesIO
from openapi_server.models.user import User # noqa: E501
from openapi_server.test import BaseTestCase
class TestUserController(BaseTestCase):
"""UserController integration test stubs"""
@unittest.skip("*/* not supported by Connexion. Use application/json instead. See https://github.com/zalando/connexion/pull/760")
def test_create_user(self):
"""Test case for create_user
Create user
"""
body = {}
headers = {
'Content-Type': 'application/json',
}
response = self.client.open(
'/v2/user',
method='POST',
headers=headers,
data=json.dumps(body),
content_type='application/json')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
@unittest.skip("*/* not supported by Connexion. Use application/json instead. See https://github.com/zalando/connexion/pull/760")
def test_create_users_with_array_input(self):
"""Test case for create_users_with_array_input
Creates list of users with given input array
"""
body = []
headers = {
'Content-Type': 'application/json',
}
response = self.client.open(
'/v2/user/createWithArray',
method='POST',
headers=headers,
data=json.dumps(body),
content_type='application/json')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
@unittest.skip("*/* not supported by Connexion. Use application/json instead. See https://github.com/zalando/connexion/pull/760")
def test_create_users_with_list_input(self):
"""Test case for create_users_with_list_input
Creates list of users with given input array
"""
body = []
headers = {
'Content-Type': 'application/json',
}
response = self.client.open(
'/v2/user/createWithList',
method='POST',
headers=headers,
data=json.dumps(body),
content_type='application/json')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_delete_user(self):
"""Test case for delete_user
Delete user
"""
headers = {
}
response = self.client.open(
'/v2/user/{username}'.format(username='username_example'),
method='DELETE',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_get_user_by_name(self):
"""Test case for get_user_by_name
Get user by user name
"""
headers = {
'Accept': 'application/json',
}
response = self.client.open(
'/v2/user/{username}'.format(username='username_example'),
method='GET',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_login_user(self):
"""Test case for login_user
Logs user into the system
"""
query_string = [('username', 'username_example'),
('password', 'password_example')]
headers = {
'Accept': 'application/json',
}
response = self.client.open(
'/v2/user/login',
method='GET',
headers=headers,
query_string=query_string)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
def test_logout_user(self):
"""Test case for logout_user
Logs out current logged in user session
"""
headers = {
}
response = self.client.open(
'/v2/user/logout',
method='GET',
headers=headers)
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
@unittest.skip("*/* not supported by Connexion. Use application/json instead. See https://github.com/zalando/connexion/pull/760")
def test_update_user(self):
"""Test case for update_user
Updated user
"""
body = {}
headers = {
'Content-Type': 'application/json',
}
response = self.client.open(
'/v2/user/{username}'.format(username='username_example'),
method='PUT',
headers=headers,
data=json.dumps(body),
content_type='application/json')
self.assert200(response,
'Response body is : ' + response.data.decode('utf-8'))
if __name__ == '__main__':
unittest.main()