diff --git a/docs/usage/file_upload.rst b/docs/usage/file_upload.rst
index d900df95..d5f07c50 100644
--- a/docs/usage/file_upload.rst
+++ b/docs/usage/file_upload.rst
@@ -19,7 +19,7 @@ In order to upload a single file, you need to:
 
     transport = AIOHTTPTransport(url='YOUR_URL')
 
-    client = Client(transport=sample_transport)
+    client = Client(transport=transport)
 
     query = gql('''
       mutation($file: Upload!) {
@@ -46,7 +46,7 @@ It is also possible to upload multiple files using a list.
 
     transport = AIOHTTPTransport(url='YOUR_URL')
 
-    client = Client(transport=sample_transport)
+    client = Client(transport=transport)
 
     query = gql('''
       mutation($files: [Upload!]!) {
@@ -67,3 +67,111 @@ It is also possible to upload multiple files using a list.
 
     f1.close()
     f2.close()
+
+
+Streaming
+---------
+
+If you use the above methods to send files, then the entire contents of the files
+must be loaded in memory before the files are sent.
+If the files are not too big and you have enough RAM, this is not a problem.
+On the other hand, if you want to avoid using too much memory, it is better
+to read the files and send them in small chunks so that the entire file contents
+don't have to be in memory at once.
+
+We provide methods to do that for two different use cases:
+
+* Sending local files
+* Streaming downloaded files from an external URL to the GraphQL API
+
+Streaming local files
+^^^^^^^^^^^^^^^^^^^^^
+
+aiohttp allows uploading files using an asynchronous generator.
+See `Streaming uploads on aiohttp docs`_.
+
+In order to stream local files, instead of providing opened files to the
+`variable_values` argument of `execute`, you need to provide an async generator
+which will yield the file contents in chunks.
+
+You can use `aiofiles`_
+to read the files in chunks and create this asynchronous generator.
+
+.. _Streaming uploads on aiohttp docs: https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads
+.. _aiofiles: https://github.com/Tinche/aiofiles
+
+Example:
+
+.. code-block:: python
+
+    transport = AIOHTTPTransport(url='YOUR_URL')
+
+    client = Client(transport=transport)
+
+    query = gql('''
+      mutation($file: Upload!) {
+        singleUpload(file: $file) {
+          id
+        }
+      }
+    ''')
+
+    async def file_sender(file_name):
+        async with aiofiles.open(file_name, 'rb') as f:
+            chunk = await f.read(64*1024)
+            while chunk:
+                yield chunk
+                chunk = await f.read(64*1024)
+
+    params = {"file": file_sender(file_name='YOUR_FILE_PATH')}
+
+    result = client.execute(
+        query, variable_values=params, upload_files=True
+    )
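+
+The generator does not have to read from a file: the transport accepts any
+asynchronous generator, so anything that yields `bytes` chunks can be
+uploaded this way. As a minimal sketch, reusing the `client` and `query`
+defined above, you could stream content that is generated on the fly and
+never exists as a whole in memory (the `data_sender` generator below is
+hypothetical):
+
+.. code-block:: python
+
+    async def data_sender():
+        # Any async generator yielding bytes works, not only a file reader
+        for i in range(100):
+            yield f"line {i}\n".encode()
+
+    params = {"file": data_sender()}
+
+    result = client.execute(
+        query, variable_values=params, upload_files=True
+    )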
+
+Streaming downloaded files
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+If the file you want to upload to the GraphQL API is not present locally
+and needs to be downloaded from elsewhere, then it is possible to chain the download
+and the upload in order to limit the amount of memory used.
+
+Because the `content` attribute of an aiohttp response is a `StreamReader`
+(it implements the async iterator protocol), the downloaded data can be fed
+directly into the upload, without ever holding the whole file in memory.
+
+In order to do that, you need to:
+
+* get the response from an aiohttp request and then get the StreamReader instance
+  from `resp.content`
+* provide the StreamReader instance to the `variable_values` argument of `execute`
+
+Example:
+
+.. code-block:: python
+
+    # First request to download your file with aiohttp
+    async with aiohttp.ClientSession() as http_client:
+        async with http_client.get('YOUR_DOWNLOAD_URL') as resp:
+
+            # We now have a StreamReader instance in resp.content
+            # and we provide it to the variable_values argument of execute
+
+            transport = AIOHTTPTransport(url='YOUR_GRAPHQL_URL')
+
+            client = Client(transport=transport)
+
+            query = gql('''
+              mutation($file: Upload!) {
+                singleUpload(file: $file) {
+                  id
+                }
+              }
+            ''')
+
+            params = {"file": resp.content}
+
+            result = client.execute(
+                query, variable_values=params, upload_files=True
+            )
diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py
index 857edfab..b1a33ad2 100644
--- a/gql/transport/aiohttp.py
+++ b/gql/transport/aiohttp.py
@@ -1,7 +1,8 @@
+import io
 import json
 import logging
 from ssl import SSLContext
-from typing import Any, AsyncGenerator, Dict, Optional, Union
+from typing import Any, AsyncGenerator, Dict, Optional, Tuple, Type, Union
 
 import aiohttp
 from aiohttp.client_exceptions import ClientResponseError
@@ -29,6 +30,12 @@ class AIOHTTPTransport(AsyncTransport):
     This transport use the aiohttp library with asyncio.
     """
 
+    file_classes: Tuple[Type[Any], ...] = (
+        io.IOBase,
+        aiohttp.StreamReader,
+        AsyncGenerator,
+    )
+
     def __init__(
         self,
         url: str,
@@ -144,7 +151,9 @@ async def execute(
 
         # If we upload files, we will extract the files present in the
         # variable_values dict and replace them by null values
-        nulled_variable_values, files = extract_files(variable_values)
+        nulled_variable_values, files = extract_files(
+            variables=variable_values, file_classes=self.file_classes,
+        )
 
         # Save the nulled variable values in the payload
         payload["variables"] = nulled_variable_values
@@ -175,7 +184,8 @@
             data.add_field("map", file_map_str, content_type="application/json")
 
             # Add the extracted files as remaining fields
-            data.add_fields(*file_streams.items())
+            for k, v in file_streams.items():
+                data.add_field(k, v, filename=k)
 
             post_args: Dict[str, Any] = {"data": data}
diff --git a/gql/utils.py b/gql/utils.py
index ce0318b0..3edb086c 100644
--- a/gql/utils.py
+++ b/gql/utils.py
@@ -1,7 +1,6 @@
 """Utilities to manipulate several python objects."""
 
-import io
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, Tuple, Type
 
 
 # From this response in Stackoverflow
@@ -13,12 +12,9 @@ def to_camel_case(snake_str):
     return components[0] + "".join(x.title() if x else "_" for x in components[1:])
 
 
-def is_file_like(value: Any) -> bool:
-    """Check if a value represents a file like object"""
-    return isinstance(value, io.IOBase)
-
-
-def extract_files(variables: Dict) -> Tuple[Dict, Dict]:
+def extract_files(
+    variables: Dict, file_classes: Tuple[Type[Any], ...]
+) -> Tuple[Dict, Dict]:
     files = {}
 
     def recurse_extract(path, obj):
@@ -40,7 +36,7 @@ def recurse_extract(path, obj):
                 value = recurse_extract(f"{path}.{key}", value)
                 nulled_obj[key] = value
             return nulled_obj
-        elif is_file_like(obj):
+        elif isinstance(obj, file_classes):
             # extract obj from its parent and put it into files instead.
             files[path] = obj
             return None
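
To make the extraction concrete, here is a minimal sketch of how the reworked
`extract_files` behaves, using a `file_classes` tuple that mirrors the one now
defined on `AIOHTTPTransport`:

.. code-block:: python

    import io
    from typing import AsyncGenerator

    import aiohttp

    from gql.utils import extract_files

    file_classes = (io.IOBase, aiohttp.StreamReader, AsyncGenerator)

    f = io.BytesIO(b"hello")

    nulled, files = extract_files(
        variables={"file": f, "other_var": 42}, file_classes=file_classes,
    )

    # The file-like object is replaced by a null value in the variables,
    # while plain values pass through untouched
    assert nulled == {"file": None, "other_var": 42}

    # The file object itself is collected into the files dict,
    # keyed by its path inside the variables (e.g. "variables.file")
    assert list(files.values()) == [f]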
diff --git a/setup.py b/setup.py
index 2014dcf3..fdcaccf0 100644
--- a/setup.py
+++ b/setup.py
@@ -18,6 +18,7 @@
     "pytest-cov==2.8.1",
     "mock==4.0.2",
     "vcrpy==4.0.2",
+    "aiofiles",
 ]
 
 dev_requires = [
diff --git a/tests/test_aiohttp.py b/tests/test_aiohttp.py
index 66a29dbc..0bf8c1ba 100644
--- a/tests/test_aiohttp.py
+++ b/tests/test_aiohttp.py
@@ -582,6 +582,84 @@ async def test_aiohttp_binary_file_upload(event_loop, aiohttp_server):
     assert success
 
 
+@pytest.mark.asyncio
+async def test_aiohttp_stream_reader_upload(event_loop, aiohttp_server):
+    from aiohttp import web, ClientSession
+    from gql.transport.aiohttp import AIOHTTPTransport
+
+    async def binary_data_handler(request):
+        return web.Response(
+            body=binary_file_content, content_type="binary/octet-stream"
+        )
+
+    app = web.Application()
+    app.router.add_route("POST", "/", binary_upload_handler)
+    app.router.add_route("GET", "/binary_data", binary_data_handler)
+
+    server = await aiohttp_server(app)
+
+    url = server.make_url("/")
+    binary_data_url = server.make_url("/binary_data")
+
+    sample_transport = AIOHTTPTransport(url=url, timeout=10)
+
+    async with Client(transport=sample_transport) as session:
+        query = gql(file_upload_mutation_1)
+        async with ClientSession() as client:
+            async with client.get(binary_data_url) as resp:
+                params = {"file": resp.content, "other_var": 42}
+
+                # Execute query asynchronously
+                result = await session.execute(
+                    query, variable_values=params, upload_files=True
+                )
+
+                success = result["success"]
+
+                assert success
+
+
+@pytest.mark.asyncio
+async def test_aiohttp_async_generator_upload(event_loop, aiohttp_server):
+    import aiofiles
+    from aiohttp import web
+    from gql.transport.aiohttp import AIOHTTPTransport
+
+    app = web.Application()
+    app.router.add_route("POST", "/", binary_upload_handler)
+    server = await aiohttp_server(app)
+
+    url = server.make_url("/")
+
+    sample_transport = AIOHTTPTransport(url=url, timeout=10)
+
+    with TemporaryFile(binary_file_content) as test_file:
+
+        async with Client(transport=sample_transport) as session:
+
+            query = gql(file_upload_mutation_1)
+
+            file_path = test_file.filename
+
+            async def file_sender(file_name):
+                async with aiofiles.open(file_name, "rb") as f:
+                    chunk = await f.read(64 * 1024)
+                    while chunk:
+                        yield chunk
+                        chunk = await f.read(64 * 1024)
+
+            params = {"file": file_sender(file_path), "other_var": 42}
+
+            # Execute query asynchronously
+            result = await session.execute(
+                query, variable_values=params, upload_files=True
+            )
+
+            success = result["success"]
+
+            assert success
+
+
 file_upload_mutation_2 = """
     mutation($file1: Upload!, $file2: Upload!) {
       uploadFile(input:{file1:$file, file2:$file}) {