diff --git a/openapi_codec/encode.py b/openapi_codec/encode.py index 4a6fbec..8a0042c 100644 --- a/openapi_codec/encode.py +++ b/openapi_codec/encode.py @@ -1,5 +1,9 @@ +import random +import string import coreschema + from collections import OrderedDict +from coreapi.document import Field from coreapi.compat import urlparse from openapi_codec.utils import get_method, get_encoding, get_location, get_links_from_document @@ -22,21 +26,116 @@ def generate_swagger_object(document): if parsed_url.scheme: swagger['schemes'] = [parsed_url.scheme] - swagger['paths'] = _get_paths_object(document) + if not parsed_url.netloc and not parsed_url.scheme: + swagger['host'] = document.url + + swagger['definitions'] = _get_definitions(document) + swagger['paths'] = _get_paths_object(document, swagger['definitions']) return swagger +def _get_or_update_definitions(update_def_data, update_def_name, definitions): + """ + Updates definitions with provided data If definition is not present in map, returns found definition + data in case definition overlaps with existing one. + """ + + # Check if there's existing definition with same name or props + clashing_def_names = \ + [d for d in definitions.keys() if d.startswith(update_def_name) or definitions.get(d) == update_def_data] + + for clashing_def_name in clashing_def_names: + clash_def_data = definitions.get(clashing_def_name) + if clash_def_data == update_def_data: + return clash_def_data + else: + if list(clashing_def_names): + rand_part = ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(5)]) + update_def_name = '{}_{}'.format(update_def_name, rand_part) + definitions[update_def_name] = update_def_data + return update_def_data + + +def _get_field_definition_data(field_item, defs): + """ + Returns dictionary with field definition data. + """ + definition_data = { + 'type': 'object', + 'properties': {} + } + + if isinstance(field_item, coreschema.Object): + props = field_item.properties + elif isinstance(field_item.schema, coreschema.schemas.Array): + props = field_item.schema.items.properties + else: + try: + props = field_item.schema.properties + except AttributeError: + props = OrderedDict() + + for f_name, f_schema in iter(props.items()): + + if _get_field_type(f_schema) is 'object': + def_data = _get_or_update_definitions( + _get_field_definition_data(f_schema, defs), + '{}_def_item'.format(f_schema.name), + defs + ) + if def_data: + return def_data + else: + definition_data['properties'][f_name] = { + 'type': _get_field_type(f_schema), + 'description': '' + } + + return definition_data + + +def _get_definitions(document): + """ + Returns dictionary with schema definitions. + """ + + definitions = OrderedDict() + links = _get_links(document) + + for _, link, _ in links: + for field in link.fields: + field_type = _get_field_type(field) + + # Get field definition data + if field_type == 'array': + def_data = _get_field_definition_data(field.schema.items, definitions) + else: + def_data = _get_field_definition_data(field, definitions) + + _get_or_update_definitions( + def_data, + '{}_def_item'.format(field.name), + definitions + ) + + return definitions + + def _add_tag_prefix(item): + """ + Returns tuple (operation_id, link, tags) with modified operation_id in case of tags. + """ + operation_id, link, tags = item if tags: operation_id = tags[0] + '_' + operation_id - return (operation_id, link, tags) + return operation_id, link, tags def _get_links(document): """ - Return a list of (operation_id, link, [tags]) + Return a list of (operation_id, link, [tags]). """ # Extract all the links from the first or second level of the document. links = [] @@ -60,7 +159,10 @@ def _get_links(document): return links -def _get_paths_object(document): +def _get_paths_object(document, definitions): + """ + Returns dictionary with document paths. + """ paths = OrderedDict() links = _get_links(document) @@ -70,13 +172,17 @@ def _get_paths_object(document): paths[link.url] = OrderedDict() method = get_method(link) - operation = _get_operation(operation_id, link, tags) + operation = _get_operation(operation_id, link, tags, definitions) paths[link.url].update({method: operation}) return paths -def _get_operation(operation_id, link, tags): +def _get_operation(operation_id, link, tags, definitions): + """ + Returns dictionary with operation parameters. + """ + encoding = get_encoding(link) description = link.description.strip() summary = description.splitlines()[0] if description else None @@ -84,7 +190,7 @@ def _get_operation(operation_id, link, tags): operation = { 'operationId': operation_id, 'responses': _get_responses(link), - 'parameters': _get_parameters(link, encoding) + 'parameters': _get_parameters(link, encoding, definitions) } if description: @@ -99,6 +205,10 @@ def _get_operation(operation_id, link, tags): def _get_field_description(field): + """ + Returns field description. + """ + if getattr(field, 'description', None) is not None: # Deprecated return field.description @@ -110,12 +220,17 @@ def _get_field_description(field): def _get_field_type(field): + """ + Returns field string type by the given field schema. + """ if getattr(field, 'type', None) is not None: # Deprecated return field.type - if field.schema is None: - return 'string' + if isinstance(field, Field): + cls = field.schema.__class__ + else: + cls = field.__class__ return { coreschema.String: 'string', @@ -124,10 +239,10 @@ def _get_field_type(field): coreschema.Boolean: 'boolean', coreschema.Array: 'array', coreschema.Object: 'object', - }.get(field.schema.__class__, 'string') + }.get(cls, 'string') -def _get_parameters(link, encoding): +def _get_parameters(link, encoding, definitions): """ Generates Swagger Parameter Item object. """ @@ -160,8 +275,28 @@ def _get_parameters(link, encoding): 'description': field_description, 'type': field_type, } - if field_type == 'array': - schema_property['items'] = {'type': 'string'} + + if field_type in ('object', 'array'): + definition_data = _get_field_definition_data(field, definitions) + + definition_data = definition_data.get('properties') + defs = filter(lambda d: definitions.get(d).get('properties') == definition_data, definitions) + + if defs: + # Note: Python2.X <-> Python3.X + try: + def_name = defs[0] + except TypeError: + def_name = next(defs) + + schema_property = {'$ref': '#/definitions/{}'.format(def_name)} + if field_type == 'array': + schema_property.pop('$ref') + schema_property['type'] = 'array' + schema_property['items'] = { + '$ref': '#/definitions/{}'.format(def_name) + } + properties[field.name] = schema_property if field.required: required.append(field.name) diff --git a/tests/test_encode.py b/tests/test_encode.py index 0ffd883..1d5c43f 100644 --- a/tests/test_encode.py +++ b/tests/test_encode.py @@ -1,6 +1,8 @@ import coreapi import coreschema -from openapi_codec.encode import generate_swagger_object, _get_parameters + +from collections import OrderedDict +from openapi_codec.encode import generate_swagger_object, _get_parameters, _get_definitions from unittest import TestCase @@ -32,6 +34,11 @@ def test_schemes(self): expected = ['https'] self.assertEquals(self.swagger['schemes'], expected) + def test_definitions(self): + self.assertIn('definitions', self.swagger) + expected = dict() + self.assertEquals(self.swagger['definitions'], expected) + class TestPaths(TestCase): def setUp(self): @@ -89,7 +96,7 @@ def setUp(self): location='query', schema=coreschema.String(description='A valid email address.') ) - self.swagger = _get_parameters(coreapi.Link(fields=[self.field]), encoding='') + self.swagger = _get_parameters(coreapi.Link(fields=[self.field]), encoding='', definitions=OrderedDict()) def test_expected_fields(self): self.assertEquals(len(self.swagger), 1) @@ -101,3 +108,125 @@ def test_expected_fields(self): 'type': 'string' # Everything is a string for now. } self.assertEquals(self.swagger[0], expected) + + +class TestDefinitions(TestCase): + + def setUp(self): + + # Clashing name + self.clashing_name = 'author' + + # Schema objects + name_schema_obj = coreschema.schemas.Object( + properties=OrderedDict({'name': coreschema.schemas.String(description='name')}) + ) + bday_schema_obj = coreschema.schemas.Object( + properties=OrderedDict({'birthday': coreschema.schemas.String(description='birthday')}) + ) + + # Fields + author_field = coreapi.Field( + name='author', + required=True, + location='form', + schema=name_schema_obj + ) + clashing_author_field = coreapi.Field( + name='author', + required=True, + location='form', + schema=bday_schema_obj + ) + co_authors_field = coreapi.Field( + name='co_authors', + required=True, + location='form', + schema=coreschema.schemas.Array( + items=bday_schema_obj + ) + ) + + # Link objects + v1_songs_link = coreapi.Link( + url='/api/v1/songs/', + action=u'post', + encoding=u'application/json', + fields=[author_field], + ) + v2_songs_link = coreapi.Link( + url='/api/v2/songs/', + action=u'post', + encoding=u'application/json', + fields=[clashing_author_field, co_authors_field], + ) + + self.links = OrderedDict({ + 'v1': OrderedDict({'list': v1_songs_link}), + 'v2': OrderedDict({'list': v2_songs_link}) + }) + + # Coreapi document object + self.document = coreapi.Document( + 'test api', + content=self.links + ) + + # Init definitions and swagger object + self.definitions = _get_definitions(self.document) + self.swagger = generate_swagger_object(self.document) + + def test_clashing_names(self): + + # Basic checks + self.assertIn('definitions', self.swagger) + self.assertEqual(len(self.swagger['definitions'].keys()), 2, 'Unexpected definitions count') + + # Check nothing unexpected is in definitions + defs = filter( + lambda d: d.startswith('{}_def_item'.format(self.clashing_name)), self.swagger['definitions'].keys() + ) + self.assertEqual(len(list(defs)), 2, 'Unexpected definitions count') + + v1_list_params = _get_parameters(self.links['v1']['list'], '', self.definitions) + v2_list_params = _get_parameters(self.links['v2']['list'], '', self.definitions) + + expected_def_name = \ + [d for d in self.definitions.keys() if d.startswith('{}_def_item_'.format(self.clashing_name))][0] + + expected_v1_list_params = [ + { + 'schema': { + 'required': ['author'], + 'type': 'object', + 'properties': { + 'author': { + '$ref': '#/definitions/author_def_item' + } + } + }, + 'name': 'data', + 'in': 'body' + } + ] + + expected_v2_list_params = [ + { + 'schema': { + 'required': ['author', 'co_authors'], + 'type': 'object', + 'properties': { + 'co_authors': { + 'items': {'$ref': '#/definitions/{}'.format(expected_def_name)}, + 'type': 'array' + }, + 'author': {'$ref': '#/definitions/{}'.format(expected_def_name)} + } + }, + 'name': 'data', + 'in': 'body' + } + ] + + self.assertEqual(v1_list_params, expected_v1_list_params, 'Unexpected definition params') + self.assertEqual(v2_list_params, expected_v2_list_params, 'Unexpected definition params') diff --git a/tests/test_mappings.py b/tests/test_mappings.py index 87d23c2..8904543 100644 --- a/tests/test_mappings.py +++ b/tests/test_mappings.py @@ -202,7 +202,6 @@ def test_mapping(): coreapi.Field( name='example', location='body', - schema=coreschema.String() ) ] diff --git a/tox.ini b/tox.ini index 3e9707d..9c2f2e1 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py35,py34,py27 +envlist = py35,py34,py36,py27 [testenv] deps = -rrequirements.txt commands = ./runtests