Skip to content
This repository was archived by the owner on Mar 18, 2019. It is now read-only.

fixing #37 issue support of definitions for nested objects #38

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 148 additions & 13 deletions openapi_codec/encode.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import random
import string
import coreschema

from collections import OrderedDict
from coreapi.document import Field
from coreapi.compat import urlparse
from openapi_codec.utils import get_method, get_encoding, get_location, get_links_from_document

Expand All @@ -22,21 +26,116 @@ def generate_swagger_object(document):
if parsed_url.scheme:
swagger['schemes'] = [parsed_url.scheme]

swagger['paths'] = _get_paths_object(document)
if not parsed_url.netloc and not parsed_url.scheme:
swagger['host'] = document.url

swagger['definitions'] = _get_definitions(document)
swagger['paths'] = _get_paths_object(document, swagger['definitions'])

return swagger


def _get_or_update_definitions(update_def_data, update_def_name, definitions):
"""
Updates definitions with provided data If definition is not present in map, returns found definition
data in case definition overlaps with existing one.
"""

# Check if there's existing definition with same name or props
clashing_def_names = \
[d for d in definitions.keys() if d.startswith(update_def_name) or definitions.get(d) == update_def_data]

for clashing_def_name in clashing_def_names:
clash_def_data = definitions.get(clashing_def_name)
if clash_def_data == update_def_data:
return clash_def_data
else:
if list(clashing_def_names):
rand_part = ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(5)])
update_def_name = '{}_{}'.format(update_def_name, rand_part)
definitions[update_def_name] = update_def_data
return update_def_data


def _get_field_definition_data(field_item, defs):
"""
Returns dictionary with field definition data.
"""
definition_data = {
'type': 'object',
'properties': {}
}

if isinstance(field_item, coreschema.Object):
props = field_item.properties
elif isinstance(field_item.schema, coreschema.schemas.Array):
props = field_item.schema.items.properties
else:
try:
props = field_item.schema.properties
except AttributeError:
props = OrderedDict()

for f_name, f_schema in iter(props.items()):

if _get_field_type(f_schema) is 'object':
def_data = _get_or_update_definitions(
_get_field_definition_data(f_schema, defs),
'{}_def_item'.format(f_schema.name),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm starting to play with this.

It works for a super simple API using nested definitions, but when I tried to apply it to our whole site I'm getting a 500 and the traceback is giving an AttributeError here saying the name property doesn't exist. I'll try to look into it later, but just a heads up. (I cloned your branch to nexleaf/python-openapi-codec and may contribute that way)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm going to be switching to drf-yasg as it already supports nested definitions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for heads up. I'd rather close this one and switch to it either.

defs
)
if def_data:
return def_data
else:
definition_data['properties'][f_name] = {
'type': _get_field_type(f_schema),
'description': ''
}

return definition_data


def _get_definitions(document):
"""
Returns dictionary with schema definitions.
"""

definitions = OrderedDict()
links = _get_links(document)

for _, link, _ in links:
for field in link.fields:
field_type = _get_field_type(field)

# Get field definition data
if field_type == 'array':
def_data = _get_field_definition_data(field.schema.items, definitions)
else:
def_data = _get_field_definition_data(field, definitions)

_get_or_update_definitions(
def_data,
'{}_def_item'.format(field.name),
definitions
)

return definitions


def _add_tag_prefix(item):
"""
Returns tuple (operation_id, link, tags) with modified operation_id in case of tags.
"""

operation_id, link, tags = item
if tags:
operation_id = tags[0] + '_' + operation_id
return (operation_id, link, tags)
return operation_id, link, tags


def _get_links(document):
"""
Return a list of (operation_id, link, [tags])
Return a list of (operation_id, link, [tags]).
"""
# Extract all the links from the first or second level of the document.
links = []
Expand All @@ -60,7 +159,10 @@ def _get_links(document):
return links


def _get_paths_object(document):
def _get_paths_object(document, definitions):
"""
Returns dictionary with document paths.
"""
paths = OrderedDict()

links = _get_links(document)
Expand All @@ -70,21 +172,25 @@ def _get_paths_object(document):
paths[link.url] = OrderedDict()

method = get_method(link)
operation = _get_operation(operation_id, link, tags)
operation = _get_operation(operation_id, link, tags, definitions)
paths[link.url].update({method: operation})

return paths


def _get_operation(operation_id, link, tags):
def _get_operation(operation_id, link, tags, definitions):
"""
Returns dictionary with operation parameters.
"""

encoding = get_encoding(link)
description = link.description.strip()
summary = description.splitlines()[0] if description else None

operation = {
'operationId': operation_id,
'responses': _get_responses(link),
'parameters': _get_parameters(link, encoding)
'parameters': _get_parameters(link, encoding, definitions)
}

if description:
Expand All @@ -99,6 +205,10 @@ def _get_operation(operation_id, link, tags):


def _get_field_description(field):
"""
Returns field description.
"""

if getattr(field, 'description', None) is not None:
# Deprecated
return field.description
Expand All @@ -110,12 +220,17 @@ def _get_field_description(field):


def _get_field_type(field):
"""
Returns field string type by the given field schema.
"""
if getattr(field, 'type', None) is not None:
# Deprecated
return field.type

if field.schema is None:
return 'string'
if isinstance(field, Field):
cls = field.schema.__class__
else:
cls = field.__class__

return {
coreschema.String: 'string',
Expand All @@ -124,10 +239,10 @@ def _get_field_type(field):
coreschema.Boolean: 'boolean',
coreschema.Array: 'array',
coreschema.Object: 'object',
}.get(field.schema.__class__, 'string')
}.get(cls, 'string')


def _get_parameters(link, encoding):
def _get_parameters(link, encoding, definitions):
"""
Generates Swagger Parameter Item object.
"""
Expand Down Expand Up @@ -160,8 +275,28 @@ def _get_parameters(link, encoding):
'description': field_description,
'type': field_type,
}
if field_type == 'array':
schema_property['items'] = {'type': 'string'}

if field_type in ('object', 'array'):
definition_data = _get_field_definition_data(field, definitions)

definition_data = definition_data.get('properties')
defs = filter(lambda d: definitions.get(d).get('properties') == definition_data, definitions)

if defs:
# Note: Python2.X <-> Python3.X
try:
def_name = defs[0]
except TypeError:
def_name = next(defs)

schema_property = {'$ref': '#/definitions/{}'.format(def_name)}
if field_type == 'array':
schema_property.pop('$ref')
schema_property['type'] = 'array'
schema_property['items'] = {
'$ref': '#/definitions/{}'.format(def_name)
}

properties[field.name] = schema_property
if field.required:
required.append(field.name)
Expand Down
Loading