From 9e568e4c2039d0bfc769d5b6f700967a3e337d7d Mon Sep 17 00:00:00 2001 From: Dan Schultzer <1254724+danschultzer@users.noreply.github.com> Date: Wed, 8 Jan 2025 11:40:28 -0800 Subject: [PATCH] Add telemetry events --- CHANGELOG.md | 7 ++ lib/idempotency_plug.ex | 65 ++++++++++++++----- lib/idempotency_plug/request_tracker.ex | 45 +++++++++++++ mix.exs | 1 + .../idempotency_plug/request_tracker_test.exs | 64 +++++++++++++++++- test/idempotency_plug_test.exs | 46 +++++++++++++ 6 files changed, 210 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ea8f6b..7d58879 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## v0.2.2 (TBA) + +### Changes + +* Added `idempotency_plug.track` telemetry span +* Added `idempotency_plug.request_tracker.cache_hit`, `idempotency_plug.request_tracker.cache_miss`, and `idempotency_plug.request_tracker.prune` telemetry events + ## v0.2.1 (2023-04-28) Relaxed dependency requirements for `ecto` and `ecto_sql`. diff --git a/lib/idempotency_plug.ex b/lib/idempotency_plug.ex index 0b1853e..b3a2844 100644 --- a/lib/idempotency_plug.ex +++ b/lib/idempotency_plug.ex @@ -122,6 +122,24 @@ defmodule IdempotencyPlug do - `{mod, fun, args}` - calls the MFA to process the conn with error, the connection MUST be halted. + ## Telemetry events + + The following events are emitted by the Plug: + + * `[:idempotency_plug, :track, :start]` - dispatched before request tracking + * Measurement: `%{system_time: System.system_time}` + * Metadata: `%{telemetry_span_context: term(), conn: Plug.Conn.t, tracker: module, idempotency_key: binary}` + tracker: tracker, + idempotency_key: key + + * `[:idempotency_plug, :track, :exception]` - dispatched after exceptions on tracking a request + * Measurement: `%{duration: native_time}` + * Metadata: `%{telemetry_span_context: term(), conn: Plug.Conn.t, tracker: module, idempotency_key: binary, kind: :throw | :error | :exit, reason: term(), stacktrace: list()}` + + * `[:idempotency_plug, :track, :stop]` - dispatched after successfully tracking a request + * Measurement: `%{duration: native_time}` + * Metadata: `%{telemetry_span_context: term(), conn: Plug.Conn.t, tracker: module, idempotency_key: binary}` + ## Examples plug IdempotencyPlug, @@ -218,28 +236,41 @@ defmodule IdempotencyPlug do idempotency_key_hash = hash_idempotency_key(conn, key, opts) request_payload_hash = hash_request_payload(conn, opts) - case RequestTracker.track(tracker, idempotency_key_hash, request_payload_hash) do - {:processing, _node_caller, _expires} -> - raise ConcurrentRequestError + metadata = %{ + conn: conn, + tracker: tracker, + idempotency_key: key + } - {:mismatch, {:fingerprint, fingerprint}, _expires} -> - raise RequestPayloadFingerprintMismatchError, fingerprint: fingerprint + :telemetry.span([:idempotency_plug, :track], metadata, fn -> + case RequestTracker.track(tracker, idempotency_key_hash, request_payload_hash) do + {:processing, _node_caller, _expires} -> + raise ConcurrentRequestError - {:cache, {:halted, reason}, _expires} -> - raise HaltedResponseError, reason: reason + {:mismatch, {:fingerprint, fingerprint}, _expires} -> + raise RequestPayloadFingerprintMismatchError, fingerprint: fingerprint - {:cache, {:ok, response}, expires} -> - conn - |> put_expires_header(expires) - |> set_resp(response) - |> Conn.halt() + {:cache, {:halted, reason}, _expires} -> + raise HaltedResponseError, reason: reason - {:init, idempotency_key, _expires} -> - update_response_before_send(conn, idempotency_key, opts) + {:cache, {:ok, response}, expires} -> + conn = + conn + |> put_expires_header(expires) + |> set_resp(response) + |> Conn.halt() - {:error, error} -> - raise "failed to track request, got: #{error}" - end + {conn, %{metadata | conn: conn}} + + {:init, idempotency_key, _expires} -> + conn = update_response_before_send(conn, idempotency_key, opts) + + {conn, %{metadata | conn: conn}} + + {:error, error} -> + raise "failed to track request, got: #{error}" + end + end) end @doc """ diff --git a/lib/idempotency_plug/request_tracker.ex b/lib/idempotency_plug/request_tracker.ex index 594fbda..8c238b0 100644 --- a/lib/idempotency_plug/request_tracker.ex +++ b/lib/idempotency_plug/request_tracker.ex @@ -41,6 +41,22 @@ defmodule IdempotencyPlug.RequestTracker do * `:store` - the cache store module to use to store the cache objects. Defaults to `{IdempotencyPlug.ETSStore, [table: #{__MODULE__}]}`. + ## Telemetry events + + The following events are emitted by the tracker: + + * `[:idempotency_plug, :request_tracker, :cache_miss]` - dispatched after request has not found in the cache + * Measurement: `%{}` + * Metadata: `%{telemetry_span_context: term(), request_id: binary, fingerprint: binary, store: atom, expires_at: DateTime.t}` + + * `[:idempotency_plug, :request_tracker, :cache_hit]` - dispatched after request has been found in the cache + * Measurement: `%{}` + * Metadata: `%{telemetry_span_context: term(), request_id: binary, fingerprint: binary, store: atom, expires_at: DateTime.t}` + + * `[:idempotency_plug, :request_tracker, :prune]` - dispatched before the cache is pruned + * Measurement: `%{}` + * Metadata: `%{telemetry_span_context: term()}` + ## Examples children = [ @@ -133,11 +149,20 @@ defmodule IdempotencyPlug.RequestTracker do def handle_call({:track, request_id, fingerprint}, {caller, _}, state) do {store, store_opts} = fetch_store(state.options) + metadata = %{ + request_id: request_id, + fingerprint: fingerprint, + store: store, + expires_at: nil + } + case store.lookup(request_id, store_opts) do :not_found -> data = {:processing, {Node.self(), caller}} expires_at = expires_at(state.options) + execute_telemetry(:cache_miss, %{metadata | expires_at: expires_at}) + case store.insert(request_id, data, fingerprint, expires_at, store_opts) do :ok -> {:reply, {:init, request_id, expires_at}, put_monitored(state, request_id, caller)} @@ -147,15 +172,23 @@ defmodule IdempotencyPlug.RequestTracker do end {{:processing, node_caller}, ^fingerprint, expires} -> + execute_telemetry(:cache_hit, %{metadata | expires_at: expires}) + {:reply, {:processing, node_caller, expires}, state} {{:halted, reason}, ^fingerprint, expires} -> + execute_telemetry(:cache_hit, %{metadata | expires_at: expires}) + {:reply, {:cache, {:halted, reason}, expires}, state} {{:ok, response}, ^fingerprint, expires} -> + execute_telemetry(:cache_hit, %{metadata | expires_at: expires}) + {:reply, {:cache, {:ok, response}, expires}, state} {_res, other_fingerprint, expires} -> + execute_telemetry(:cache_hit, %{metadata | expires_at: expires}) + {:reply, {:mismatch, {:fingerprint, other_fingerprint}, expires}, state} end end @@ -172,6 +205,14 @@ defmodule IdempotencyPlug.RequestTracker do end end + defp execute_telemetry(event, metadata) do + :telemetry.execute( + [:idempotency_plug, :request_tracker, event], + _measurements = %{}, + metadata + ) + end + defp put_monitored(state, request_id, caller) do ref = Process.monitor(caller) @@ -201,6 +242,10 @@ defmodule IdempotencyPlug.RequestTracker do def handle_info(:prune, state) do {store, store_opts} = fetch_store(state.options) + metadata = %{store: store} + + execute_telemetry(:prune, metadata) + store.prune(store_opts) Process.send_after(self(), :prune, Keyword.fetch!(state.options, :prune)) diff --git a/mix.exs b/mix.exs index f3b06a9..606b21a 100644 --- a/mix.exs +++ b/mix.exs @@ -39,6 +39,7 @@ defmodule IdempotencyPlug.MixProject do [ {:plug, "~> 1.14"}, {:jason, "~> 1.2"}, + {:telemetry, "~> 1.0"}, {:ecto, "~> 3.9", optional: true}, {:ecto_sql, "~> 3.9", optional: true}, diff --git a/test/idempotency_plug/request_tracker_test.exs b/test/idempotency_plug/request_tracker_test.exs index 3945bd1..c16f71e 100644 --- a/test/idempotency_plug/request_tracker_test.exs +++ b/test/idempotency_plug/request_tracker_test.exs @@ -13,14 +13,33 @@ defmodule IdempotencyPlug.RequestTrackerTest do test "with no cached response", %{pid: pid} do expires_after = DateTime.add(DateTime.utc_now(), 24, :hour) + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :request_tracker, :cache_miss] + ]) + assert {:init, key, expires} = RequestTracker.track(pid, "no-cache", "fingerprint") assert DateTime.compare(expires, expires_after) != :lt + assert_receive {[:idempotency_plug, :request_tracker, :cache_miss], ^ref, measurements, + metadata} + + assert measurements == %{} + assert metadata.request_id == key + assert metadata.fingerprint == "fingerprint" + assert metadata.store == IdempotencyPlug.ETSStore + assert metadata.expires_at == expires + assert {:ok, expires} = RequestTracker.put_response(pid, key, "OK") assert DateTime.compare(expires, expires_after) != :lt end test "with concurrent requests", %{pid: pid} do + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :request_tracker, :cache_hit] + ]) + test_pid = self() task = @@ -42,6 +61,11 @@ defmodule IdempotencyPlug.RequestTrackerTest do {:expires, expires} -> assert {:processing, _node_caller, ^expires} = RequestTracker.track(pid, "concurrent-request", "fingerprint") + + assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements, + metadata} + + assert metadata.expires_at == expires end send(task.pid, :continue) @@ -57,16 +81,36 @@ defmodule IdempotencyPlug.RequestTrackerTest do {:init, key, _expires} = RequestTracker.track(pid, "cached-fingerprint", "fingerprint") {:ok, expires} = RequestTracker.put_response(pid, key, "OK") + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :request_tracker, :cache_hit] + ]) + assert {:mismatch, {:fingerprint, "fingerprint"}, ^expires} = RequestTracker.track(pid, "cached-fingerprint", "other-fingerprint") + + assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements, + metadata} + + assert metadata.expires_at == expires end test "with cached response", %{pid: pid} do {:init, key, _expires} = RequestTracker.track(pid, "cached-response", "fingerprint") {:ok, expires} = RequestTracker.put_response(pid, key, "OK") + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :request_tracker, :cache_hit] + ]) + assert {:cache, {:ok, "OK"}, ^expires} = RequestTracker.track(pid, "cached-response", "fingerprint") + + assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements, + metadata} + + assert metadata.expires_at == expires end @tag capture_log: true @@ -81,8 +125,18 @@ defmodule IdempotencyPlug.RequestTrackerTest do {{%RuntimeError{message: "oops"}, _}, _} = catch_exit(Task.await(task)) - assert {:cache, {:halted, {%RuntimeError{message: "oops"}, _}}, _expires} = + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :request_tracker, :cache_hit] + ]) + + assert {:cache, {:halted, {%RuntimeError{message: "oops"}, _}}, expires} = RequestTracker.track(pid, "halted-request", "fingerprint") + + assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements, + metadata} + + assert metadata.expires_at == expires end test "when no tracked request", %{pid: pid} do @@ -97,7 +151,15 @@ defmodule IdempotencyPlug.RequestTrackerTest do assert {:processing, _node_caller, _expires} = RequestTracker.track(pid, "prune", "fingerprint") + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :request_tracker, :prune] + ]) + :timer.sleep(20) assert {:init, _id, _expires} = RequestTracker.track(pid, "prune", "fingerprint") + + assert_receive {[:idempotency_plug, :request_tracker, :prune], ^ref, _measurements, metadata} + assert metadata.store == IdempotencyPlug.ETSStore end end diff --git a/test/idempotency_plug_test.exs b/test/idempotency_plug_test.exs index 5e0c230..06af85c 100644 --- a/test/idempotency_plug_test.exs +++ b/test/idempotency_plug_test.exs @@ -58,17 +58,34 @@ defmodule IdempotencyPlugTest do end test "with no cached response", %{conn: conn, tracker: tracker} do + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :track, :start], + [:idempotency_plug, :track, :stop] + ]) + conn = run_plug(conn, tracker) refute conn.halted assert conn.status == 200 assert conn.resp_body == "OK" assert expires(conn) + + assert_receive {[:idempotency_plug, :track, :start], ^ref, _measurements, metadata} + assert is_pid(metadata.tracker) + assert %Plug.Conn{resp_body: nil} = metadata.conn + assert [metadata.idempotency_key] == get_req_header(conn, "idempotency-key") + assert_receive {[:idempotency_plug, :track, :stop], ^ref, _measurements, _metadata} end test "with concurrent request", %{conn: conn, tracker: tracker} do pid = self() + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :track, :exception] + ]) + task = Task.async(fn -> run_plug(conn, tracker, @@ -100,10 +117,18 @@ defmodule IdempotencyPlugTest do send(task.pid, :continue) Task.await(task) + + assert_receive {[:idempotency_plug, :track, :exception], ^ref, _measurements, metadata} + assert %IdempotencyPlug.ConcurrentRequestError{} = metadata.reason end @tag capture_log: true test "with halted response", %{conn: conn, tracker: tracker} do + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :track, :exception] + ]) + Process.flag(:trap_exit, true) task = Task.async(fn -> run_plug(conn, tracker, callback: fn _conn -> raise "failed" end) end) {{%RuntimeError{}, _}, _} = catch_exit(Task.await(task)) @@ -117,9 +142,17 @@ defmodule IdempotencyPlugTest do assert error.message =~ "The original request was interrupted and can't be recovered as it's in an unknown state" + + assert_receive {[:idempotency_plug, :track, :exception], ^ref, _measurements, metadata} + assert %IdempotencyPlug.HaltedResponseError{} = metadata.reason end test "with cached response", %{conn: conn, tracker: tracker} do + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :track, :stop] + ]) + other_conn = run_plug(conn, tracker, callback: fn conn -> @@ -129,6 +162,8 @@ defmodule IdempotencyPlugTest do end ) + assert_receive {[:idempotency_plug, :track, :stop], _, _, _} + conn = run_plug(conn, tracker) assert conn.halted @@ -136,9 +171,17 @@ defmodule IdempotencyPlugTest do assert conn.resp_body == "OTHER" assert expires(conn) == expires(other_conn) assert get_resp_header(conn, "x-header-key") == ["header-value"] + + assert_receive {[:idempotency_plug, :track, :stop], ^ref, _measurements, metadata} + assert metadata.conn.resp_body == "OTHER" end test "with cached response with different request payload", %{conn: conn, tracker: tracker} do + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:idempotency_plug, :track, :exception] + ]) + _other_conn = conn |> other_request_payload() @@ -153,6 +196,9 @@ defmodule IdempotencyPlugTest do assert error.message =~ "This `Idempotency-Key` can't be reused with a different payload or URI" + + assert_receive {[:idempotency_plug, :track, :exception], ^ref, _measurements, metadata} + assert %IdempotencyPlug.RequestPayloadFingerprintMismatchError{} = metadata.reason end test "with cached response with different request URI", %{conn: conn, tracker: tracker} do