diff --git a/pyproject.toml b/pyproject.toml index 49db4d5..582de27 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ readme = "README.md" requires-python = ">=3.7" dependencies = [ "backports.zoneinfo >= 0.2.1; python_version < '3.9'", - "clp-ffi-py >= 0.0.11", + "clp-ffi-py >= 0.0.14", "typing-extensions >= 3.7.4", "tzlocal == 5.1; python_version < '3.8'", "tzlocal >= 5.2; python_version >= '3.8'", diff --git a/src/clp_logging/auto_generated_kv_pairs_utils.py b/src/clp_logging/auto_generated_kv_pairs_utils.py new file mode 100644 index 0000000..5519394 --- /dev/null +++ b/src/clp_logging/auto_generated_kv_pairs_utils.py @@ -0,0 +1,68 @@ +import logging +from typing import Any, Dict + +from clp_logging.utils import Timestamp + +TIMESTAMP_KEY: str = "timestamp" +TIMESTAMP_UNIX_MILLISECS_KEY: str = "unix_millisecs" +TIMESTAMP_UTC_OFFSET_SECS_KEY: str = "utc_offset_secs" + +LEVEL_KEY: str = "level" +LEVEL_NUM_KEY: str = "num" +LEVEL_NAME_KEY: str = "name" + +SOURCE_LOCATION_KEY: str = "source_location" +SOURCE_LOCATION_PATH_KEY: str = "path" +SOURCE_LOCATION_LINE_KEY: str = "line" + + +class AutoGeneratedKeyValuePairsBuffer: + """ + A reusable buffer for auto-generated key-value pairs. + + This buffer maintains a predefined dictionary for common metadata fields, to + enable efficient reuse without creating new dictionaries for each log event. + """ + + def __init__(self) -> None: + self._buf: Dict[str, Any] = { + TIMESTAMP_KEY: { + TIMESTAMP_UNIX_MILLISECS_KEY: None, + TIMESTAMP_UTC_OFFSET_SECS_KEY: None, + }, + LEVEL_KEY: { + LEVEL_NUM_KEY: None, + LEVEL_NAME_KEY: None, + }, + SOURCE_LOCATION_KEY: { + SOURCE_LOCATION_PATH_KEY: None, + SOURCE_LOCATION_LINE_KEY: None, + }, + } + + def generate(self, ts: Timestamp, record: logging.LogRecord) -> Dict[str, Any]: + """ + Generates the auto-generated key-value pairs by populating the + underlying buffer with the given log event metadata. + + :param ts: The timestamp assigned to the log event. + :param record: The LogRecord containing metadata for the log event. + :return: The populated underlying buffer as the auto-generated key-value + pairs. + """ + + self._buf[TIMESTAMP_KEY][TIMESTAMP_UNIX_MILLISECS_KEY] = ts.get_unix_ts() + self._buf[TIMESTAMP_KEY][TIMESTAMP_UTC_OFFSET_SECS_KEY] = ts.get_utc_offset() + + # NOTE: We don't add all the metadata contained in `record`. Instead, we only add the + # following fields: + # - Log level + # - Source location + + self._buf[LEVEL_KEY][LEVEL_NUM_KEY] = record.levelno + self._buf[LEVEL_KEY][LEVEL_NAME_KEY] = record.levelname + + self._buf[SOURCE_LOCATION_KEY][SOURCE_LOCATION_PATH_KEY] = record.pathname + self._buf[SOURCE_LOCATION_KEY][SOURCE_LOCATION_LINE_KEY] = record.lineno + + return self._buf diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index ede0003..488882b 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -3,19 +3,23 @@ import socket import sys import time +import warnings from abc import ABCMeta, abstractmethod +from contextlib import nullcontext from math import floor from pathlib import Path from queue import Empty, Queue from signal import SIGINT, signal, SIGTERM from threading import Thread, Timer from types import FrameType -from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union +from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union import tzlocal -from clp_ffi_py.ir import FourByteEncoder +from clp_ffi_py.ir import FourByteEncoder, Serializer +from clp_ffi_py.utils import serialize_dict_to_msgpack from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor +from clp_logging.auto_generated_kv_pairs_utils import AutoGeneratedKeyValuePairsBuffer from clp_logging.protocol import ( BYTE_ORDER, EOF_CHAR, @@ -25,12 +29,15 @@ UINT_MAX, ULONG_MAX, ) +from clp_logging.utils import Timestamp # TODO: lock writes to zstream if GIL ever goes away # Note: no need to quote "Queue[Tuple[int, bytes]]" in python 3.9 DEFAULT_LOG_FORMAT: str = " %(levelname)s %(name)s %(message)s" WARN_PREFIX: str = " [WARN][clp_logging]" +AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs" +USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs" def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]: @@ -129,9 +136,9 @@ def _write(self, loglevel: int, msg: str) -> None: # override def emit(self, record: logging.LogRecord) -> None: """ - Override `logging.Handler.emit` in base class to ensure - `logging.Handler.handleError` is always called and avoid requiring a - `logging.LogRecord` to call internal writing functions. + Implements `logging.Handler.emit` to ensure + `logging.Handler.handleError` is always called and so derived classes + only need to implement `_write` instead of implementing this method. """ msg: str = self.format(record) + "\n" try: @@ -792,3 +799,169 @@ def __init__( super().__init__( open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout ) + + +class ClpKeyValuePairStreamHandler(logging.Handler): + """ + A custom logging handler that serializes key-value pair log events into the + CLP key-value pair IR format. + + Differences from `logging.StreamHandler`: + + - Log events (`logging.LogRecord`) should contain the key-value pairs that a user wants to log + as a Python dictionary. + - As a result, the key-value pairs will not be formatted into a string before being written. + - The key-value pairs will be serialized into the CLP key-value pair IR format before writing to + the stream. + + Key-value pairs in the log event must abide by the following rules: + - Keys must be of type `str`. + - Values must be one of the following types: + - Primitives: `int`, `float`, `str`, `bool`, or `None`. + - Arrays, where each array: + - may contain primitive values, dictionaries, or nested arrays. + - can be empty. + - Dictionaries, where each dictionary: + - must adhere to the aforementioned rules for keys and values. + - can be empty. + + :param stream: A writable byte output stream to which the handler will write the serialized IR + byte sequences. + :param enable_compression: Whether to compress the serialized IR byte sequences using Zstandard. + """ + + def __init__( + self, + stream: IO[bytes], + enable_compression: bool = True, + ) -> None: + super().__init__() + + self._enable_compression: bool = enable_compression + self._serializer: Optional[Serializer] = None + self._formatter: Optional[logging.Formatter] = None + self._ostream: IO[bytes] = stream + + self._auto_gen_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = ( + AutoGeneratedKeyValuePairsBuffer() + ) + + self._init_new_serializer(stream) + + # override + def setFormatter(self, fmt: Optional[logging.Formatter]) -> None: + if fmt is None: + return + warnings.warn( + f"{self.__class__.__name__} doesn't currently support Formatters", + category=RuntimeWarning, + ) + self._formatter = fmt + + # override + def emit(self, record: logging.LogRecord) -> None: + """ + Implements `logging.Handler.emit` to encode the given record into CLP's + IR format before it's written to the underlying stream. + + :param record: The log event to serialize. + """ + try: + self._write(record) + except Exception: + self.handleError(record) + + # override + def setStream(self, stream: IO[bytes]) -> Optional[IO[bytes]]: + """ + Sets the instance's stream to the given value, if it's different from + the current value. The old stream is flushed before the new stream is + set. + + NOTE: The old stream will also be closed by this method. + + :param stream: A writable byte output stream to which the handler will write the serialized + IR byte sequences. + :return: The old stream if the stream was changed, or `None` if it wasn't. + """ + + # NOTE: This function is implemented by mirroring CPython's implementation. + + if stream is self._ostream: + return None + + old_stream: IO[bytes] = self._ostream + with self.lock if self.lock else nullcontext(): + # TODO: The following call will close the old stream whereas `logging.StreamHandler`'s + # implementation will only flush the stream without closing it. To support + # `logging.StreamHandler`'s behaviour, we need `clp_ffi_py.ir.Serializer` to allow + # closing the serializer without closing the underlying output stream. + self._init_new_serializer(stream) + self._ostream = stream + return old_stream + + # override + def close(self) -> None: + if self._is_closed(): + return + self._close_serializer() + super().close() + + def _is_closed(self) -> bool: + return self._serializer is None + + def _close_serializer(self) -> None: + """ + Closes the current serializer if it's open. + + NOTE: The underlying output stream will also be closed. + """ + if self._is_closed(): + return + assert self._serializer is not None + self._serializer.close() + self._serializer = None + + def _init_new_serializer(self, stream: IO[bytes]) -> None: + """ + Initializes a new serializer that will write to the given stream. + + :param stream: The stream that the underlying serializer will write to. + """ + self._close_serializer() + self._serializer = Serializer( + ZstdCompressor().stream_writer(stream) if self._enable_compression else stream + ) + + def _write(self, record: logging.LogRecord) -> None: + """ + Writes the log event into the underlying serializer. + + :param record: The log event to serialize. + :raise RuntimeError: If the handler has been already closed. + :raise TypeError: If `record.msg` is not a Python dictionary. + """ + if self._is_closed(): + raise RuntimeError("Stream already closed.") + + if not isinstance(record.msg, dict): + raise TypeError("`record.msg` must be a Python dictionary.") + + self._serialize_kv_pair_log_event( + self._auto_gen_kv_pairs_buf.generate(Timestamp.now(), record), record.msg + ) + + def _serialize_kv_pair_log_event( + self, auto_gen_kv_pairs: Dict[str, Any], user_gen_kv_pairs: Dict[str, Any] + ) -> None: + """ + :param auto_gen_kv_pairs: A dict of auto-generated kv-pairs. + :param user_gen_kv_pairs: A dict of user-generated kv-pairs. + """ + if self._is_closed(): + raise RuntimeError("Stream already closed.") + assert self._serializer is not None + self._serializer.serialize_log_event_from_msgpack_map( + serialize_dict_to_msgpack(auto_gen_kv_pairs), + serialize_dict_to_msgpack(user_gen_kv_pairs), + ) diff --git a/src/clp_logging/utils.py b/src/clp_logging/utils.py new file mode 100644 index 0000000..f0f05b9 --- /dev/null +++ b/src/clp_logging/utils.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +import time +from math import floor + + +class Timestamp: + """ + A timestamp represented as a Unix timestamp and a timezone offset from UTC. + """ + + @staticmethod + def now() -> Timestamp: + """ + :return: A `Timestamp` instance representing the current time. + """ + ts: float = time.time() + return Timestamp( + unix_ts=floor(ts * 1000), + utc_offset=time.localtime(ts).tm_gmtoff, + ) + + def __init__(self, unix_ts: int, utc_offset: int): + """ + Initializes a `Timestamp` instance with the given time. + + :param unix_ts: Unix timestamp in milliseconds. + :param utc_offset: The number of seconds the timezone is ahead of + (positive) or behind (negative) UTC. + """ + self._utc_offset: int = utc_offset + self._unix_ts: int = unix_ts + + def get_unix_ts(self) -> int: + """ + :return: The Unix timestamp in milliseconds. + """ + return self._unix_ts + + def get_utc_offset(self) -> int: + """ + :return: The number of seconds the timezone is ahead of (positive) or behind (negative) UTC. + """ + return self._utc_offset diff --git a/tests/__init__.py b/tests/__init__.py index 1e32374..92bea66 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,7 +1,11 @@ import unittest from typing import Iterable, Optional, Union -from tests.test_handlers import TestCLPBase, TestCLPSegmentStreamingBase +from tests.test_handlers import ( + TestCLPBase, + TestClpKeyValuePairLoggingBase, + TestCLPSegmentStreamingBase, +) def add_tests(suite: unittest.TestSuite, loader: unittest.TestLoader, test_class: type) -> None: @@ -35,4 +39,7 @@ def load_tests( for seg_test_class in TestCLPSegmentStreamingBase.__subclasses__(): add_tests(suite, loader, seg_test_class) + for kv_pair_handler_test_class in TestClpKeyValuePairLoggingBase.__subclasses__(): + add_tests(suite, loader, kv_pair_handler_test_class) + return suite diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 09c79d8..1fd1a7a 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -1,3 +1,5 @@ +import copy +import inspect import logging import os import signal @@ -5,12 +7,15 @@ import unittest from ctypes import c_double, c_int from datetime import datetime, tzinfo +from io import open as io_open from math import floor from multiprocessing.sharedctypes import Array, Synchronized, SynchronizedArray, Value from pathlib import Path -from typing import cast, Dict, IO, List, Optional, Union +from types import FrameType +from typing import Any, cast, Dict, IO, List, Optional, Union import dateutil.parser +from clp_ffi_py.ir import Deserializer, KeyValuePairLogEvent from smart_open import open, register_compressor # type: ignore from zstandard import ( ZstdCompressionWriter, @@ -19,9 +24,21 @@ ZstdDecompressor, ) +from clp_logging.auto_generated_kv_pairs_utils import ( + LEVEL_KEY, + LEVEL_NAME_KEY, + LEVEL_NUM_KEY, + SOURCE_LOCATION_KEY, + SOURCE_LOCATION_LINE_KEY, + SOURCE_LOCATION_PATH_KEY, + TIMESTAMP_KEY, + TIMESTAMP_UNIX_MILLISECS_KEY, + TIMESTAMP_UTC_OFFSET_SECS_KEY, +) from clp_logging.handlers import ( CLPBaseHandler, CLPFileHandler, + ClpKeyValuePairStreamHandler, CLPLogLevelTimeout, CLPSockHandler, CLPStreamHandler, @@ -30,6 +47,7 @@ ) from clp_logging.protocol import Metadata from clp_logging.readers import CLPFileReader, CLPSegmentStreaming +from clp_logging.utils import Timestamp def _zstd_comppressions_handler( @@ -52,6 +70,7 @@ def _zstd_comppressions_handler( FATAL_EXIT_CODE_BASE: int = 128 ASSERT_TIMESTAMP_DELTA_S: float = 0.256 +ASSERT_TIMESTAMP_DELTA_MS: int = floor(ASSERT_TIMESTAMP_DELTA_S * 1000) LOG_DELAY_S: float = 0.064 TIMEOUT_PADDING_S: float = 0.512 @@ -750,5 +769,204 @@ def setUp(self) -> None: self.setup_logging() +class ExpectedLogEvent: + """ + An expected kv-pair log event, which contains all relevant log event + metadata and user-generated kv-pairs. + """ + + def __init__( + self, + ts: Timestamp, + level_num: int, + level_name: str, + path: Path, + line: Optional[int], + user_generated_kv_pairs: Dict[str, Any], + ) -> None: + self.ts: Timestamp = ts + self.level_num: int = level_num + self.level_name: str = level_name + self.path: Path = path + self.line: Optional[int] = line + self.user_generated_kv_pairs: Dict[str, Any] = user_generated_kv_pairs + + +class TestClpKeyValuePairLoggingBase(unittest.TestCase): + """ + A base class for testing CLP key-value pair logging handlers. + + TODO: Functionality-wise, this class mirrors `TestCLPBase`. We should refactor `TestCLPBase` + to support both raw-text logging (unstructured logging) and key-value pair logging (structured + logging). + """ + + # Set by the derived test cases + _enable_compression: bool + _clp_kv_pair_handler: ClpKeyValuePairStreamHandler + + # Set by `setUp` + _clp_log_path: Path + _logger: logging.Logger + _expected_log_events: List[ExpectedLogEvent] + + # override + @classmethod + def setUpClass(cls) -> None: + if not LOG_DIR.exists(): + LOG_DIR.mkdir(parents=True, exist_ok=True) + assert LOG_DIR.is_dir() + + # override + def setUp(self) -> None: + file_extension: str = ".clp.zst" if self._enable_compression else ".clp" + self._clp_log_path: Path = LOG_DIR / f"{self.id()}{file_extension}" + if self._clp_log_path.exists(): + self._clp_log_path.unlink() + self._logger: logging.Logger = logging.getLogger(self.id()) + self._expected_log_events: List[ExpectedLogEvent] = [] + + def _setup_logging(self) -> None: + self._logger.setLevel(logging.DEBUG) + self._logger.addHandler(self._clp_kv_pair_handler) + + def _log(self, level: int, kv_pairs: Dict[str, Any]) -> None: + level_name: str = logging.getLevelName(level) + path: Path = Path(__file__).resolve() + curr_frame: Optional[FrameType] = inspect.currentframe() + + # NOTE: This line must be right before the actual logging statement + line: Optional[int] = curr_frame.f_lineno + 1 if curr_frame is not None else None + self._logger.log(level, kv_pairs) + expected: ExpectedLogEvent = ExpectedLogEvent( + Timestamp.now(), level, level_name, path, line, kv_pairs + ) + self._expected_log_events.append(expected) + + def _close_and_compare(self) -> None: + self._close() + self._compare() + + def _close(self) -> None: + logging.shutdown() + self._logger.removeHandler(self._clp_kv_pair_handler) + + def _compare(self) -> None: + actual_log_events: List[KeyValuePairLogEvent] = self._read_clp() + self.assertEqual(len(self._expected_log_events), len(actual_log_events)) + for actual, expected in zip(actual_log_events, self._expected_log_events): + auto_generated_kv_pairs: Dict[str, Any] + user_generated_kv_pairs: Dict[str, Any] + auto_generated_kv_pairs, user_generated_kv_pairs = actual.to_dict() + + # Check user generated kv pairs + self.assertEqual(user_generated_kv_pairs, expected.user_generated_kv_pairs) + + # Check auto generated kv pairs + self.assertAlmostEqual( + auto_generated_kv_pairs[TIMESTAMP_KEY][TIMESTAMP_UNIX_MILLISECS_KEY], + expected.ts.get_unix_ts(), + delta=ASSERT_TIMESTAMP_DELTA_MS, + ) + self.assertEqual( + auto_generated_kv_pairs[TIMESTAMP_KEY][TIMESTAMP_UTC_OFFSET_SECS_KEY], + expected.ts.get_utc_offset(), + ) + + self.assertEqual(auto_generated_kv_pairs[LEVEL_KEY][LEVEL_NUM_KEY], expected.level_num) + self.assertEqual( + auto_generated_kv_pairs[LEVEL_KEY][LEVEL_NAME_KEY], expected.level_name + ) + + self.assertEqual( + Path(auto_generated_kv_pairs[SOURCE_LOCATION_KEY][SOURCE_LOCATION_PATH_KEY]), + expected.path, + ) + if expected.line is not None: + self.assertEqual( + auto_generated_kv_pairs[SOURCE_LOCATION_KEY][SOURCE_LOCATION_LINE_KEY], + expected.line, + ) + + def _read_clp(self) -> List[KeyValuePairLogEvent]: + result: List[KeyValuePairLogEvent] = [] + with open(str(self._clp_log_path), "rb") as ir_stream: + deserializer: Deserializer = Deserializer(ir_stream) + while True: + deserialized_log_event: Optional[KeyValuePairLogEvent] = ( + deserializer.deserialize_log_event() + ) + if deserialized_log_event is None: + break + result.append(deserialized_log_event) + return result + + +class TestClpKeyValuePairHandlerBase(TestClpKeyValuePairLoggingBase): + def test_basic(self) -> None: + static_message: Dict[str, str] = {"static_message": "This is a static message"} + primitive_dict: Dict[str, Any] = { + "str": "str", + "int": 1, + "float": 1.0, + "bool": True, + "null": None, + } + primitive_array: List[Any] = ["str", 1, 1.0, True, None] + + self._log(logging.DEBUG, {"message": f"user_id={self.id()}", "static": static_message}) + self._log(logging.INFO, {"dict": primitive_dict}) + self._log(logging.WARNING, {"array": primitive_array}) + self._log(logging.ERROR, {"array": primitive_array}) + + dict_with_array: Dict[str, Any] = copy.deepcopy(primitive_dict) + dict_with_array["array"] = primitive_array + array_with_dict: List[Any] = copy.deepcopy(primitive_array) + array_with_dict.append(primitive_dict) + self._log( + logging.CRITICAL, + {"dict_with_array": dict_with_array, "array_with_dict": array_with_dict}, + ) + + self._log(logging.DEBUG, {}) + + self._close_and_compare() + + def test_empty(self) -> None: + self._close_and_compare() + + +class TestClpKeyValuePairStreamingHandlerRaw(TestClpKeyValuePairHandlerBase): + """ + Test `ClpKeyValuePairStreamingHandler` without compression. + """ + + # override + def setUp(self) -> None: + self._enable_compression = False + super().setUp() + self._clp_kv_pair_handler = ClpKeyValuePairStreamHandler( + io_open(self._clp_log_path, "wb"), + self._enable_compression, + ) + self._setup_logging() + + +class TestClpKeyValuePairStreamingHandlerZstd(TestClpKeyValuePairHandlerBase): + """ + Test `ClpKeyValuePairStreamingHandler` with zstd compression. + """ + + # override + def setUp(self) -> None: + self._enable_compression = True + super().setUp() + self._clp_kv_pair_handler = ClpKeyValuePairStreamHandler( + io_open(self._clp_log_path, "wb"), + self._enable_compression, + ) + self._setup_logging() + + if __name__ == "__main__": unittest.main()