Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
SHARED_MEMORY_DATA_TRANSFER = "SharedMemoryDataTransfer"
FUNCTION_DATA_CACHE = "FunctionDataCache"
HTTP_URI = "HttpUri"

REQUIRES_ROUTE_PARAMETERS = "RequiresRouteParameters"
# When this capability is enabled, logs are not piped back to the
# host from the worker. Logs will directly go to where the user has
# configured them to go. This is to ensure that the logs are not
Expand Down
9 changes: 7 additions & 2 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MAX_37,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
REQUIRES_ROUTE_PARAMETERS
)
from .extension import ExtensionManager
from .http_v2 import (
Expand Down Expand Up @@ -405,6 +406,7 @@ async def _handle__worker_init_request(self, request):
if HttpV2Registry.http_v2_enabled():
capabilities[constants.HTTP_URI] = \
initialize_http_server(self._host)
capabilities[REQUIRES_ROUTE_PARAMETERS] = _TRUE

except HttpServerInitError:
raise
Expand Down Expand Up @@ -640,8 +642,10 @@ async def _handle__invocation_request(self, request):
http_request = await http_coordinator.get_http_request_async(
invocation_id)

await sync_http_request(http_request, invoc_request)
args[fi.trigger_metadata.get('param_name')] = http_request
trigger_arg_name = fi.trigger_metadata.get('param_name')
func_http_request = args[trigger_arg_name]
await sync_http_request(http_request, func_http_request)
args[trigger_arg_name] = http_request

fi_context = self._get_context(invoc_request, fi.name,
fi.directory)
Expand Down Expand Up @@ -792,6 +796,7 @@ async def _handle__function_environment_reload_request(self, request):
if HttpV2Registry.http_v2_enabled():
capabilities[constants.HTTP_URI] = \
initialize_http_server(self._host)
capabilities[REQUIRES_ROUTE_PARAMETERS] = _TRUE
except HttpServerInitError:
raise
except Exception as ex:
Expand Down
7 changes: 2 additions & 5 deletions azure_functions_worker/http_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,11 @@ async def catch_all(request: request_type): # type: ignore
from e


async def sync_http_request(http_request, invoc_request):
async def sync_http_request(http_request, func_http_request):
# Sync http request route params from invoc_request to http_request
route_params = {key: item.string for key, item
in invoc_request.trigger_metadata.items()
if key not in ['Headers', 'Query']}
(HttpV2Registry.ext_base().RequestTrackerMeta
.get_synchronizer()
.sync_route_params(http_request, route_params))
.sync_route_params(http_request, func_http_request.route_params))


class HttpV2Registry:
Expand Down
62 changes: 61 additions & 1 deletion tests/unittests/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MAX_37,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_THREADPOOL_THREAD_COUNT_MIN, HTTP_URI, REQUIRES_ROUTE_PARAMETERS,
)
from azure_functions_worker.dispatcher import Dispatcher, ContextEnabledTask
from azure_functions_worker.version import VERSION
Expand All @@ -33,6 +33,8 @@
'dispatcher_functions_stein'
FUNCTION_APP_DIRECTORY = UNIT_TESTS_ROOT / 'dispatcher_functions' / \
'dispatcher_functions_stein'
HTTPV2_FUNCTION_APP_DIRECTORY = UNIT_TESTS_ROOT / 'dispatcher_functions' / \
'http_v2' / 'fastapi'


class TestThreadPoolSettingsPython37(testutils.AsyncTestCase):
Expand Down Expand Up @@ -767,6 +769,7 @@ def setUp(self):
asyncio.set_event_loop(self.loop)
self.dispatcher = testutils.create_dummy_dispatcher()
sys.path.append(str(FUNCTION_APP_DIRECTORY))
sys.path.append(str(HTTPV2_FUNCTION_APP_DIRECTORY))

def tearDown(self):
self.loop.close()
Expand Down Expand Up @@ -991,6 +994,63 @@ def test_dispatcher_indexing_in_load_request_with_exception(
response.function_load_response.result.exception.message,
"Exception: Mocked Exception")

@patch.dict(os.environ, {PYTHON_ENABLE_INIT_INDEXING: 'true'})
@patch("azure_functions_worker.http_v2.HttpV2Registry.http_v2_enabled",
return_value=True)
def test_dispatcher_http_v2_init_request_fail(self, mock_http_v2_enabled):
request = protos.StreamingMessage(
worker_init_request=protos.WorkerInitRequest(
host_version="2.3.4",
function_app_directory=str(HTTPV2_FUNCTION_APP_DIRECTORY)
)
)

resp = self.loop.run_until_complete(
self.dispatcher._handle__worker_init_request(request)
)

mock_http_v2_enabled.assert_called_once()
self.assertIsNotNone(self.dispatcher._function_metadata_exception)

capabilities = resp.worker_init_response.capabilities
self.assertNotIn(HTTP_URI, capabilities)
self.assertNotIn(REQUIRES_ROUTE_PARAMETERS, capabilities)

# Cleanup
del sys.modules['function_app']

@patch.dict(os.environ, {PYTHON_ENABLE_INIT_INDEXING: 'true'})
@patch("azure_functions_worker.http_v2.HttpV2Registry.http_v2_enabled",
return_value=True)
@patch("azure_functions_worker.dispatcher.initialize_http_server",
return_value="http://localhost:8080")
@patch("azure_functions_worker.dispatcher.Dispatcher"
".load_function_metadata")
def test_dispatcher_http_v2_init_request_pass(self, mock_http_v2_enabled,
mock_init_http_server,
mock_load_func_metadata):
request = protos.StreamingMessage(
worker_init_request=protos.WorkerInitRequest(
host_version="2.3.4",
function_app_directory=str(HTTPV2_FUNCTION_APP_DIRECTORY)
)
)

resp = self.loop.run_until_complete(
self.dispatcher._handle__worker_init_request(request)
)

mock_http_v2_enabled.assert_called_once()
mock_init_http_server.assert_called_once()
mock_load_func_metadata.assert_called_once()
self.assertIsNone(self.dispatcher._function_metadata_exception)

capabilities = resp.worker_init_response.capabilities
self.assertIn(HTTP_URI, capabilities)
self.assertEqual(capabilities[HTTP_URI], "http://localhost:8080")
self.assertIn(REQUIRES_ROUTE_PARAMETERS, capabilities)
self.assertEqual(capabilities[REQUIRES_ROUTE_PARAMETERS], "true")


class TestContextEnabledTask(unittest.TestCase):
def setUp(self):
Expand Down