diff --git a/aws_lambda_powertools/logging/buffer/__init__.py b/aws_lambda_powertools/logging/buffer/__init__.py
new file mode 100644
index 00000000000..0e7a8cce6bd
--- /dev/null
+++ b/aws_lambda_powertools/logging/buffer/__init__.py
@@ -0,0 +1,3 @@
+from aws_lambda_powertools.logging.buffer.config import LoggerBufferConfig
+
+__all__ = ["LoggerBufferConfig"]
diff --git a/aws_lambda_powertools/logging/buffer/cache.py b/aws_lambda_powertools/logging/buffer/cache.py
new file mode 100644
index 00000000000..728147b852e
--- /dev/null
+++ b/aws_lambda_powertools/logging/buffer/cache.py
@@ -0,0 +1,215 @@
+from __future__ import annotations
+
+from collections import deque
+from typing import Any
+
+
+class KeyBufferCache:
+ """
+ A cache implementation for a single key with size tracking and eviction support.
+
+ This class manages a buffer for a specific key, keeping track of the current size
+ and providing methods to add, remove, and manage cached items. It supports automatic
+ eviction tracking and size management.
+
+ Attributes
+ ----------
+ cache : deque
+ A double-ended queue storing the cached items.
+ current_size : int
+ The total size of all items currently in the cache.
+ has_evicted : bool
+ A flag indicating whether any items have been evicted from the cache.
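+
+ Example
+ -------
+ A minimal illustration of size tracking (item size is ``len(str(item))``):
+
+ >>> cache = KeyBufferCache()
+ >>> cache.add("hello")
+ >>> cache.current_size
+ 5
+ >>> cache.get()
+ ['hello']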
+ """
+
+ def __init__(self):
+ """
+ Initialize a buffer cache for a specific key.
+ """
+ self.cache: deque = deque()
+ self.current_size: int = 0
+ self.has_evicted: bool = False
+
+ def add(self, item: Any) -> None:
+ """
+ Add an item to the cache.
+
+ Parameters
+ ----------
+ item : Any
+ The item to be stored in the cache.
+ """
+ item_size = len(str(item))
+ self.cache.append(item)
+ self.current_size += item_size
+
+ def remove_oldest(self) -> Any:
+ """
+ Remove and return the oldest item from the cache.
+
+ Returns
+ -------
+ Any
+ The removed item.
+ """
+ removed_item = self.cache.popleft()
+ self.current_size -= len(str(removed_item))
+ self.has_evicted = True
+ return removed_item
+
+ def get(self) -> list:
+ """
+ Retrieve items for this key.
+
+ Returns
+ -------
+ list
+ List of items in the cache.
+ """
+ return list(self.cache)
+
+ def clear(self) -> None:
+ """
+ Clear the cache for this key.
+ """
+ self.cache.clear()
+ self.current_size = 0
+ self.has_evicted = False
+
+
+class LoggerBufferCache:
+ """
+ A multi-key buffer cache with size-based eviction and management.
+
+ This class provides a flexible caching mechanism that manages multiple keys,
+ with each key having its own buffer cache. The total size of each key's cache
+ is limited, and older items are automatically evicted when the size limit is reached.
+
+ Key Features:
+ - Multiple key support
+ - Size-based eviction
+ - Tracking of evicted items
+ - Configurable maximum buffer size
+
+ Example
+ -------
+ >>> buffer_cache = LoggerBufferCache(max_size_bytes=1000)
+ >>> buffer_cache.add("logs", "First log message")
+ >>> buffer_cache.add("debug", "Debug information")
+ >>> buffer_cache.get("logs")
+ ['First log message']
+ >>> buffer_cache.get_current_size("logs")
+ 17
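+
+ Eviction of the oldest items once a key's buffer exceeds its limit:
+
+ >>> small = LoggerBufferCache(max_size_bytes=5)
+ >>> small.add("k", "abc")
+ >>> small.add("k", "defg")
+ >>> small.get("k")
+ ['defg']
+ >>> small.has_items_evicted("k")
+ True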
+ """
+
+ def __init__(self, max_size_bytes: int):
+ """
+ Initialize the LoggerBufferCache.
+
+ Parameters
+ ----------
+ max_size_bytes : int
+ Maximum size of the cache in bytes for each key.
+ """
+ self.max_size_bytes: int = max_size_bytes
+ self.cache: dict[str, KeyBufferCache] = {}
+
+ def add(self, key: str, item: Any) -> None:
+ """
+ Add an item to the cache for a specific key.
+
+ Parameters
+ ----------
+ key : str
+ The key to store the item under.
+ item : Any
+ The item to be stored in the cache.
+
+ Raises
+ ------
+ BufferError
+ If the item size exceeds the maximum buffer size.
+ """
+ # Check if item is larger than entire buffer
+ item_size = len(str(item))
+ if item_size > self.max_size_bytes:
+ raise BufferError("Cannot add item to the buffer")
+
+ # Create the key's cache if it doesn't exist
+ if key not in self.cache:
+ self.cache[key] = KeyBufferCache()
+
+ # Calculate the size after adding the new item
+ new_total_size = self.cache[key].current_size + item_size
+
+ # If adding the item would exceed max size, remove oldest items
+ while new_total_size > self.max_size_bytes and self.cache[key].cache:
+ self.cache[key].remove_oldest()
+ new_total_size = self.cache[key].current_size + item_size
+
+ self.cache[key].add(item)
+
+ def get(self, key: str) -> list:
+ """
+ Retrieve items for a specific key.
+
+ Parameters
+ ----------
+ key : str
+ The key to retrieve items for.
+
+ Returns
+ -------
+ list
+ List of items for the given key, or an empty list if the key doesn't exist.
+ """
+ return [] if key not in self.cache else self.cache[key].get()
+
+ def clear(self, key: str | None = None) -> None:
+ """
+ Clear the cache, either for a specific key or entirely.
+
+ Parameters
+ ----------
+ key : str, optional
+ The key to clear. If None, clears the entire cache.
+ """
+ if key:
+ if key in self.cache:
+ self.cache[key].clear()
+ del self.cache[key]
+ else:
+ self.cache.clear()
+
+ def has_items_evicted(self, key: str) -> bool:
+ """
+ Check if a specific key's cache has evicted items.
+
+ Parameters
+ ----------
+ key : str
+ The key to check for evicted items.
+
+ Returns
+ -------
+ bool
+ True if items have been evicted, False otherwise.
+ """
+ return False if key not in self.cache else self.cache[key].has_evicted
+
+ def get_current_size(self, key: str) -> int | None:
+ """
+ Get the current size of the buffer for a specific key.
+
+ Parameters
+ ----------
+ key : str
+ The key to get the current size for.
+
+ Returns
+ -------
+ int | None
+ The current size of the buffer for the key,
+ or None if the key does not exist.
+ """
+ return None if key not in self.cache else self.cache[key].current_size
diff --git a/aws_lambda_powertools/logging/buffer/config.py b/aws_lambda_powertools/logging/buffer/config.py
new file mode 100644
index 00000000000..cd8a7935fdf
--- /dev/null
+++ b/aws_lambda_powertools/logging/buffer/config.py
@@ -0,0 +1,78 @@
+from __future__ import annotations
+
+from typing import Literal
+
+
+class LoggerBufferConfig:
+ """
+ Configuration for log buffering behavior.
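+
+ Example
+ -------
+ Log level input is case-insensitive and normalized to uppercase:
+
+ >>> config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="info")
+ >>> config.buffer_at_verbosity
+ 'INFO'
+ >>> config.flush_on_error_log
+ True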
+ """
+
+ # Define class-level constant for valid log levels
+ VALID_LOG_LEVELS: list[str] = ["DEBUG", "INFO", "WARNING"]
+ LOG_LEVEL_BUFFER_VALUES = Literal["DEBUG", "INFO", "WARNING"]
+
+ def __init__(
+ self,
+ max_bytes: int = 20480,
+ buffer_at_verbosity: LOG_LEVEL_BUFFER_VALUES = "DEBUG",
+ flush_on_error_log: bool = True,
+ ):
+ """
+ Initialize logger buffer configuration.
+
+ Parameters
+ ----------
+ max_bytes : int, optional
+ Maximum size of the buffer in bytes
+ buffer_at_verbosity : str, optional
+ Minimum log level to buffer
+ flush_on_error_log : bool, optional
+ Whether to flush the buffer when an error occurs
+ """
+ self._validate_inputs(max_bytes, buffer_at_verbosity, flush_on_error_log)
+
+ self._max_bytes = max_bytes
+ self._buffer_at_verbosity = buffer_at_verbosity.upper()
+ self._flush_on_error_log = flush_on_error_log
+
+ def _validate_inputs(
+ self,
+ max_bytes: int,
+ buffer_at_verbosity: str,
+ flush_on_error_log: bool,
+ ) -> None:
+ """
+ Validate configuration inputs.
+
+ Parameters
+ ----------
+ Same as __init__ method parameters
+ """
+ if not isinstance(max_bytes, int) or max_bytes <= 0:
+ raise ValueError("Max size must be a positive integer")
+
+ if not isinstance(buffer_at_verbosity, str):
+ raise ValueError("Log level must be a string")
+
+ # Validate log level
+ if buffer_at_verbosity.upper() not in self.VALID_LOG_LEVELS:
+ raise ValueError(f"Invalid log level. Must be one of {self.VALID_LOG_LEVELS}")
+
+ if not isinstance(flush_on_error_log, bool):
+ raise ValueError("flush_on_error must be a boolean")
+
+ @property
+ def max_bytes(self) -> int:
+ """Maximum buffer size in bytes."""
+ return self._max_bytes
+
+ @property
+ def buffer_at_verbosity(self) -> str:
+ """Minimum log level to buffer."""
+ return self._buffer_at_verbosity
+
+ @property
+ def flush_on_error_log(self) -> bool:
+ """Flag to flush buffer on error."""
+ return self._flush_on_error_log
diff --git a/aws_lambda_powertools/logging/buffer/functions.py b/aws_lambda_powertools/logging/buffer/functions.py
new file mode 100644
index 00000000000..cc266354e91
--- /dev/null
+++ b/aws_lambda_powertools/logging/buffer/functions.py
@@ -0,0 +1,127 @@
+from __future__ import annotations
+
+import sys
+import time
+from typing import TYPE_CHECKING, Any, Mapping
+
+if TYPE_CHECKING:
+ import logging
+
+
+def _create_buffer_record(
+ level: int,
+ msg: object,
+ args: object,
+ exc_info: logging._ExcInfoType = None,
+ stack_info: bool = False,
+ extra: Mapping[str, object] | None = None,
+) -> dict[str, Any]:
+ """
+ Create a structured log record to store in the buffer.
+
+ Parameters
+ ----------
+ level : int
+ Logging level (e.g., logging.DEBUG, logging.INFO) indicating log severity.
+ msg : object
+ The log message to be recorded.
+ args : object
+ Additional arguments associated with the log message.
+ exc_info : logging._ExcInfoType, optional
+ Exception information to be included in the log record.
+ If None, no exception details will be captured.
+ stack_info : bool, default False
+ Flag to include stack trace information in the log record.
+ extra : Mapping[str, object], optional
+ Additional context or metadata to be attached to the log record.
+
+ Returns
+ -------
+ dict[str, Any]
+ A structured log record ready to be buffered.
+
+ Notes
+ -----
+ - Captures caller frame information for precise log source tracking
+ - Automatically handles exception context
+ """
+ # Retrieve the caller's frame information to capture precise log context
+ # sys._getframe(3) walks up the stack to the original caller's frame
+ caller_frame = sys._getframe(3)
+
+ # Get the current timestamp
+ timestamp = time.time()
+
+ # Dynamically replace exc_info with current system exception information
+ # This ensures the most recent exception is captured if available
+ if exc_info:
+ exc_info = sys.exc_info()
+
+ # Construct and return the log record dictionary
+ return {
+ "level": level,
+ "msg": msg,
+ "args": args,
+ "filename": caller_frame.f_code.co_filename,
+ "line": caller_frame.f_lineno,
+ "function": caller_frame.f_code.co_name,
+ "extra": extra,
+ "timestamp": timestamp,
+ "exc_info": exc_info,
+ "stack_info": stack_info,
+ }
+
+
+def _check_minimum_buffer_log_level(buffer_log_level: str, current_log_level: str) -> bool:
+ """
+ Determine if the current log level meets or exceeds the buffer's minimum log level.
+
+ Compares log levels to decide whether a log message should be included in the buffer.
+
+ Parameters
+ ----------
+ buffer_log_level : str
+ Minimum log level configured for the buffer.
+ Must be one of: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'.
+ current_log_level : str
+ Log level of the current log message.
+ Must be one of: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'.
+
+ Returns
+ -------
+ bool
+ True if the current log level is higher (less verbose) than the buffer's
+ minimum log level, indicating the message should bypass the buffer and
+ be logged immediately. False if the message should be buffered.
+
+ Notes
+ -----
+ - Log levels are compared based on their numeric severity
+ - Conversion to uppercase ensures case-insensitive comparisons
+
+ Examples
+ --------
+ >>> _check_minimum_buffer_log_level('INFO', 'DEBUG')
+ False
+ >>> _check_minimum_buffer_log_level('INFO', 'ERROR')
+ True
+ """
+ # Predefined log level mapping with numeric severity values
+ # Lower values indicate more verbose logging levels
+ log_levels = {
+ "DEBUG": 10,
+ "INFO": 20,
+ "WARNING": 30,
+ "ERROR": 40,
+ "CRITICAL": 50,
+ }
+
+ # Normalize input log levels to uppercase for consistent comparison
+ # Retrieve corresponding numeric log level values
+ buffer_level_num = log_levels.get(buffer_log_level.upper())
+ current_level_num = log_levels.get(current_log_level.upper())
+
+ # A strictly higher (less verbose) current severity bypasses the buffer
+ return buffer_level_num < current_level_num
diff --git a/aws_lambda_powertools/logging/logger.py b/aws_lambda_powertools/logging/logger.py
index 36f4ed1f194..58b3c50ca3e 100644
--- a/aws_lambda_powertools/logging/logger.py
+++ b/aws_lambda_powertools/logging/logger.py
@@ -16,12 +16,17 @@
from contextlib import contextmanager
from typing import IO, TYPE_CHECKING, Any, Callable, Generator, Iterable, Mapping, TypeVar, cast, overload
+from aws_lambda_powertools.logging.buffer.cache import LoggerBufferCache
+from aws_lambda_powertools.logging.buffer.functions import _check_minimum_buffer_log_level, _create_buffer_record
from aws_lambda_powertools.logging.constants import (
LOGGER_ATTRIBUTE_HANDLER,
LOGGER_ATTRIBUTE_POWERTOOLS_HANDLER,
LOGGER_ATTRIBUTE_PRECONFIGURED,
)
-from aws_lambda_powertools.logging.exceptions import InvalidLoggerSamplingRateError, OrphanedChildLoggerError
+from aws_lambda_powertools.logging.exceptions import (
+ InvalidLoggerSamplingRateError,
+ OrphanedChildLoggerError,
+)
from aws_lambda_powertools.logging.filters import SuppressFilter
from aws_lambda_powertools.logging.formatter import (
RESERVED_FORMATTER_CUSTOM_KEYS,
@@ -32,14 +37,18 @@
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.functions import (
extract_event_from_common_models,
+ get_tracer_id,
resolve_env_var_choice,
resolve_truthy_env_var_choice,
)
from aws_lambda_powertools.utilities import jmespath_utils
+from aws_lambda_powertools.warnings import PowertoolsUserWarning
if TYPE_CHECKING:
+ from aws_lambda_powertools.logging.buffer.config import LoggerBufferConfig
from aws_lambda_powertools.shared.types import AnyCallableT
+
logger = logging.getLogger(__name__)
is_cold_start = True
@@ -100,6 +109,8 @@ class Logger:
custom logging handler e.g. logging.FileHandler("file.log")
log_uncaught_exceptions: bool, by default False
logs uncaught exception using sys.excepthook
See: https://docs.python.org/3/library/sys.html#sys.excepthook
+ buffer_config: LoggerBufferConfig, optional
+ logger buffer configuration
@@ -218,6 +229,7 @@ def __init__(
utc: bool = False,
use_rfc3339: bool = False,
serialize_stacktrace: bool = True,
+ buffer_config: LoggerBufferConfig | None = None,
**kwargs,
) -> None:
@@ -259,7 +271,17 @@ def __init__(
"serialize_stacktrace": serialize_stacktrace,
}
- self._init_logger(formatter_options=formatter_options, log_level=level, **kwargs)
+ self._buffer_config = buffer_config
+ if self._buffer_config:
+ self._buffer_cache = LoggerBufferCache(max_size_bytes=self._buffer_config.max_bytes)
+
+ self._init_logger(
+ formatter_options=formatter_options,
+ log_level=level,
+ buffer_config=self._buffer_config,
+ buffer_cache=getattr(self, "_buffer_cache", None),
+ **kwargs,
+ )
if self.log_uncaught_exceptions:
logger.debug("Replacing exception hook")
@@ -303,6 +325,8 @@ def _init_logger(
self,
formatter_options: dict | None = None,
log_level: str | int | None = None,
+ buffer_config: LoggerBufferConfig | None = None,
+ buffer_cache: LoggerBufferCache | None = None,
**kwargs,
) -> None:
"""Configures new logger"""
@@ -315,9 +339,19 @@ def _init_logger(
is_logger_preconfigured = getattr(self._logger, LOGGER_ATTRIBUTE_PRECONFIGURED, False)
if self.child:
self.setLevel(log_level)
+ if getattr(self._logger.parent, "powertools_buffer_config", None):
+ # Initializes a new, empty LoggerBufferCache for child logger
+ # Preserves parent's buffer configuration while resetting cache contents
+ self._buffer_config = self._logger.parent.powertools_buffer_config # type: ignore[union-attr]
+ self._buffer_cache = LoggerBufferCache(self._logger.parent.powertools_buffer_config.max_bytes) # type: ignore[union-attr]
return
if is_logger_preconfigured:
+ # Reuse existing buffer configuration from a previously configured logger
+ # Ensures consistent buffer settings across logger instances within the same service
+ # Enables buffer propagation and maintains a unified logging configuration
+ self._buffer_config = self._logger.powertools_buffer_config # type: ignore[attr-defined]
+ self._buffer_cache = self._logger.powertools_buffer_cache # type: ignore[attr-defined]
return
self.setLevel(log_level)
@@ -342,6 +376,8 @@ def _init_logger(
logger.debug(f"Marking logger {self.service} as preconfigured")
self._logger.init = True # type: ignore[attr-defined]
self._logger.powertools_handler = self.logger_handler # type: ignore[attr-defined]
+ self._logger.powertools_buffer_config = buffer_config # type: ignore[attr-defined]
+ self._logger.powertools_buffer_cache = buffer_cache # type: ignore[attr-defined]
def refresh_sample_rate_calculation(self) -> None:
"""
@@ -386,6 +422,7 @@ def inject_lambda_context(
log_event: bool | None = None,
correlation_id_path: str | None = None,
clear_state: bool | None = False,
+ flush_buffer_on_uncaught_error: bool = False,
) -> AnyCallableT: ...
@overload
@@ -395,6 +432,7 @@ def inject_lambda_context(
log_event: bool | None = None,
correlation_id_path: str | None = None,
clear_state: bool | None = False,
+ flush_buffer_on_uncaught_error: bool = False,
) -> Callable[[AnyCallableT], AnyCallableT]: ...
def inject_lambda_context(
@@ -403,6 +441,7 @@ def inject_lambda_context(
log_event: bool | None = None,
correlation_id_path: str | None = None,
clear_state: bool | None = False,
+ flush_buffer_on_uncaught_error: bool = False,
) -> Any:
"""Decorator to capture Lambda contextual info and inject into logger
@@ -459,6 +498,7 @@ def handler(event, context):
log_event=log_event,
correlation_id_path=correlation_id_path,
clear_state=clear_state,
+ flush_buffer_on_uncaught_error=flush_buffer_on_uncaught_error,
)
log_event = resolve_truthy_env_var_choice(
@@ -491,11 +531,25 @@ def decorate(event, context, *args, **kwargs):
if self.sampling_rate and not cold_start:
self.refresh_sample_rate_calculation()
- return lambda_handler(event, context, *args, **kwargs)
+ try:
+ # Execute the Lambda handler with provided event and context
+ return lambda_handler(event, context, *args, **kwargs)
+ except:
+ # Flush the log buffer if configured to do so on uncaught errors
+ # Ensures logging state is cleaned up even if an exception is raised
+ if flush_buffer_on_uncaught_error:
+ logger.debug("Uncaught error detected, flushing log buffer before exit")
+ self.flush_buffer()
+ # Re-raise any exceptions that occur during handler execution
+ raise
+ finally:
+ # Clear the cache after invocation is complete
+ if self._buffer_config:
+ self._buffer_cache.clear()
return decorate
- def info(
+ def debug(
self,
msg: object,
*args: object,
@@ -508,16 +562,37 @@ def info(
extra = extra or {}
extra = {**extra, **kwargs}
- return self._logger.info(
- msg,
- *args,
+ # Logging workflow for logging.debug:
+ # 1. Buffer is completely disabled - log right away
+ # 2. DEBUG is the most verbose level, so it can never bypass an enabled buffer
+ # 3. Store in buffer for potential later processing
+
+ # MAINTAINABILITY_DECISION:
+ # This check is repeated in each log method to keep the flow explicit,
+ # favoring clarity over a shared abstraction
+
+ # Buffer is not active and we need to log immediately
+ if not self._buffer_config:
+ return self._logger.debug(
+ msg,
+ *args,
+ exc_info=exc_info,
+ stack_info=stack_info,
+ stacklevel=stacklevel,
+ extra=extra,
+ )
+
+ # Store record in the buffer
+ self._add_log_record_to_buffer(
+ level=logging.DEBUG,
+ msg=msg,
+ args=args,
exc_info=exc_info,
stack_info=stack_info,
- stacklevel=stacklevel,
extra=extra,
)
- def error(
+ def info(
self,
msg: object,
*args: object,
@@ -530,20 +605,52 @@ def error(
extra = extra or {}
extra = {**extra, **kwargs}
- return self._logger.error(
- msg,
- *args,
+ # Logging workflow for logging.info:
+ # 1. Buffer is completely disabled - log right away
+ # 2. Log severity exceeds buffer's minimum threshold - bypass buffering
+ # 3. If neither condition met, store in buffer for potential later processing
+
+ # MAINTAINABILITY_DECISION:
+ # This check is repeated in each log method to keep the flow explicit,
+ # favoring clarity over a shared abstraction
+
+ # Buffer is not active and we need to log immediately
+ if not self._buffer_config:
+ return self._logger.info(
+ msg,
+ *args,
+ exc_info=exc_info,
+ stack_info=stack_info,
+ stacklevel=stacklevel,
+ extra=extra,
+ )
+
+ # Bypass buffer when log severity meets or exceeds configured minimum
+ if _check_minimum_buffer_log_level(self._buffer_config.buffer_at_verbosity, "INFO"):
+ return self._logger.info(
+ msg,
+ *args,
+ exc_info=exc_info,
+ stack_info=stack_info,
+ stacklevel=stacklevel,
+ extra=extra,
+ )
+
+ # Store record in the buffer
+ self._add_log_record_to_buffer(
+ level=logging.INFO,
+ msg=msg,
+ args=args,
exc_info=exc_info,
stack_info=stack_info,
- stacklevel=stacklevel,
extra=extra,
)
- def exception(
+ def warning(
self,
msg: object,
*args: object,
- exc_info: logging._ExcInfoType = True,
+ exc_info: logging._ExcInfoType = None,
stack_info: bool = False,
stacklevel: int = 2,
extra: Mapping[str, object] | None = None,
@@ -552,16 +659,48 @@ def exception(
extra = extra or {}
extra = {**extra, **kwargs}
- return self._logger.exception(
- msg,
- *args,
+ # Logging workflow for logging.warning:
+ # 1. Buffer is completely disabled - log right away
+ # 2. Log severity exceeds buffer's minimum threshold - bypass buffering
+ # 3. If neither condition met, store in buffer for potential later processing
+
+ # MAINTAINABILITY_DECISION:
+ # This check is repeated in each log method to keep the flow explicit,
+ # favoring clarity over a shared abstraction
+
+ # Buffer is not active and we need to log immediately
+ if not self._buffer_config:
+ return self._logger.warning(
+ msg,
+ *args,
+ exc_info=exc_info,
+ stack_info=stack_info,
+ stacklevel=stacklevel,
+ extra=extra,
+ )
+
+ # Bypass buffer when log severity meets or exceeds configured minimum
+ if _check_minimum_buffer_log_level(self._buffer_config.buffer_at_verbosity, "WARNING"):
+ return self._logger.warning(
+ msg,
+ *args,
+ exc_info=exc_info,
+ stack_info=stack_info,
+ stacklevel=stacklevel,
+ extra=extra,
+ )
+
+ # Store record in the buffer
+ self._add_log_record_to_buffer(
+ level=logging.WARNING,
+ msg=msg,
+ args=args,
exc_info=exc_info,
stack_info=stack_info,
- stacklevel=stacklevel,
extra=extra,
)
- def critical(
+ def error(
self,
msg: object,
*args: object,
@@ -574,7 +713,15 @@ def critical(
extra = extra or {}
extra = {**extra, **kwargs}
- return self._logger.critical(
+ # Workflow: Error logging with automatic buffer flushing
+ # 1. Check buffer configuration for immediate flush
+ # 2. If auto-flush is enabled, flush the entire buffer
+ # 3. Error logs are never buffered, so emit the error log immediately
+
+ if self._buffer_config and self._buffer_config.flush_on_error_log:
+ self.flush_buffer()
+
+ return self._logger.error(
msg,
*args,
exc_info=exc_info,
@@ -583,7 +730,7 @@ def critical(
extra=extra,
)
- def warning(
+ def critical(
self,
msg: object,
*args: object,
@@ -596,7 +743,15 @@ def warning(
extra = extra or {}
extra = {**extra, **kwargs}
- return self._logger.warning(
+ # Workflow: Critical logging with automatic buffer flushing
+ # 1. Check buffer configuration for immediate flush
+ # 2. If auto-flush is enabled, flush the entire buffer
+ # 3. Critical logs are never buffered, so emit the critical log immediately
+
+ if self._buffer_config and self._buffer_config.flush_on_error_log:
+ self.flush_buffer()
+
+ return self._logger.critical(
msg,
*args,
exc_info=exc_info,
@@ -605,11 +760,11 @@ def warning(
extra=extra,
)
- def debug(
+ def exception(
self,
msg: object,
*args: object,
- exc_info: logging._ExcInfoType = None,
+ exc_info: logging._ExcInfoType = True,
stack_info: bool = False,
stacklevel: int = 2,
extra: Mapping[str, object] | None = None,
@@ -618,7 +773,14 @@ def debug(
extra = extra or {}
extra = {**extra, **kwargs}
- return self._logger.debug(
+ # Workflow: Exception logging with automatic buffer flushing
+ # 1. Check buffer configuration for immediate flush
+ # 2. If auto-flush is enabled, flush the entire buffer
+ # 3. Exception logs are never buffered, so emit the exception log immediately
+ if self._buffer_config and self._buffer_config.flush_on_error_log:
+ self.flush_buffer()
+
+ return self._logger.exception(
msg,
*args,
exc_info=exc_info,
@@ -887,6 +1049,161 @@ def _determine_log_level(self, level: str | int | None) -> str | int:
# Powertools log level is set, we use this
return powertools_log_level.upper()
+ # Functions for log buffering
+
+ def _create_and_flush_log_record(self, log_line: dict) -> None:
+ """
+ Create and immediately flush a log record to the configured logger.
+
+ Parameters
+ ----------
+ log_line : dict[str, Any]
+ Dictionary containing log record details with keys:
+ - 'level': Logging level
+ - 'filename': Source filename
+ - 'line': Line number
+ - 'msg': Log message
+ - 'function': Source function name
+ - 'extra': Additional context
+ - 'timestamp': Original log creation time
+ - 'exc_info': Exception information, if any
+
+ Notes
+ -----
+ Bypasses standard logging flow by directly creating and handling a log record.
+ Preserves original timestamp and source information.
+ """
+ record = self._logger.makeRecord(
+ name=self.name,
+ level=log_line["level"],
+ fn=log_line["filename"],
+ lno=log_line["line"],
+ msg=log_line["msg"],
+ args=(),
+ exc_info=log_line["exc_info"],
+ func=log_line["function"],
+ extra=log_line["extra"],
+ )
+ record.created = log_line["timestamp"]
+ self._logger.handle(record)
+
+ def _add_log_record_to_buffer(
+ self,
+ level: int,
+ msg: object,
+ args: object,
+ exc_info: logging._ExcInfoType = None,
+ stack_info: bool = False,
+ extra: Mapping[str, object] | None = None,
+ ) -> None:
+ """
+ Add log record to buffer with intelligent tracer ID handling.
+
+ Parameters
+ ----------
+ level : int
+ Logging level of the record.
+ msg : object
+ Log message to be recorded.
+ args : object
+ Additional arguments for the log message.
+ exc_info : logging._ExcInfoType, optional
+ Exception information for the log record.
+ stack_info : bool, optional
+ Whether to include stack information.
+ extra : Mapping[str, object], optional
+ Additional contextual information for the log record.
+
+ Notes
+ -----
+ If the record exceeds the maximum buffer size, a warning is emitted
+ and the record is flushed directly instead of being buffered.
+ """
+ # Determine tracer ID from the X-Ray trace context; without one, nothing is buffered
+ tracer_id = get_tracer_id()
+
+ if tracer_id and self._buffer_config:
+ log_record: dict[str, Any] = _create_buffer_record(
+ level=level,
+ msg=msg,
+ args=args,
+ exc_info=exc_info,
+ stack_info=stack_info,
+ extra=extra,
+ )
+ try:
+ self._buffer_cache.add(tracer_id, log_record)
+ except BufferError:
+ warnings.warn(
+ message="Cannot add item to the buffer. "
+ f"Item size exceeds total cache size {self._buffer_config.max_bytes} bytes",
+ category=PowertoolsUserWarning,
+ stacklevel=2,
+ )
+
+ # flush this log to avoid data loss
+ self._create_and_flush_log_record(log_record)
+
+ def flush_buffer(self) -> None:
+ """
+ Flush all buffered log records associated with the current execution.
+
+ Notes
+ -----
+ - Retrieves buffered log records for the current trace
+ - Immediately processes and emits each record
+ - Warns if any records were evicted during this execution
+ - Clears the buffer after processing
+
+ Raises
+ ------
+ Exception
+ Any exception from the underlying logging or buffer mechanisms is propagated to the caller.
+ """
+ tracer_id = get_tracer_id()
+
+ # Without a tracer id there is nothing to flush
+ if not tracer_id:
+ return
+
+ # Return early when the buffer is empty
+ buffer = self._buffer_cache.get(tracer_id)
+ if not buffer:
+ return
+
+ # Process log records
+ for log_line in buffer:
+ self._create_and_flush_log_record(log_line)
+
+ # Warn if any items were evicted from the buffer
+ if self._buffer_cache.has_items_evicted(tracer_id):
+ warnings.warn(
+ message="Some logs are not displayed because they were evicted from the buffer. "
+ "Increase buffer size to store more logs in the buffer",
+ category=PowertoolsUserWarning,
+ stacklevel=2,
+ )
+
+ # Clear the entire cache
+ self._buffer_cache.clear()
+
+ def clear_buffer(self) -> None:
+ """
+ Clear the internal buffer cache.
+
+ This method removes all items from the buffer cache, effectively resetting it to an empty state.
+
+ Returns
+ -------
+ None
+ """
+ if self._buffer_config:
+ self._buffer_cache.clear()
+
def set_package_logger(
level: str | int = logging.DEBUG,
diff --git a/aws_lambda_powertools/shared/functions.py b/aws_lambda_powertools/shared/functions.py
index 18f3ec49351..4e961d4aee0 100644
--- a/aws_lambda_powertools/shared/functions.py
+++ b/aws_lambda_powertools/shared/functions.py
@@ -283,3 +283,8 @@ def abs_lambda_path(relative_path: str = "") -> str:
def sanitize_xray_segment_name(name: str) -> str:
return re.sub(constants.INVALID_XRAY_NAME_CHARACTERS, "", name)
+
+
+def get_tracer_id() -> str | None:
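+ """
+ Return the root trace ID parsed from the X-Ray trace header environment variable.
+
+ The variable holds a value such as "Root=1-67c39786-5908a82a246fb67f3089263f;Parent=...;Sampled=1";
+ only the Root segment is returned, without the "Root=" prefix, or None when the variable is unset.
+ """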
+ xray_trace_id = os.getenv(constants.XRAY_TRACE_ID_ENV)
+ return xray_trace_id.split(";")[0].replace("Root=", "") if xray_trace_id else None
diff --git a/docs/core/logger.md b/docs/core/logger.md
index 3e19d8ebcd1..5d7c7941b72 100644
--- a/docs/core/logger.md
+++ b/docs/core/logger.md
@@ -11,6 +11,7 @@ Logger provides an opinionated logger with output structured as JSON.
* Log Lambda event when instructed (disabled by default)
* Log sampling enables DEBUG log level for a percentage of requests (disabled by default)
* Append additional keys to structured log at any point in time
+* Buffer logs for a specific request or invocation, flushing them automatically on error or manually as needed
## Getting started
@@ -514,6 +515,180 @@ The following environment variables are available to configure Logger at a globa
## Advanced
+### Buffering logs
+
+Log buffering enables you to buffer logs for a specific request or invocation. Enable log buffering by passing `buffer_config` when initializing a Logger instance. You can buffer logs at the `WARNING`, `INFO`, or `DEBUG` level, and flush them automatically on error or manually as needed.
+
+!!! tip "This is useful when you want to reduce the number of log messages emitted while still having detailed logs when needed, such as when troubleshooting issues."
+
+=== "getting_started_with_buffering_logs.py"
+
+ ```python hl_lines="5 6 15"
+ --8<-- "examples/logger/src/getting_started_with_buffering_logs.py"
+ ```
+
+#### Configuring the buffer
+
+When configuring log buffering, you have options to fine-tune how logs are captured, stored, and emitted. You can configure the following parameters in the `LoggerBufferConfig` constructor:
+
+| Parameter | Description | Configuration |
+|---------------------- |------------------------------------------------ |----------------------------- |
+| `max_bytes` | Maximum size of the log buffer in bytes | `int` (default: 20480 bytes) |
+| `buffer_at_verbosity` | Minimum log level to buffer | `DEBUG`, `INFO`, `WARNING` |
+| `flush_on_error_log` | Automatically flush buffer when an error occurs | `True` (default), `False` |
+
+!!! note "When `flush_on_error_log` is enabled, it automatically flushes for `logger.exception()`, `logger.error()`, and `logger.critical()` statements."
+
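+Putting these together, a combined configuration could look like the following sketch (values are illustrative):
+
+```python
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+
+config = LoggerBufferConfig(
+    max_bytes=10240,  # cap each invocation's buffer at 10 KiB
+    buffer_at_verbosity="INFO",  # buffer INFO and DEBUG; WARNING and above are logged immediately
+    flush_on_error_log=True,  # flush buffered logs when an error-level log is emitted
+)
+```
+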
+=== "working_with_buffering_logs_different_levels.py"
+
+ ```python hl_lines="5 6 10-12"
+ --8<-- "examples/logger/src/working_with_buffering_logs_different_levels.py"
+ ```
+
+ 1. Setting `buffer_at_verbosity="WARNING"` configures log buffering for `WARNING` and all lower severity levels (`INFO`, `DEBUG`).
+
+=== "working_with_buffering_logs_disable_on_error.py"
+
+ ```python hl_lines="5 6 14 21 24"
+ --8<-- "examples/logger/src/working_with_buffering_logs_disable_on_error.py"
+ ```
+
+ 1. With `flush_on_error_log` disabled, the buffer is not flushed when an error is logged. This is useful when you want to control when the buffer is flushed by calling the `logger.flush_buffer()` method.
+
+#### Flushing on exceptions
+
+Use the `@logger.inject_lambda_context` decorator to automatically flush buffered logs when an exception is raised in your Lambda function. This is done by setting the `flush_buffer_on_uncaught_error` option to `True` in the decorator.
+
+=== "working_with_buffering_logs_when_raise_exception.py"
+
+ ```python hl_lines="5 6 13 19"
+ --8<-- "examples/logger/src/working_with_buffering_logs_when_raise_exception.py"
+ ```
+
+#### Reusing the same Logger instance
+
+If you are using log buffering, we recommend sharing the same Logger instance across your code and modules so they also share the same buffer. This centralizes logger instance creation and prevents buffer configuration drift.
+
+!!! note "Buffer Inheritance"
+ Loggers created with the same service name automatically inherit the buffer configuration from the first logger initialized with one.
+
+ Child Logger instances inherit their parent's buffer configuration but maintain a separate buffer.
+
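+The sketch below illustrates both rules; the service name is illustrative:
+
+```python
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+
+# First Logger for this service defines the buffer configuration
+logger = Logger(service="payment", buffer_config=LoggerBufferConfig(max_bytes=10240))
+
+# Same service name: inherits the buffer configuration defined above
+same_service_logger = Logger(service="payment")
+
+# Child Logger: inherits the configuration but keeps its own, separate buffer
+child_logger = Logger(service="payment", child=True)
+```
+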
+=== "working_with_buffering_logs_creating_instance.py"
+
+ ```python hl_lines="2 5"
+ --8<-- "examples/logger/src/working_with_buffering_logs_creating_instance.py"
+ ```
+
+=== "working_with_buffering_logs_reusing_handler.py"
+
+ ```python hl_lines="1 8 12"
+ --8<-- "examples/logger/src/working_with_buffering_logs_reusing_handler.py"
+ ```
+
+=== "working_with_buffering_logs_reusing_function.py"
+
+ ```python hl_lines="1"
+ --8<-- "examples/logger/src/working_with_buffering_logs_reusing_function.py"
+ ```
+
+#### Buffering workflows
+
+##### Manual flush
+
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant Lambda
+ participant Logger
+ participant CloudWatch
+ Client->>Lambda: Invoke Lambda
+ Lambda->>Logger: Initialize with DEBUG level buffering
+ Logger-->>Lambda: Logger buffer ready
+ Lambda->>Logger: logger.debug("First debug log")
+ Logger-->>Logger: Buffer first debug log
+ Lambda->>Logger: logger.info("Info log")
+ Logger->>CloudWatch: Directly log info message
+ Lambda->>Logger: logger.debug("Second debug log")
+ Logger-->>Logger: Buffer second debug log
+ Lambda->>Logger: logger.flush_buffer()
+ Logger->>CloudWatch: Emit buffered logs to stdout
+ Lambda->>Client: Return execution result
+```
+Flushing buffer manually
+
+
+##### Flushing when logging an error
+
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant Lambda
+ participant Logger
+ participant CloudWatch
+ Client->>Lambda: Invoke Lambda
+ Lambda->>Logger: Initialize with DEBUG level buffering
+ Logger-->>Lambda: Logger buffer ready
+ Lambda->>Logger: logger.debug("First log")
+ Logger-->>Logger: Buffer first debug log
+ Lambda->>Logger: logger.debug("Second log")
+ Logger-->>Logger: Buffer second debug log
+ Lambda->>Logger: logger.debug("Third log")
+ Logger-->>Logger: Buffer third debug log
+ Lambda->>Lambda: Exception occurs
+ Lambda->>Logger: logger.error("Error details")
+ Logger->>CloudWatch: Emit buffered debug logs
+ Logger->>CloudWatch: Emit error log
+ Lambda->>Client: Raise exception
+```
+Flushing buffer when an error happens
+
+
+##### Flushing on exception
+
+This works only when you decorate your Lambda handler with `@logger.inject_lambda_context(flush_buffer_on_uncaught_error=True)`.
+
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant Lambda
+ participant Logger
+ participant CloudWatch
+ Client->>Lambda: Invoke Lambda
+ Lambda->>Logger: Using decorator
+ Logger-->>Lambda: Logger context injected
+ Lambda->>Logger: logger.debug("First log")
+ Logger-->>Logger: Buffer first debug log
+ Lambda->>Logger: logger.debug("Second log")
+ Logger-->>Logger: Buffer second debug log
+ Lambda->>Lambda: Uncaught Exception
+ Lambda->>CloudWatch: Automatically emit buffered debug logs
+ Lambda->>Client: Raise uncaught exception
+```
+Flushing buffer when an uncaught exception happens
+
+
+#### Buffering FAQs
+
+1. **Does the buffer persist across Lambda invocations?** No, each Lambda invocation has its own buffer. The buffer is initialized when the Lambda function is invoked and is cleared after the function execution completes or when flushed manually.
+
+2. **Are my logs buffered during cold starts?** No, we never buffer logs during cold starts. This is because we want to ensure that logs emitted during this phase are always available for debugging and monitoring purposes. The buffer is only used during the execution of the Lambda function.
+
+3. **How can I prevent log buffering from consuming excessive memory?** You can limit the size of the buffer by setting the `max_bytes` option in the `LoggerBufferConfig` constructor parameter. This will ensure that the buffer does not grow indefinitely and consume excessive memory.
+
+4. **What happens if the log buffer reaches its maximum size?** Older logs are removed from the buffer to make room for new logs. This means that if the buffer is full, you may lose some logs if they are not flushed before the buffer reaches its maximum size. When this happens, we emit a warning when flushing the buffer to indicate that some logs have been dropped.
+
+5. **What timestamp is used when I flush the logs?** The timestamp preserves the original time when the log record was created. If you create a log record at 11:00:10 and flush it at 11:00:25, the log line will retain its original timestamp of 11:00:10.
+
+6. **What happens if I try to add a log line that is bigger than max buffer size?** The log will be emitted directly to standard output and not buffered. When this happens, we emit a warning to indicate that the log line was too big to be buffered.
+
+7. **What happens if Lambda times out without flushing the buffer?** Logs that are still in the buffer will be lost. For exception paths you control, flushing in a `finally` block helps, as shown in the sketch after these FAQs.
+
+8. **Do child loggers inherit the buffer?** No, child loggers do not inherit the buffer from their parent logger but only the buffer configuration. This means that if you create a child logger, it will have its own buffer and will not share the buffer with the parent logger.
+
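+When flushing manually, a `try`/`finally` block ensures buffered logs are emitted even when the handler raises partway through. This is a minimal sketch, not a required pattern; it assumes you are not relying on `flush_buffer_on_uncaught_error`:
+
+```python
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+
+logger = Logger(level="INFO", buffer_config=LoggerBufferConfig(max_bytes=20480))
+
+
+def lambda_handler(event, context):
+    try:
+        logger.debug("a debug log")  # this is buffered
+        # do stuff
+    finally:
+        # emit whatever is still buffered, even if the code above raised
+        logger.flush_buffer()
+```
+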
### Built-in Correlation ID expressions
You can use any of the following built-in JMESPath expressions as part of [inject_lambda_context decorator](#setting-a-correlation-id).
diff --git a/examples/logger/src/getting_started_with_buffering_logs.py b/examples/logger/src/getting_started_with_buffering_logs.py
new file mode 100644
index 00000000000..8e210662aa0
--- /dev/null
+++ b/examples/logger/src/getting_started_with_buffering_logs.py
@@ -0,0 +1,15 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+logger_buffer_config = LoggerBufferConfig(max_bytes=20480, flush_on_error_log=True)
+logger = Logger(level="INFO", buffer_config=logger_buffer_config)
+
+
+def lambda_handler(event: dict, context: LambdaContext):
+ logger.debug("a debug log") # this is buffered
+ logger.info("an info log") # this is not buffered
+
+ # do stuff
+
+ logger.flush_buffer()
diff --git a/examples/logger/src/working_with_buffering_logs_creating_instance.py b/examples/logger/src/working_with_buffering_logs_creating_instance.py
new file mode 100644
index 00000000000..32acc20b5ce
--- /dev/null
+++ b/examples/logger/src/working_with_buffering_logs_creating_instance.py
@@ -0,0 +1,5 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+
+logger_buffer_config = LoggerBufferConfig(max_bytes=20480, buffer_at_verbosity="WARNING")
+logger = Logger(level="INFO", buffer_config=logger_buffer_config)
diff --git a/examples/logger/src/working_with_buffering_logs_different_levels.py b/examples/logger/src/working_with_buffering_logs_different_levels.py
new file mode 100644
index 00000000000..20a735c7501
--- /dev/null
+++ b/examples/logger/src/working_with_buffering_logs_different_levels.py
@@ -0,0 +1,16 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+logger_buffer_config = LoggerBufferConfig(buffer_at_verbosity="WARNING") # (1)!
+logger = Logger(level="INFO", buffer_config=logger_buffer_config)
+
+
+def lambda_handler(event: dict, context: LambdaContext):
+ logger.warning("a warning log") # this is buffered
+ logger.info("an info log") # this is buffered
+ logger.debug("a debug log") # this is buffered
+
+ # do stuff
+
+ logger.flush_buffer()
diff --git a/examples/logger/src/working_with_buffering_logs_disable_on_error.py b/examples/logger/src/working_with_buffering_logs_disable_on_error.py
new file mode 100644
index 00000000000..5e5f7555e7d
--- /dev/null
+++ b/examples/logger/src/working_with_buffering_logs_disable_on_error.py
@@ -0,0 +1,24 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+logger_buffer_config = LoggerBufferConfig(flush_on_error_log=False) # (1)!
+logger = Logger(level="INFO", buffer_config=logger_buffer_config)
+
+
+class MyException(Exception):
+ pass
+
+
+def lambda_handler(event: dict, context: LambdaContext):
+ logger.debug("a debug log") # this is buffered
+
+ # do stuff
+
+ try:
+ raise MyException
+ except MyException as error:
+ logger.error("An error ocurrend", exc_info=error) # Logs won't be flushed here
+
+ # Need to flush logs manually
+ logger.flush_buffer()
diff --git a/examples/logger/src/working_with_buffering_logs_reusing_function.py b/examples/logger/src/working_with_buffering_logs_reusing_function.py
new file mode 100644
index 00000000000..3de22289bbe
--- /dev/null
+++ b/examples/logger/src/working_with_buffering_logs_reusing_function.py
@@ -0,0 +1,6 @@
+from working_with_buffering_logs_creating_instance import logger # reusing same instance
+
+
+def my_function():
+ logger.debug("This will be buffered")
+ # do stuff
diff --git a/examples/logger/src/working_with_buffering_logs_reusing_handler.py b/examples/logger/src/working_with_buffering_logs_reusing_handler.py
new file mode 100644
index 00000000000..96f28c47916
--- /dev/null
+++ b/examples/logger/src/working_with_buffering_logs_reusing_handler.py
@@ -0,0 +1,12 @@
+from working_with_buffering_logs_creating_instance import logger # reusing same instance
+from working_with_buffering_logs_reusing_function import my_function
+
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+def lambda_handler(event: dict, context: LambdaContext):
+ logger.debug("a debug log") # this is buffered
+
+ my_function()
+
+ logger.flush_buffer()
diff --git a/examples/logger/src/working_with_buffering_logs_when_raise_exception.py b/examples/logger/src/working_with_buffering_logs_when_raise_exception.py
new file mode 100644
index 00000000000..20f39efcdb1
--- /dev/null
+++ b/examples/logger/src/working_with_buffering_logs_when_raise_exception.py
@@ -0,0 +1,19 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+logger_buffer_config = LoggerBufferConfig(max_bytes=20480, flush_on_error_log=False)
+logger = Logger(level="INFO", buffer_config=logger_buffer_config)
+
+
+class MyException(Exception):
+ pass
+
+
+@logger.inject_lambda_context(flush_buffer_on_uncaught_error=True)
+def lambda_handler(event: dict, context: LambdaContext):
+ logger.debug("a debug log") # this is buffered
+
+ # do stuff
+
+ raise MyException # Logs will be flushed here
diff --git a/noxfile.py b/noxfile.py
index 4710bcbca2c..c1d06798b88 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -60,6 +60,7 @@ def test_with_only_required_packages(session: nox.Session):
session,
folders=[
f"{PREFIX_TESTS_FUNCTIONAL}/logger/required_dependencies/",
+ f"{PREFIX_TESTS_UNIT}/logger/required_dependencies/",
f"{PREFIX_TESTS_FUNCTIONAL}/metrics/required_dependencies/",
f"{PREFIX_TESTS_FUNCTIONAL}/middleware_factory/required_dependencies/",
f"{PREFIX_TESTS_FUNCTIONAL}/typing/required_dependencies/",
diff --git a/tests/e2e/logger/handlers/buffer_logs_with_flush.py b/tests/e2e/logger/handlers/buffer_logs_with_flush.py
new file mode 100644
index 00000000000..bcf70db3291
--- /dev/null
+++ b/tests/e2e/logger/handlers/buffer_logs_with_flush.py
@@ -0,0 +1,14 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+
+logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+
+logger = Logger(level="INFO", buffer_config=logger_buffer_config)
+
+
+def lambda_handler(event, context):
+ message_visible, message_buffered = event.get("message_visible", ""), event.get("message_buffered", {})
+ logger.info(message_visible)
+ logger.debug(message_buffered)
+ logger.flush_buffer()
+ return "success"
diff --git a/tests/e2e/logger/handlers/buffer_logs_without_flush.py b/tests/e2e/logger/handlers/buffer_logs_without_flush.py
new file mode 100644
index 00000000000..ef606c0c474
--- /dev/null
+++ b/tests/e2e/logger/handlers/buffer_logs_without_flush.py
@@ -0,0 +1,13 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+
+logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+
+logger = Logger(level="INFO", buffer_config=logger_buffer_config)
+
+
+def lambda_handler(event, context):
+ message_visible, message_buffered = event.get("message_visible", ""), event.get("message_buffered", {})
+ logger.info(message_visible)
+ logger.debug(message_buffered)
+ return "success"
diff --git a/tests/e2e/logger/test_logger.py b/tests/e2e/logger/test_logger.py
index 2a9ab47f559..94fa40026b5 100644
--- a/tests/e2e/logger/test_logger.py
+++ b/tests/e2e/logger/test_logger.py
@@ -39,6 +39,68 @@ def multiple_logger_instances_arn(infrastructure: dict) -> str:
return infrastructure.get("MultipleLoggerInstancesArn", "")
+@pytest.fixture
+def buffer_logs_without_flush_fn(infrastructure: dict) -> str:
+ return infrastructure.get("BufferLogsWithoutFlush", "")
+
+
+@pytest.fixture
+def buffer_logs_without_flush_arn(infrastructure: dict) -> str:
+ return infrastructure.get("BufferLogsWithoutFlushArn", "")
+
+
+@pytest.fixture
+def buffer_logs_with_flush_fn(infrastructure: dict) -> str:
+ return infrastructure.get("BufferLogsWithFlush", "")
+
+
+@pytest.fixture
+def buffer_logs_with_flush_arn(infrastructure: dict) -> str:
+ return infrastructure.get("BufferLogsWithFlushArn", "")
+
+
+@pytest.mark.xdist_group(name="logger")
+def test_buffer_logs_without_flush(buffer_logs_without_flush_fn, buffer_logs_without_flush_arn):
+ # GIVEN
+ message = "logs should be visible with default settings"
+ message_buffer = "not visible message"
+ payload = json.dumps({"message_visible": message, "message_buffered": message_buffer})
+
+ # WHEN
+ _, execution_time = data_fetcher.get_lambda_response(lambda_arn=buffer_logs_without_flush_arn, payload=payload)
+ data_fetcher.get_lambda_response(lambda_arn=buffer_logs_without_flush_arn, payload=payload)
+
+ # THEN
+ logs = data_fetcher.get_logs(
+ function_name=buffer_logs_without_flush_fn,
+ start_time=execution_time,
+ minimum_log_entries=2,
+ )
+
+ assert len(logs) == 2
+
+
+@pytest.mark.xdist_group(name="logger")
+def test_buffer_logs_with_flush(buffer_logs_with_flush_fn, buffer_logs_with_flush_arn):
+ # GIVEN
+ message = "logs should be visible with default settings"
+ message_buffer = "not visible message"
+ payload = json.dumps({"message_visible": message, "message_buffered": message_buffer})
+
+ # WHEN
+ _, execution_time = data_fetcher.get_lambda_response(lambda_arn=buffer_logs_with_flush_arn, payload=payload)
+ data_fetcher.get_lambda_response(lambda_arn=buffer_logs_with_flush_arn, payload=payload)
+
+ # THEN
+ logs = data_fetcher.get_logs(
+ function_name=buffer_logs_with_flush_fn,
+ start_time=execution_time,
+ minimum_log_entries=4,
+ )
+
+ assert len(logs) == 4
+
+
@pytest.mark.xdist_group(name="logger")
def test_basic_lambda_logs_visible(basic_handler_fn, basic_handler_fn_arn):
# GIVEN
diff --git a/tests/functional/logger/required_dependencies/test_powertools_logger_buffer.py b/tests/functional/logger/required_dependencies/test_powertools_logger_buffer.py
new file mode 100644
index 00000000000..1e5a104be83
--- /dev/null
+++ b/tests/functional/logger/required_dependencies/test_powertools_logger_buffer.py
@@ -0,0 +1,493 @@
+"""aws_lambda_logging tests."""
+
+import io
+import json
+import random
+import string
+import warnings
+from collections import namedtuple
+
+import pytest
+
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+from aws_lambda_powertools.shared import constants
+from aws_lambda_powertools.warnings import PowertoolsUserWarning
+
+
+@pytest.fixture
+def lambda_context():
+ lambda_context = {
+ "function_name": "test",
+ "memory_limit_in_mb": 128,
+ "invoked_function_arn": "arn:aws:lambda:eu-west-1:809313241:function:test",
+ "aws_request_id": "52fdfc07-2182-154f-163f-5f0f9a621d72",
+ }
+
+ return namedtuple("LambdaContext", lambda_context.keys())(*lambda_context.values())
+
+
+@pytest.fixture
+def stdout():
+ return io.StringIO()
+
+
+@pytest.fixture
+def service_name():
+ chars = string.ascii_letters + string.digits
+ return "".join(random.SystemRandom().choice(chars) for _ in range(15))
+
+
+def capture_logging_output(stdout):
+ return json.loads(stdout.getvalue().strip())
+
+
+def capture_multiple_logging_statements_output(stdout):
+ return [json.loads(line.strip()) for line in stdout.getvalue().split("\n") if line]
+
+
+@pytest.mark.parametrize("log_level", ["DEBUG", "WARNING", "INFO"])
+def test_logger_buffer_with_minimum_level_warning(log_level, stdout, service_name, monkeypatch):
+
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a buffer and minimum log level set to WARNING
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="WARNING")
+ logger = Logger(level=log_level, service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ msg = "This is a test"
+ log_command = {
+ "INFO": logger.info,
+ "WARNING": logger.warning,
+ "DEBUG": logger.debug,
+ }
+
+ # WHEN Logging a message using the specified log level
+ log_message = log_command[log_level]
+ log_message(msg)
+ log_dict = stdout.getvalue()
+
+ # THEN verify that the message is buffered and not immediately output
+ assert log_dict == ""
+
+
+def test_logger_buffer_is_never_buffered_with_exception(stdout, service_name):
+ # GIVEN A logger configured with a buffer and default logging behavior
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+ logger = Logger(service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN An exception is raised and logged
+ try:
+ raise ValueError("something went wrong")
+ except Exception:
+ logger.exception("Received an exception")
+
+ # THEN We expect the log record is not buffered
+ log = capture_logging_output(stdout)
+ assert "Received an exception" == log["message"]
+
+
+def test_logger_buffer_is_never_buffered_with_error(stdout, service_name):
+ # GIVEN A logger configured with a buffer and default logging behavior
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+ logger = Logger(service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Logging an error message
+ logger.error("Received an exception")
+
+ # THEN The error log should be immediately output without buffering
+ log = capture_logging_output(stdout)
+ assert "Received an exception" == log["message"]
+
+
+@pytest.mark.parametrize("log_level", ["CRITICAL", "ERROR"])
+def test_logger_buffer_is_flushed_when_an_error_happens(stdout, service_name, log_level, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with buffer and automatic error-based flushing
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG", flush_on_error_log=True)
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Adding debug log messages before triggering an error
+ logger.debug("this log line will be flushed")
+ logger.debug("this log line will be flushed too")
+
+ log_command = {
+ "CRITICAL": logger.critical,
+ "ERROR": logger.error,
+ "EXCEPTION": logger.exception,
+ }
+
+ # WHEN Logging an error message using the specified log level
+ log_message = log_command[log_level]
+ log_message("Received an exception")
+
+ # THEN: All buffered log messages should be flushed and output
+ log = capture_multiple_logging_statements_output(stdout)
+ assert isinstance(log, list)
+ assert "this log line will be flushed" == log[0]["message"]
+ assert "this log line will be flushed too" == log[1]["message"]
+
+
+@pytest.mark.parametrize("log_level", ["CRITICAL", "ERROR"])
+def test_logger_buffer_is_not_flushed_when_an_error_happens(stdout, service_name, log_level, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a buffer and error flushing disabled
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG", flush_on_error_log=False)
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Adding debug log messages before an error
+ logger.debug("this log line will be flushed")
+ logger.debug("this log line will be flushed too")
+
+ log_command = {
+ "CRITICAL": logger.critical,
+ "ERROR": logger.error,
+ "EXCEPTION": logger.exception,
+ }
+
+ # WHEN Logging an error message using the specified log level
+ log_message = log_command[log_level]
+ log_message("Received an exception")
+
+ # THEN The error log message should be output, but previous debug logs should remain buffered
+ log = capture_logging_output(stdout)
+ assert not isinstance(log, list)
+ assert "Received an exception" == log["message"]
+ assert log_level == log["level"]
+
+
+def test_create_and_flush_logs(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a large buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Logging a message and then flushing the buffer
+ logger.debug("this log line will be flushed")
+ logger.flush_buffer()
+
+ # THEN The log record should be immediately output and not remain buffered
+ log = capture_multiple_logging_statements_output(stdout)
+ assert "this log line will be flushed" == log[0]["message"]
+
+
+def test_ensure_log_location_after_flush_buffer(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a sufficiently large buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Logging a debug message and immediately flushing the buffer
+ logger.debug("this log line will be flushed")
+ logger.flush_buffer()
+
+ # THEN Validate that the log location is precisely captured
+ log = capture_multiple_logging_statements_output(stdout)
+ assert "test_ensure_log_location_after_flush_buffer" in log[0]["location"]
+
+
+def test_clear_buffer_during_execution(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a sufficiently large buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN we clear the buffer during the execution
+ logger.debug("this log line will be flushed")
+ logger.clear_buffer()
+
+ # THEN no log is flushed
+ logger.flush_buffer()
+ log = capture_multiple_logging_statements_output(stdout)
+ assert not log
+
+
+def test_exception_logging_during_buffer_flush(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a sufficiently large buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # Custom exception class
+ class MyError(Exception):
+ pass
+
+ # WHEN Logging an exception and flushing the buffer
+ try:
+ raise MyError("Test exception message")
+ except MyError as error:
+ logger.debug("Logging a test exception to verify buffer and exception handling", exc_info=error)
+
+ logger.flush_buffer()
+
+ # THEN Validate that the log exception fields
+ log = capture_multiple_logging_statements_output(stdout)
+ assert log[0]["exception_name"] == "MyError"
+ assert "Test exception message" in log[0]["exception"]
+ assert "test_exception_logging_during_buffer_flush" in log[0]["exception"]
+
+
+def test_create_buffer_with_items_evicted(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a 1024-byte buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=1024, buffer_at_verbosity="DEBUG")
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Adding multiple log entries that exceed buffer size
+ logger.debug("this log line will be flushed")
+ logger.debug("this log line will be flushed")
+ logger.debug("this log line will be flushed")
+ logger.debug("this log line will be flushed")
+ logger.debug("this log line will be flushed")
+
+ # THEN A warning should be raised when flushing logs that exceed buffer capacity
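+    # Note: eviction state is tracked per buffer (see KeyBufferCache.has_evicted);
+    # flushing a buffer that has evicted records is what surfaces this warning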
+    with pytest.warns(PowertoolsUserWarning, match="Some logs are not displayed because"):
+ logger.flush_buffer()
+
+
+def test_create_buffer_with_items_evicted_with_next_invocation(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a 1024-byte buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=1024, buffer_at_verbosity="DEBUG")
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Adding multiple log entries that exceed buffer size
+ message = "this log line will be flushed"
+ logger.debug(message)
+ logger.debug(message)
+ logger.debug(message)
+ logger.debug(message)
+ logger.debug(message)
+
+ # THEN First buffer flush triggers warning about log eviction
+    with pytest.warns(PowertoolsUserWarning, match="Some logs are not displayed because"):
+ logger.flush_buffer()
+
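+    # Switching the trace ID simulates a new invocation, which starts a fresh buffer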
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "12345")
+ # WHEN Adding another log entry after initial flush
+ logger.debug("new log entry after buffer flush")
+
+ # THEN Subsequent buffer flush should not trigger warning
+ with warnings.catch_warnings(record=True) as warning_list:
+ warnings.simplefilter("always")
+ logger.flush_buffer()
+ assert len(warning_list) == 0, "No warnings should be raised"
+
+
+def test_flush_buffer_when_empty(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+    # GIVEN A logger configured with a 1024-byte buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=1024, buffer_at_verbosity="DEBUG")
+
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+    # WHEN Flushing the buffer without adding any log entries
+ logger.flush_buffer()
+
+    # THEN No output should be generated
+ log = capture_multiple_logging_statements_output(stdout)
+ assert not log
+
+
+def test_log_record_exceeding_buffer_size(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ message = "this log is bigger than entire buffer size"
+
+ # GIVEN A logger configured with a small 10-byte buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10, buffer_at_verbosity="DEBUG")
+
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # WHEN Attempting to log a message larger than the entire buffer
+ # THEN A warning should be raised indicating buffer size limitation
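+    # (the raw message alone is 42 bytes, far exceeding the 10-byte limit)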
+    with pytest.warns(PowertoolsUserWarning, match="Cannot add item to the buffer"):
+ logger.debug(message)
+
+    # THEN The oversized record should be emitted directly so the log is not lost
+ log = capture_multiple_logging_statements_output(stdout)
+ assert log[0]["message"] == message
+
+
+@pytest.mark.parametrize("log_level", ["WARNING", "INFO"])
+def test_logger_buffer_log_output_for_levels_above_minimum(log_level, stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with a buffer and minimum log level set to DEBUG
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG")
+ logger = Logger(level=log_level, service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ msg = f"This is a test with level {log_level}"
+ log_command = {
+ "INFO": logger.info,
+ "WARNING": logger.warning,
+ }
+
+    # WHEN Logging a message at the specified level, which is higher than DEBUG
+ log_message = log_command[log_level]
+ log_message(msg)
+
+    # THEN The logged message should be immediately output and not buffered
+ log = capture_multiple_logging_statements_output(stdout)
+ assert len(log) == 1
+ assert log[0]["message"] == msg
+
+
+def test_logger_buffer_flush_on_uncaught_exception(stdout, service_name, monkeypatch, lambda_context):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+    # GIVEN A logger configured with a large buffer and error-based flushing
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG")
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ @logger.inject_lambda_context(flush_buffer_on_uncaught_error=True)
+ def handler(event, context):
+ # Log messages that should be flushed when an exception occurs
+ logger.debug("this log line will be flushed after error - 1")
+ logger.debug("this log line will be flushed after error - 2")
+ raise ValueError("Test error")
+
+ # WHEN Invoking the handler and expecting a ValueError
+ with pytest.raises(ValueError):
+ handler({}, lambda_context)
+
+    # THEN Verify that buffered log messages are flushed before the exception propagates
+ log = capture_multiple_logging_statements_output(stdout)
+ assert len(log) == 2, "Expected two log messages to be flushed"
+ assert log[0]["message"] == "this log line will be flushed after error - 1"
+ assert log[1]["message"] == "this log line will be flushed after error - 2"
+
+
+def test_logger_buffer_not_flush_on_uncaught_exception(stdout, service_name, monkeypatch, lambda_context):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+    # GIVEN A logger configured with a large buffer
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG")
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ @logger.inject_lambda_context(flush_buffer_on_uncaught_error=False)
+ def handler(event, context):
+        # Log messages that should remain buffered even when an exception occurs
+ logger.debug("this log line will be flushed after error - 1")
+ logger.debug("this log line will be flushed after error - 2")
+ raise ValueError("Test error")
+
+ # WHEN Invoking the handler and expecting a ValueError
+ with pytest.raises(ValueError):
+ handler({}, lambda_context)
+
+    # THEN Verify that buffered log messages are not flushed
+ log = capture_multiple_logging_statements_output(stdout)
+ assert len(log) == 0
+
+
+def test_buffer_configuration_and_buffer_propagation_across_logger_instances(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with specific buffer settings
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG")
+
+ # Create primary logger with explicit buffer configuration
+ primary_logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # Create secondary logger for the same service (should inherit buffer config)
+ secondary_logger = Logger(level="DEBUG", service=service_name)
+
+ # WHEN Logging messages and flushing the buffer
+ primary_logger.debug("Log message from primary logger")
+ secondary_logger.debug("Log message from secondary logger")
+ primary_logger.flush_buffer()
+
+ # THEN Verify log messages are correctly captured and output
+ log = capture_multiple_logging_statements_output(stdout)
+
+ assert "Log message from primary logger" == log[0]["message"]
+ assert "Log message from secondary logger" == log[1]["message"]
+ assert primary_logger._logger.powertools_buffer_config == secondary_logger._logger.powertools_buffer_config
+
+
+def test_buffer_config_isolation_between_loggers_with_different_services(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with specific buffer settings
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG")
+
+ # Create primary logger with explicit buffer configuration
+ buffered_logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ # Configure another logger with a different service name
+ unbuffered_logger = Logger(level="DEBUG", service="powertoolsxyz")
+
+    # WHEN Logging messages with both loggers and flushing the buffered logger
+ buffered_logger.debug("Log message from buffered logger")
+ unbuffered_logger.debug("Log message from unbuffered logger")
+ buffered_logger.flush_buffer()
+
+ # THEN The buffered logger's message is present in the output
+ # THEN The loggers have different buffer configurations
+ log = capture_multiple_logging_statements_output(stdout)
+
+ assert "Log message from buffered logger" == log[0]["message"]
+ assert len(log) == 1
+ assert buffered_logger._logger.powertools_buffer_config != unbuffered_logger._logger.powertools_buffer_config
+
+
+def test_buffer_configuration_propagation_across_child_logger_instances(stdout, service_name, monkeypatch):
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with specific buffer settings
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240, buffer_at_verbosity="DEBUG")
+
+ # Create primary logger with explicit buffer configuration
+ primary_logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+    # Create a child logger for the same service
+ secondary_logger = Logger(level="DEBUG", service=service_name, child=True)
+
+ # WHEN Logging messages and flushing the buffer
+ primary_logger.debug("Log message from primary logger")
+ secondary_logger.debug("Log message from secondary logger")
+
+ primary_logger.flush_buffer()
+
+ # THEN Verify log messages are correctly captured and output only for primary logger
+ # 1. Only one log message is output (from parent logger)
+ # 2. Buffer configuration is shared between parent and child
+ # 3. Buffer caches remain separate between instances
+ log = capture_multiple_logging_statements_output(stdout)
+ assert len(log) == 1
+ assert primary_logger._buffer_config == secondary_logger._buffer_config
+ assert primary_logger._buffer_cache != secondary_logger._buffer_cache
+
+
+def test_logger_buffer_is_cleared_between_lambda_invocations(stdout, service_name, monkeypatch, lambda_context):
+ # Set initial trace ID for first Lambda invocation
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "1-67c39786-5908a82a246fb67f3089263f")
+
+ # GIVEN A logger configured with specific buffer parameters
+ logger_buffer_config = LoggerBufferConfig(max_bytes=10240)
+ logger = Logger(level="DEBUG", service=service_name, stream=stdout, buffer_config=logger_buffer_config)
+
+ @logger.inject_lambda_context
+ def handler(event, context):
+ logger.debug("debug line")
+
+ # WHEN First Lambda invocation with initial trace ID
+ handler({}, lambda_context)
+
+ # WHEN New Lambda invocation arrives with different trace ID
+ monkeypatch.setenv(constants.XRAY_TRACE_ID_ENV, "2-ABC39786-5908a82a246fb67f3089263f")
+ handler({}, lambda_context)
+
+ # THEN Verify buffer for the original trace ID is cleared
+ assert not logger._buffer_cache.get("1-67c39786-5908a82a246fb67f3089263f")
diff --git a/tests/unit/logger/__init__.py b/tests/unit/logger/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/unit/logger/required_dependencies/__init__.py b/tests/unit/logger/required_dependencies/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/unit/logger/required_dependencies/test_logger_buffer_cache.py b/tests/unit/logger/required_dependencies/test_logger_buffer_cache.py
new file mode 100644
index 00000000000..00ae7696281
--- /dev/null
+++ b/tests/unit/logger/required_dependencies/test_logger_buffer_cache.py
@@ -0,0 +1,165 @@
+import pytest
+
+from aws_lambda_powertools.logging.buffer.cache import LoggerBufferCache
+
+
+def test_initialization():
+ # GIVEN a new instance of LoggerBufferCache
+ logger_cache = LoggerBufferCache(1000)
+
+ # THEN cache should have correct initial state
+ assert logger_cache.max_size_bytes == 1000
+ assert logger_cache.cache == {}
+
+
+def test_add_single_item():
+ # GIVEN a new instance of LoggerBufferCache with 1024 bytes
+ logger_cache = LoggerBufferCache(1024)
+
+ # WHEN a single item is added
+ logger_cache.add("key1", "test_item")
+
+ # THEN item is stored correctly with proper size tracking
+ assert len(logger_cache.get("key1")) == 1
+ assert logger_cache.get("key1")[0] == "test_item"
+ assert logger_cache.get_current_size("key1") == len("test_item")
+
+
+def test_add_multiple_items_same_key():
+ # GIVEN a new instance of LoggerBufferCache with 1024 bytes
+ logger_cache = LoggerBufferCache(1024)
+
+ # WHEN multiple items are added to the same key
+ logger_cache.add("key1", "item1")
+ logger_cache.add("key1", "item2")
+
+ # THEN items are stored sequentially
+ assert len(logger_cache.get("key1")) == 2
+ assert logger_cache.get("key1") == ["item1", "item2"]
+ assert logger_cache.has_items_evicted("key1") is False
+
+
+def test_cache_size_limit_single_key():
+ # GIVEN a new instance of LoggerBufferCache with small cache size
+ logger_cache = LoggerBufferCache(10)
+
+ # WHEN multiple items are added
+ logger_cache.add("key1", "long_item1")
+ logger_cache.add("key1", "long_item2")
+ logger_cache.add("key1", "long_item3")
+
+ # THEN cache maintains size limit for a single key
+ assert len(logger_cache.get("key1")) > 0
+ assert logger_cache.get_current_size("key1") <= 10
+ assert logger_cache.has_items_evicted("key1") is True
+
+
+def test_item_larger_than_cache():
+ # GIVEN a new instance of LoggerBufferCache with small cache size
+ logger_cache = LoggerBufferCache(5)
+
+ # WHEN an item larger than cache is added
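+    # ("very_long_item" stringifies to 14 bytes, exceeding the 5-byte capacity)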
+ with pytest.raises(BufferError):
+        # THEN a BufferError is raised
+ logger_cache.add("key1", "very_long_item")
+
+ # THEN the key is not added
+ assert "key1" not in logger_cache.cache
+
+
+def test_get_existing_key():
+ # GIVEN a new instance of LoggerBufferCache with 1024 bytes
+ logger_cache = LoggerBufferCache(1024)
+
+ # WHEN we add keys
+ logger_cache.add("key1", "item1")
+ logger_cache.add("key1", "item2")
+
+ # THEN all items are retrieved
+ assert logger_cache.get("key1") == ["item1", "item2"]
+
+
+def test_get_non_existing_key():
+    # GIVEN a new instance of LoggerBufferCache with 1000 bytes
+ logger_cache = LoggerBufferCache(1000)
+
+ # WHEN getting items for a non-existing key
+ retrieved = logger_cache.get("non_existing")
+
+ # THEN an empty list is returned
+ assert retrieved == []
+
+
+def test_clear_all():
+ # GIVEN a new instance of LoggerBufferCache with 1024 bytes
+ logger_cache = LoggerBufferCache(1024)
+
+ # WHEN we add multiple keys
+ logger_cache.add("key1", "item1")
+ logger_cache.add("key2", "item2")
+
+ # WHEN clearing all keys
+ logger_cache.clear()
+
+ # THEN cache becomes empty
+ assert logger_cache.cache == {}
+
+
+def test_clear_specific_key():
+ # GIVEN a new instance of LoggerBufferCache with 1024 bytes
+ logger_cache = LoggerBufferCache(1024)
+
+ # WHEN we add multiple keys
+ logger_cache.add("key1", "item1")
+ logger_cache.add("key2", "item2")
+
+ # WHEN we remove a specific key
+ logger_cache.clear("key1")
+
+ # THEN only that key is removed
+ assert "key1" not in logger_cache.cache
+ assert "key2" in logger_cache.cache
+ assert logger_cache.get("key1") == []
+
+
+def test_multiple_keys_with_size_limits():
+ # GIVEN a new instance of LoggerBufferCache with 20 bytes
+ logger_cache = LoggerBufferCache(20)
+
+ # WHEN adding items to multiple keys
+ logger_cache.add("key1", "item1")
+ logger_cache.add("key1", "item2")
+ logger_cache.add("key2", "long_item")
+
+    # THEN the size limit is enforced per key; combined usage here also stays within 20 bytes
+ assert len(logger_cache.get("key1")) > 0
+ assert len(logger_cache.get("key2")) > 0
+ assert logger_cache.get_current_size("key1") + logger_cache.get_current_size("key2") <= 20
+
+
+def test_add_different_types():
+ # GIVEN a new instance of LoggerBufferCache with 1024 bytes
+ logger_cache = LoggerBufferCache(1024)
+
+ # WHEN adding items of different types
+ logger_cache.add("key1", 123)
+ logger_cache.add("key1", [1, 2, 3])
+ logger_cache.add("key1", {"a": 1})
+
+ # THEN items are stored successfully
+ retrieved = logger_cache.get("key1")
+ assert len(retrieved) == 3
+
+
+def test_cache_size_tracking():
+ # GIVEN a new instance of LoggerBufferCache with 30 bytes
+ logger_cache = LoggerBufferCache(30)
+
+ # WHEN adding items
+ logger_cache.add("key1", "small")
+ logger_cache.add("key1", "another_item")
+
+ # THEN current size is tracked correctly
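+    # (per-item size is computed as len(str(item)) on add, per KeyBufferCache.add)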
+ assert logger_cache.get_current_size("key1") == len("small") + len("another_item")
+ assert logger_cache.get_current_size("key1") <= 30
diff --git a/tests/unit/logger/required_dependencies/test_logger_buffer_config.py b/tests/unit/logger/required_dependencies/test_logger_buffer_config.py
new file mode 100644
index 00000000000..1cb0f1a5f0d
--- /dev/null
+++ b/tests/unit/logger/required_dependencies/test_logger_buffer_config.py
@@ -0,0 +1,78 @@
+import pytest
+
+from aws_lambda_powertools.logging.buffer import LoggerBufferConfig
+
+
+def test_default_configuration():
+ # GIVEN no specific configuration parameters
+ config_buffer = LoggerBufferConfig()
+
+    # THEN default values are applied
+ assert config_buffer.max_bytes == 20480
+ assert config_buffer.buffer_at_verbosity == "DEBUG"
+ assert config_buffer.flush_on_error_log is True
+
+
+def test_custom_configuration():
+ # GIVEN a new LoggerBufferConfig with custom configuration parameters
+ config_buffer = LoggerBufferConfig(
+ max_bytes=51200,
+ buffer_at_verbosity="WARNING",
+ flush_on_error_log=False,
+ )
+
+ # THEN configuration is set with provided values
+ assert config_buffer.max_bytes == 51200
+ assert config_buffer.buffer_at_verbosity == "WARNING"
+ assert config_buffer.flush_on_error_log is False
+
+
+def test_invalid_max_size_negative():
+ # GIVEN an invalid negative max size
+ invalid_max_size = -100
+
+ # WHEN creating a LoggerBufferConfig
+ with pytest.raises(ValueError, match="Max size must be a positive integer"):
+ # THEN a ValueError is raised
+ LoggerBufferConfig(max_bytes=invalid_max_size)
+
+
+def test_invalid_max_size_type():
+ # GIVEN an invalid max size type
+ invalid_max_size = "10240"
+
+ # WHEN creating a LoggerBufferConfig
+ with pytest.raises(ValueError, match="Max size must be a positive integer"):
+ # THEN a ValueError is raised
+ LoggerBufferConfig(max_bytes=invalid_max_size)
+
+
+def test_invalid_log_level():
+ # GIVEN an invalid log level
+ invalid_log_levels = ["INVALID_LEVEL", 123, None]
+
+ # WHEN creating a LoggerBufferConfig
+ for invalid_log_level in invalid_log_levels:
+ # THEN a ValueError is raised
+ with pytest.raises(ValueError):
+ LoggerBufferConfig(buffer_at_verbosity=invalid_log_level)
+
+
+def test_case_insensitive_log_level():
+    # GIVEN log level names in mixed case
+ test_cases = ["debug", "Info", "WARNING"]
+
+    # WHEN creating a LoggerBufferConfig / THEN the level is normalized to uppercase
+ for log_level in test_cases:
+ config = LoggerBufferConfig(buffer_at_verbosity=log_level)
+ assert config.buffer_at_verbosity == log_level.upper()
+
+
+def test_invalid_flush_on_error():
+ # GIVEN an invalid flush_on_error type
+ invalid_flush_on_error = "True"
+
+    # WHEN creating a LoggerBufferConfig
+ with pytest.raises(ValueError, match="flush_on_error must be a boolean"):
+ # THEN a ValueError is raised
+ LoggerBufferConfig(flush_on_error_log=invalid_flush_on_error)
diff --git a/tests/unit/logger/required_dependencies/test_logger_buffer_functions.py b/tests/unit/logger/required_dependencies/test_logger_buffer_functions.py
new file mode 100644
index 00000000000..5a714b095d2
--- /dev/null
+++ b/tests/unit/logger/required_dependencies/test_logger_buffer_functions.py
@@ -0,0 +1,29 @@
+from aws_lambda_powertools.logging.buffer.functions import _check_minimum_buffer_log_level
+
+
+def test_resolve_buffer_log_level_comparison():
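+    # _check_minimum_buffer_log_level(buffer_level, level) returns True when the
+    # buffer's minimum level is more verbose (numerically lower) than the level
+    # being compared, as the cases below exercise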
+ # Test cases where buffer level is lower than current level (should return True)
+ assert _check_minimum_buffer_log_level("DEBUG", "INFO") is True
+ assert _check_minimum_buffer_log_level("DEBUG", "WARNING") is True
+ assert _check_minimum_buffer_log_level("DEBUG", "ERROR") is True
+ assert _check_minimum_buffer_log_level("INFO", "WARNING") is True
+ assert _check_minimum_buffer_log_level("INFO", "ERROR") is True
+ assert _check_minimum_buffer_log_level("WARNING", "ERROR") is True
+
+ # Test cases where buffer level is higher than current level (should return False)
+ assert _check_minimum_buffer_log_level("ERROR", "DEBUG") is False
+ assert _check_minimum_buffer_log_level("CRITICAL", "INFO") is False
+ assert _check_minimum_buffer_log_level("ERROR", "WARNING") is False
+
+
+def test_resolve_buffer_log_level_case_insensitivity():
+ # Test case insensitivity
+ assert _check_minimum_buffer_log_level("debug", "INFO") is True
+ assert _check_minimum_buffer_log_level("DEBUG", "info") is True
+ assert _check_minimum_buffer_log_level("Debug", "Info") is True
+
+
+def test_resolve_buffer_log_level_edge_cases():
+ # Additional edge cases
+ assert _check_minimum_buffer_log_level("DEBUG", "CRITICAL") is True
+ assert _check_minimum_buffer_log_level("CRITICAL", "DEBUG") is False