Skip to content

Commit 065b921

Browse files
committed
Support streams in STOMP plugin
This commit introduces the support of an x-stream-offset header in the SUBSCRIBE frame to start consuming from a specific place in a stream. The possible values are first, last, next, offset:<offset-value> (e.g. offset:40000), timestamp:<timestamp-in-seconds> (e.g. timestamp:1619428685). This commit also propagates the x-stream-offset header in the MESSAGE frame to know the offset of a the delivered message in the stream.
1 parent bedc46a commit 065b921

File tree

5 files changed

+52
-2
lines changed

5 files changed

+52
-2
lines changed

deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
-define(HEADER_PASSCODE, "passcode").
2727
-define(HEADER_PERSISTENT, "persistent").
2828
-define(HEADER_PREFETCH_COUNT, "prefetch-count").
29+
-define(HEADER_X_STREAM_OFFSET, "x-stream-offset").
2930
-define(HEADER_PRIORITY, "priority").
3031
-define(HEADER_RECEIPT, "receipt").
3132
-define(HEADER_REDELIVERED, "redelivered").

deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
boolean_header/2, boolean_header/3,
2828
integer_header/2, integer_header/3,
2929
binary_header/2, binary_header/3]).
30+
-export([stream_offset_header/2]).
3031
-export([serialize/1, serialize/2]).
3132

3233
initial_state() -> none.
@@ -222,6 +223,26 @@ binary_header(F, K) ->
222223

223224
binary_header(F, K, D) -> default_value(binary_header(F, K), D).
224225

226+
stream_offset_header(F, D) ->
227+
OffsetPrefix = <<"offset:">>,
228+
OffsetPrefixLength = byte_size(OffsetPrefix),
229+
TimestampPrefix = <<"timestamp:">>,
230+
TimestampPrefixLength = byte_size(TimestampPrefix),
231+
case binary_header(F, ?HEADER_X_STREAM_OFFSET, D) of
232+
<<"first">> ->
233+
{longstr, <<"first">>};
234+
<<"last">> ->
235+
{longstr, <<"last">>};
236+
<<"next">> ->
237+
{longstr, <<"next">>};
238+
<<OffsetPrefix:OffsetPrefixLength/binary, OffsetValue/binary>> ->
239+
{long, binary_to_integer(OffsetValue)};
240+
<<TimestampPrefix:TimestampPrefixLength/binary, TimestampValue/binary>> ->
241+
{timestamp, binary_to_integer(TimestampValue)};
242+
_ ->
243+
D
244+
end.
245+
225246
serialize(Frame) ->
226247
serialize(Frame, true).
227248

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,13 @@ do_subscribe(Destination, DestHdr, Frame,
685685
{stop, normal, close_connection(State)};
686686
error ->
687687
ExchangeAndKey = parse_routing(Destination, DfltTopicEx),
688+
StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame, undefined),
689+
Arguments = case StreamOffset of
690+
undefined ->
691+
[];
692+
{Type, Value} ->
693+
[{<<"x-stream-offset">>, Type, Value}]
694+
end,
688695
try
689696
amqp_channel:subscribe(Channel,
690697
#'basic.consume'{
@@ -693,7 +700,7 @@ do_subscribe(Destination, DestHdr, Frame,
693700
no_local = false,
694701
no_ack = (AckMode == auto),
695702
exclusive = false,
696-
arguments = []},
703+
arguments = Arguments},
697704
self()),
698705
ok = rabbit_routing_util:ensure_binding(
699706
Queue, ExchangeAndKey, Channel)

deps/rabbitmq_stomp/src/rabbit_stomp_util.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ adhoc_convert_headers(Headers, Existing) ->
115115
[{binary_to_list(K), binary_to_list(V)} | Acc];
116116
({K, signedint, V}, Acc) ->
117117
[{binary_to_list(K), integer_to_list(V)} | Acc];
118+
({K, long, V}, Acc) ->
119+
[{binary_to_list(K), integer_to_list(V)} | Acc];
118120
(_, Acc) ->
119121
Acc
120122
end, Existing, Headers).

deps/rabbitmq_stomp/test/frame_SUITE.erl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ all() ->
3838
header_value_with_cr,
3939
header_value_with_colon,
4040
headers_escaping_roundtrip,
41-
headers_escaping_roundtrip_without_trailing_lf
41+
headers_escaping_roundtrip_without_trailing_lf,
42+
stream_offset_header
4243
].
4344

4445
parse_simple_frame(_) ->
@@ -162,6 +163,24 @@ header_value_with_colon(_) ->
162163
headers = [{"header", "val:ue"}],
163164
body_iolist = []}).
164165

166+
stream_offset_header(_) ->
167+
TestCases = [
168+
{{"x-stream-offset", "first"}, {longstr, <<"first">>}},
169+
{{"x-stream-offset", "last"}, {longstr, <<"last">>}},
170+
{{"x-stream-offset", "next"}, {longstr, <<"next">>}},
171+
{{"x-stream-offset", "offset:5000"}, {long, 5000}},
172+
{{"x-stream-offset", "timestamp:1000"}, {timestamp, 1000}},
173+
{{"x-stream-offset", "foo"}, undefined},
174+
{{"some-header", "some value"}, undefined}
175+
],
176+
177+
lists:foreach(fun({Header, Expected}) ->
178+
?assertEqual(
179+
Expected,
180+
rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]}, undefined)
181+
)
182+
end, TestCases).
183+
165184
test_frame_serialization(Expected, TrailingLF) ->
166185
{ok, Frame, _} = parse(Expected),
167186
{ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"),

0 commit comments

Comments
 (0)