|
| 1 | +import contextlib |
| 2 | +from typing import ContextManager |
| 3 | + |
| 4 | +from localstack.services.lambda_.invocation.assignment import AssignmentService |
| 5 | +from localstack.services.lambda_.invocation.docker_runtime_executor import ( |
| 6 | + DockerRuntimeExecutor, |
| 7 | +) |
| 8 | +from localstack.services.lambda_.invocation.execution_environment import ( |
| 9 | + ExecutionEnvironment, |
| 10 | +) |
| 11 | +from localstack.services.lambda_.invocation.lambda_models import ( |
| 12 | + FunctionVersion, |
| 13 | + InitializationType, |
| 14 | +) |
| 15 | + |
| 16 | +from localstack_prometheus.metrics.lambda_ import ( |
| 17 | + LOCALSTACK_LAMBDA_ENVIRONMENT_ACTIVE, |
| 18 | + LOCALSTACK_LAMBDA_ENVIRONMENT_CONTAINERS_RUNNING, |
| 19 | + LOCALSTACK_LAMBDA_ENVIRONMENT_START_TOTAL, |
| 20 | +) |
| 21 | + |
| 22 | + |
| 23 | +def count_version_environments( |
| 24 | + assignment_service: AssignmentService, version_manager_id: str, prov_type: InitializationType |
| 25 | +): |
| 26 | + """Count environments of a specific provisioning type for a specific version manager""" |
| 27 | + return sum( |
| 28 | + env.initialization_type == prov_type |
| 29 | + for env in assignment_service.environments.get(version_manager_id, {}).values() |
| 30 | + ) |
| 31 | + |
| 32 | + |
| 33 | +def count_service_environments( |
| 34 | + assignment_service: AssignmentService, prov_type: InitializationType |
| 35 | +): |
| 36 | + """Count environments of a specific provisioning type across all function versions""" |
| 37 | + return sum( |
| 38 | + count_version_environments(assignment_service, version_manager_id, prov_type) |
| 39 | + for version_manager_id in assignment_service.environments |
| 40 | + ) |
| 41 | + |
| 42 | + |
| 43 | +def init_assignment_service_with_metrics(fn, self: AssignmentService): |
| 44 | + fn(self) |
| 45 | + # Initialise these once, with all subsequent calls being evaluated at collection time. |
| 46 | + LOCALSTACK_LAMBDA_ENVIRONMENT_ACTIVE.labels( |
| 47 | + provisioning_type="provisioned-concurrency" |
| 48 | + ).set_function(lambda: count_service_environments(self, "provisioned-concurrency")) |
| 49 | + |
| 50 | + LOCALSTACK_LAMBDA_ENVIRONMENT_ACTIVE.labels(provisioning_type="on-demand").set_function( |
| 51 | + lambda: count_service_environments(self, "on-demand") |
| 52 | + ) |
| 53 | + |
| 54 | + |
| 55 | +def tracked_docker_start(fn, self: DockerRuntimeExecutor, env_vars: dict[str, str]): |
| 56 | + fn(self, env_vars) |
| 57 | + LOCALSTACK_LAMBDA_ENVIRONMENT_CONTAINERS_RUNNING.inc() |
| 58 | + |
| 59 | + |
| 60 | +def tracked_docker_stop(fn, self: DockerRuntimeExecutor): |
| 61 | + fn(self) |
| 62 | + LOCALSTACK_LAMBDA_ENVIRONMENT_CONTAINERS_RUNNING.dec() |
| 63 | + |
| 64 | + |
| 65 | +@contextlib.contextmanager |
| 66 | +def tracked_get_environment( |
| 67 | + fn, |
| 68 | + self: AssignmentService, |
| 69 | + version_manager_id: str, |
| 70 | + function_version: FunctionVersion, |
| 71 | + provisioning_type: InitializationType, |
| 72 | +) -> ContextManager[ExecutionEnvironment]: |
| 73 | + applicable_env_count = count_version_environments(self, version_manager_id, provisioning_type) |
| 74 | + # If there are no applicable environments, this will be a cold start. |
| 75 | + # Otherwise, it'll be warm. |
| 76 | + start_type = "warm" if applicable_env_count > 0 else "cold" |
| 77 | + LOCALSTACK_LAMBDA_ENVIRONMENT_START_TOTAL.labels( |
| 78 | + start_type=start_type, provisioning_type=provisioning_type |
| 79 | + ).inc() |
| 80 | + with fn(self, version_manager_id, function_version, provisioning_type) as execution_env: |
| 81 | + yield execution_env |
0 commit comments