diff --git a/.github/workflows/_containerTemplate.yml b/.github/workflows/_containerTemplate.yml index b9f4bda..566ef78 100644 --- a/.github/workflows/_containerTemplate.yml +++ b/.github/workflows/_containerTemplate.yml @@ -56,7 +56,7 @@ jobs: - name: Install cosign uses: sigstore/cosign-installer@v3.3.0 id: install_cosign - if: github.event_name != 'pull_request' + if: github.event_name == 'release' with: cosign-release: 'v2.2.0' diff --git a/code/function/fastapp/api/v1/endpoints/sample.py b/code/function/fastapp/api/v1/endpoints/sample.py index 47439a8..4d2d06b 100644 --- a/code/function/fastapp/api/v1/endpoints/sample.py +++ b/code/function/fastapp/api/v1/endpoints/sample.py @@ -1,6 +1,7 @@ -from typing import Any +from typing import Annotated -from fastapi import APIRouter +import httpx +from fastapi import APIRouter, Header from fastapp.models.sample import SampleRequest, SampleResponse from fastapp.utils import setup_logging @@ -11,7 +12,14 @@ @router.post("/sample", response_model=SampleResponse, name="sample") async def post_predict( - data: SampleRequest, + data: SampleRequest, x_forwarded_for: Annotated[str, Header()] = "" ) -> SampleResponse: logger.info(f"Received request: {data}") + logger.info(f"IP of sender: {x_forwarded_for}") + + # Sample request + async with httpx.AsyncClient() as client: + response = await client.get("https://www.bing.com") + logger.info(f"Received response status code: {response.status_code}") + return SampleResponse(output=f"Hello {data.input}") diff --git a/code/function/fastapp/core/config.py b/code/function/fastapp/core/config.py index dc87cf7..62846bd 100644 --- a/code/function/fastapp/core/config.py +++ b/code/function/fastapp/core/config.py @@ -10,11 +10,15 @@ class Settings(BaseSettings): APP_VERSION: str = "v0.0.1" API_V1_STR: str = "/v1" LOGGING_LEVEL: int = logging.INFO + LOGGING_SAMPLING_RATIO: float = 1.0 + LOGGING_SCHEDULE_DELAY: int = 5000 DEBUG: bool = False APPLICATIONINSIGHTS_CONNECTION_STRING: str = Field( default="", env="APPLICATIONINSIGHTS_CONNECTION_STRING" ) - MY_SECRET_CONFIG: str = Field(default="", env="MY_SECRET_CONFIG") + WEBSITE_NAME: str = Field(default="test", alias="WEBSITE_SITE_NAME") + WEBSITE_INSTANCE_ID: str = Field(default="0", alias="WEBSITE_INSTANCE_ID") + MY_SECRET_CONFIG: str = Field(default="", alias="MY_SECRET_CONFIG") settings = Settings() diff --git a/code/function/fastapp/main.py b/code/function/fastapp/main.py index 8834cfa..7ee394f 100644 --- a/code/function/fastapp/main.py +++ b/code/function/fastapp/main.py @@ -1,7 +1,22 @@ +from contextlib import asynccontextmanager + from fastapi import FastAPI from fastapp.api.v1.api_v1 import api_v1_router from fastapp.core.config import settings -from fastapp.utils import setup_tracer +from fastapp.utils import setup_opentelemetry + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> None: + """Gracefully start the application before the server reports readiness.""" + setup_opentelemetry(app=app) + yield + pass + + +def lifespan_sync(app: FastAPI) -> None: + """Gracefully start the application before the server reports readiness.""" + setup_opentelemetry(app=app) def get_app() -> FastAPI: @@ -11,24 +26,14 @@ def get_app() -> FastAPI: """ app = FastAPI( title=settings.PROJECT_NAME, + description="", version=settings.APP_VERSION, openapi_url="/openapi.json", debug=settings.DEBUG, + lifespan=lifespan, ) app.include_router(api_v1_router, prefix=settings.API_V1_STR) return app app = get_app() - - -@app.on_event("startup") -async def startup_event(): - """Gracefully start the application before the server reports readiness.""" - setup_tracer(app=app) - - -@app.on_event("shutdown") -async def shutdown_event(): - """Gracefully close connections before shutdown of the server.""" - pass diff --git a/code/function/fastapp/utils.py b/code/function/fastapp/utils.py index dba9aa4..9a3d10c 100644 --- a/code/function/fastapp/utils.py +++ b/code/function/fastapp/utils.py @@ -1,14 +1,31 @@ import logging from logging import Logger +from azure.monitor.opentelemetry import configure_azure_monitor + # from azure.identity import ManagedIdentityCredential -from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter +from azure.monitor.opentelemetry.exporter import ( + ApplicationInsightsSampler, + AzureMonitorLogExporter, + AzureMonitorMetricExporter, + AzureMonitorTraceExporter, +) from fastapi import FastAPI from fastapp.core.config import settings +from opentelemetry import trace +from opentelemetry._logs import set_logger_provider from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor +from opentelemetry.metrics import set_meter_provider +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import Tracer, get_tracer_provider, set_tracer_provider def setup_logging(module) -> Logger: @@ -21,15 +38,24 @@ def setup_logging(module) -> Logger: logger.propagate = False # Create stream handler - logger_stream_handler = logging.StreamHandler() - logger_stream_handler.setFormatter( + stream_handler = logging.StreamHandler() + stream_handler.setFormatter( logging.Formatter("[%(asctime)s] [%(levelname)s] [%(module)-8.8s] %(message)s") ) - logger.addHandler(logger_stream_handler) + logger.addHandler(stream_handler) return logger -def setup_tracer(app: FastAPI): +def setup_tracer(module) -> Tracer: + """Setup tracer and event handler. + + RETURNS (Tracer): The tracer object to create spans. + """ + tracer = trace.get_tracer(module) + return tracer + + +def setup_opentelemetry(app: FastAPI): """Setup tracer for Open Telemetry. app (FastAPI): The app to be instrumented by Open Telemetry. @@ -37,10 +63,76 @@ def setup_tracer(app: FastAPI): """ if settings.APPLICATIONINSIGHTS_CONNECTION_STRING: # credential = ManagedIdentityCredential() - exporter = AzureMonitorTraceExporter.from_connection_string( + resource = Resource.create( + { + "service.name": settings.WEBSITE_NAME, + "service.namespace": settings.WEBSITE_NAME, + "service.instance.id": settings.WEBSITE_INSTANCE_ID, + } + ) + + # Create logger provider + logger_exporter = AzureMonitorLogExporter.from_connection_string( settings.APPLICATIONINSIGHTS_CONNECTION_STRING, # credential=credential ) - tracer = TracerProvider(resource=Resource({SERVICE_NAME: "api"})) - tracer.add_span_processor(BatchSpanProcessor(exporter)) - FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer) + logger_provider = LoggerProvider(resource=resource) + logger_provider.add_log_record_processor( + BatchLogRecordProcessor( + exporter=logger_exporter, + schedule_delay_millis=settings.LOGGING_SCHEDULE_DELAY, + ) + ) + set_logger_provider(logger_provider) + handler = LoggingHandler( + level=settings.LOGGING_LEVEL, logger_provider=logger_provider + ) + logging.getLogger().addHandler(handler) + + # Create tracer provider + tracer_exporter = AzureMonitorTraceExporter.from_connection_string( + settings.APPLICATIONINSIGHTS_CONNECTION_STRING, + # credential=credential + ) + sampler = ApplicationInsightsSampler( + sampling_ratio=settings.LOGGING_SAMPLING_RATIO + ) + tracer_provider = TracerProvider(resource=resource, sampler=sampler) + tracer_provider.add_span_processor( + BatchSpanProcessor( + span_exporter=tracer_exporter, + schedule_delay_millis=settings.LOGGING_SCHEDULE_DELAY, + ) + ) + set_tracer_provider(tracer_provider) + + # Create meter provider + metrics_exporter = AzureMonitorMetricExporter.from_connection_string( + settings.APPLICATIONINSIGHTS_CONNECTION_STRING, + # credential=credential + ) + reader = PeriodicExportingMetricReader( + exporter=metrics_exporter, + export_interval_millis=settings.LOGGING_SCHEDULE_DELAY, + ) + meter_provider = MeterProvider(metric_readers=[reader], resource=resource) + set_meter_provider(meter_provider) + + # Configure custom metrics + system_metrics_config = { + "system.memory.usage": ["used", "free", "cached"], + "system.cpu.time": ["idle", "user", "system", "irq"], + "system.network.io": ["transmit", "receive"], + "process.runtime.memory": ["rss", "vms"], + "process.runtime.cpu.time": ["user", "system"], + } + + # Create instrumenter + FastAPIInstrumentor.instrument_app( + app, + excluded_urls=f"{settings.API_V1_STR}/health/heartbeat", + tracer_provider=tracer_provider, + meter_provider=meter_provider, + ) + HTTPXClientInstrumentor().instrument() + SystemMetricsInstrumentor(config=system_metrics_config).instrument() diff --git a/code/function/requirements.txt b/code/function/requirements.txt index f4a40cd..5d01136 100644 --- a/code/function/requirements.txt +++ b/code/function/requirements.txt @@ -2,10 +2,11 @@ # The Python Worker is managed by Azure Functions platform # Manually managing azure-functions-worker may cause unexpected issues -# azure-identity~=1.13.0 +# azure-identity~=1.15.0 azure-functions~=1.17.0 fastapi~=0.106.0 pydantic-settings~=2.1.0 -aiohttp~=3.9.1 -opentelemetry-instrumentation-fastapi==0.43b0 -azure-monitor-opentelemetry-exporter==1.0.0b19 +httpx~=0.26.0 +azure-monitor-opentelemetry~=1.1.1 +opentelemetry-instrumentation-httpx~=0.43b0 +opentelemetry-instrumentation-system-metrics~=0.43b0 diff --git a/code/function/wrapper/__init__.py b/code/function/wrapper/__init__.py index b30f269..4748e4c 100644 --- a/code/function/wrapper/__init__.py +++ b/code/function/wrapper/__init__.py @@ -1,6 +1,26 @@ import azure.functions as func -from fastapp.main import app +from fastapp.main import app, lifespan_sync +from fastapp.utils import setup_tracer +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + +lifespan_sync(app=app) +tracer = setup_tracer(__name__) async def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse: - return await func.AsgiMiddleware(app).handle_async(req, context) + # Start distributed tracing + functions_current_context = { + "traceparent": context.trace_context.Traceparent, + "tracestate": context.trace_context.Tracestate, + } + parent_context = TraceContextTextMapPropagator().extract( + carrier=functions_current_context + ) + + # Function logic + with tracer.start_as_current_span("wrapper", context=parent_context) as span: + response = await func.AsgiMiddleware(app).handle_async( + req=req, context=parent_context + ) + + return response diff --git a/code/infra/function.tf b/code/infra/function.tf index 30216b9..a3d6473 100644 --- a/code/infra/function.tf +++ b/code/infra/function.tf @@ -93,6 +93,14 @@ resource "azapi_resource" "function" { name = "APPLICATIONINSIGHTS_CONNECTION_STRING" value = azurerm_application_insights.application_insights.connection_string }, + { + name = "AZURE_SDK_TRACING_IMPLEMENTATION" + value = "opentelemetry" + }, + { + name = "AZURE_TRACING_ENABLED" + value = "true" + }, { name = "AZURE_FUNCTIONS_ENVIRONMENT" value = "Production"