diff --git a/README.md b/README.md index a6ad186..05ca250 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ $ python -m pip install ecs-logging [`logging`](https://docs.python.org/3/library/logging.html) module and the [`structlog`](https://www.structlog.org/en/stable/) package. -### Logging Example +## Standard Library `logging` Module ```python import logging @@ -111,7 +111,7 @@ formatter = StdlibFormatter( ) ``` -### Structlog Example +## Structlog Example ```python import structlog @@ -180,6 +180,12 @@ logger.debug("Example message!") } ``` +## Elastic APM Log Correlation + +`ecs-logging-python` supports automatically collecting [ECS tracing fields](https://www.elastic.co/guide/en/ecs/master/ecs-tracing.html) +from the [Elastic APM Python agent](https://github.com/elastic/apm-agent-python) in order to +[correlate logs to spans, transactions and traces](https://www.elastic.co/guide/en/apm/agent/python/current/log-correlation.html) in Elastic APM. + ## License Apache-2.0 diff --git a/ecs_logging/_stdlib.py b/ecs_logging/_stdlib.py index df1b464..313b6b9 100644 --- a/ecs_logging/_stdlib.py +++ b/ecs_logging/_stdlib.py @@ -158,9 +158,17 @@ def format_to_ecs(self, record): extra_keys = set(available).difference(self._LOGRECORD_DICT) extras = flatten_dict({key: available[key] for key in extra_keys}) + # Pop all Elastic APM extras and add them + # to standard tracing ECS fields. + extras["span.id"] = extras.pop("elasticapm_span_id", None) + extras["transaction.id"] = extras.pop("elasticapm_transaction_id", None) + extras["trace.id"] = extras.pop("elasticapm_trace_id", None) + # Merge in any keys that were set within 'extra={...}' for field, value in extras.items(): - if self._is_field_excluded(field): + if field.startswith("elasticapm_labels."): + continue # Unconditionally remove, we don't need this info. + if value is None or self._is_field_excluded(field): continue merge_dicts(de_dot(field, value), result) diff --git a/pyproject.toml b/pyproject.toml index 621ae58..11da5f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ develop = [ "pytest-cov", "mock", "structlog", + "elastic-apm", ] [tool.flit.metadata.urls] diff --git a/tests/test_apm.py b/tests/test_apm.py new file mode 100644 index 0000000..dd0a282 --- /dev/null +++ b/tests/test_apm.py @@ -0,0 +1,177 @@ +import json +import sys +import elasticapm +from elasticapm.handlers.logging import LoggingFilter +from elasticapm.handlers.structlog import structlog_processor +import ecs_logging +import logging +import structlog +import pytest +from .compat import StringIO + + +def test_elasticapm_structlog_log_correlation_ecs_fields(): + apm = elasticapm.Client({"SERVICE_NAME": "apm-service", "DISABLE_SEND": True}) + stream = StringIO() + logger = structlog.PrintLogger(stream) + logger = structlog.wrap_logger( + logger, processors=[structlog_processor, ecs_logging.StructlogFormatter()] + ) + log = logger.new() + + apm.begin_transaction("test-transaction") + try: + with elasticapm.capture_span("test-span"): + span_id = elasticapm.get_span_id() + trace_id = elasticapm.get_trace_id() + transaction_id = elasticapm.get_transaction_id() + + log.info("test message") + finally: + apm.end_transaction("test-transaction") + + ecs = json.loads(stream.getvalue().rstrip()) + ecs.pop("@timestamp") + assert ecs == { + "ecs": {"version": "1.5.0"}, + "log": {"level": "info"}, + "message": "test message", + "span": {"id": span_id}, + "trace": {"id": trace_id}, + "transaction": {"id": transaction_id}, + } + + +@pytest.mark.skipif( + sys.version_info < (3, 2), reason="elastic-apm uses logger factory in Python 3.2+" +) +def test_elastic_apm_stdlib_no_filter_log_correlation_ecs_fields(): + apm = elasticapm.Client({"SERVICE_NAME": "apm-service", "DISABLE_SEND": True}) + stream = StringIO() + logger = logging.getLogger("apm-logger") + handler = logging.StreamHandler(stream) + handler.setFormatter( + ecs_logging.StdlibFormatter( + exclude_fields=["@timestamp", "process", "log.origin.file.line"] + ) + ) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + apm.begin_transaction("test-transaction") + try: + with elasticapm.capture_span("test-span"): + span_id = elasticapm.get_span_id() + trace_id = elasticapm.get_trace_id() + transaction_id = elasticapm.get_transaction_id() + + logger.info("test message") + finally: + apm.end_transaction("test-transaction") + + ecs = json.loads(stream.getvalue().rstrip()) + assert ecs == { + "ecs": {"version": "1.5.0"}, + "log": { + "level": "info", + "logger": "apm-logger", + "origin": { + "file": {"name": "test_apm.py"}, + "function": "test_elastic_apm_stdlib_no_filter_log_correlation_ecs_fields", + }, + "original": "test message", + }, + "message": "test message", + "span": {"id": span_id}, + "trace": {"id": trace_id}, + "transaction": {"id": transaction_id}, + } + + +def test_elastic_apm_stdlib_with_filter_log_correlation_ecs_fields(): + apm = elasticapm.Client({"SERVICE_NAME": "apm-service", "DISABLE_SEND": True}) + stream = StringIO() + logger = logging.getLogger("apm-logger") + handler = logging.StreamHandler(stream) + handler.setFormatter( + ecs_logging.StdlibFormatter( + exclude_fields=["@timestamp", "process", "log.origin.file.line"] + ) + ) + handler.addFilter(LoggingFilter()) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + apm.begin_transaction("test-transaction") + try: + with elasticapm.capture_span("test-span"): + span_id = elasticapm.get_span_id() + trace_id = elasticapm.get_trace_id() + transaction_id = elasticapm.get_transaction_id() + + logger.info("test message") + finally: + apm.end_transaction("test-transaction") + + ecs = json.loads(stream.getvalue().rstrip()) + assert ecs == { + "ecs": {"version": "1.5.0"}, + "log": { + "level": "info", + "logger": "apm-logger", + "origin": { + "file": {"name": "test_apm.py"}, + "function": "test_elastic_apm_stdlib_with_filter_log_correlation_ecs_fields", + }, + "original": "test message", + }, + "message": "test message", + "span": {"id": span_id}, + "trace": {"id": trace_id}, + "transaction": {"id": transaction_id}, + } + + +def test_elastic_apm_stdlib_exclude_fields(): + apm = elasticapm.Client({"SERVICE_NAME": "apm-service", "DISABLE_SEND": True}) + stream = StringIO() + logger = logging.getLogger("apm-logger") + handler = logging.StreamHandler(stream) + handler.setFormatter( + ecs_logging.StdlibFormatter( + exclude_fields=[ + "@timestamp", + "process", + "log.origin.file.line", + "span", + "transaction.id", + ] + ) + ) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + apm.begin_transaction("test-transaction") + try: + with elasticapm.capture_span("test-span"): + trace_id = elasticapm.get_trace_id() + + logger.info("test message") + finally: + apm.end_transaction("test-transaction") + + ecs = json.loads(stream.getvalue().rstrip()) + assert ecs == { + "ecs": {"version": "1.5.0"}, + "log": { + "level": "info", + "logger": "apm-logger", + "origin": { + "file": {"name": "test_apm.py"}, + "function": "test_elastic_apm_stdlib_exclude_fields", + }, + "original": "test message", + }, + "message": "test message", + "trace": {"id": trace_id}, + }