Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
118 commits
Select commit Hold shift + click to select a range
8710565
Use 1 instead of 22 Erlang processes per MQTT connection
ansd Aug 5, 2022
24b0a6b
Publish with QoS0 via queue_type interface
ansd Aug 12, 2022
eac0622
Consume with QoS0 via queue_type interface
ansd Aug 13, 2022
f4d1f68
Move authn / authz into rabbitmq_mqtt
ansd Sep 1, 2022
73ad3ba
Revert maybe expression
ansd Sep 2, 2022
77da78f
Get most auth_SUITE tests green
ansd Sep 6, 2022
218ee19
Make proxy_protocol tests green
ansd Sep 18, 2022
99337b8
Emit stats
ansd Sep 19, 2022
cdd253e
Receive many messages from classic queue
ansd Sep 20, 2022
23dac49
Support QoS 1 for sending and receiving
ansd Sep 22, 2022
a02cbb7
Get all existing rabbitmq_mqtt tests green
ansd Sep 26, 2022
6f00ccb
Get all existing rabbitmq_web_mqtt tests green
ansd Sep 28, 2022
fc33719
Use 1 instead of 4 processes per WebMQTT connection
ansd Sep 30, 2022
5710a94
Support MQTT Keepalive in WebMQTT
ansd Oct 3, 2022
7a325a3
Remove deprecated cowboy return types from web mqtt
ChunyiLyu Oct 4, 2022
43bd548
Handle deprecated classic queue delivery
ansd Oct 10, 2022
ab8957b
Use best-effort client ID tracking
ansd Oct 10, 2022
199238d
Use pg to track MQTT client IDs
ansd Oct 12, 2022
3e28a52
Convert rabbit_mqtt_reader from gen_server2 to gen_server
ansd Oct 12, 2022
4b1c2c8
Emit cluster-wide MQTT connection infos
ansd Oct 14, 2022
af68fb4
Decrease memory usage of queue_type state
ansd Oct 18, 2022
07ad410
Skip queue when MQTT QoS 0
ansd Oct 23, 2022
b74dea4
Send rabbit event declaring mqtt_qos0 queue
ChunyiLyu Oct 27, 2022
627ea85
Add rabbit_event tests for MQTT
ansd Oct 27, 2022
8126925
Implement format_status for mqtt reader
ChunyiLyu Oct 28, 2022
33bf215
Add test for publishing via MQTT to different queue types
ansd Oct 28, 2022
816fedf
Enable flow control to target classic queue
ansd Nov 1, 2022
14f59f1
Handle soft limit exceeded as queue action
ansd Nov 1, 2022
645531b
Register mqtt connections in case event refresh
ChunyiLyu Nov 2, 2022
1925862
Test drain closing connections in web mqtt
ChunyiLyu Nov 3, 2022
df491d3
Use eqmtt client in web mqtt tests
ChunyiLyu Nov 4, 2022
4c15299
Delete old emqttc client
ansd Nov 7, 2022
319af38
Handle duplicate packet IDs
ansd Nov 8, 2022
96854a8
Use emqtt:publish in mqtt tests
ChunyiLyu Nov 8, 2022
9fd5704
Fix mixed version Web MQTT system tests
ansd Nov 8, 2022
17c5dff
Set common global counters for mqtt
ChunyiLyu Nov 10, 2022
38e5e20
Add tests
ansd Nov 10, 2022
de984d0
Subs from 1 connection counts as 1 consumer in global counter
ChunyiLyu Nov 11, 2022
ab5007a
Handle queue deletion
ansd Nov 11, 2022
6533532
Simplify counters
ansd Nov 13, 2022
7fc2234
Test ETS and NOOP retained message stores
ansd Nov 14, 2022
16b5ec5
Add missing unblock stream queue action
ansd Nov 14, 2022
b97006c
Output username in connection closed event
ansd Nov 15, 2022
bda52db
Support consuming classic mirrored queue failover
ansd Nov 16, 2022
aad7e1c
Add test for consuming MQTT classic queue going down
ansd Nov 17, 2022
0b43f00
Remove subscriptions map from proc state in mqtt
ChunyiLyu Nov 17, 2022
016451e
Reset application env in MQTT flow tests
ansd Nov 17, 2022
80f8e07
Implement consumer global counter for clean sess false
ChunyiLyu Nov 17, 2022
fb81a92
Add ws_connect helper to web mqtt tests
ChunyiLyu Nov 17, 2022
e06d3e7
Unblock queue when it gets deleted
ansd Nov 17, 2022
61a33da
Make rabbit_fifo_dlx_worker tests less flaky
ansd Nov 18, 2022
b2c87c5
Minor reformatting and renaming
ansd Nov 18, 2022
0ba0a6e
Several small improvements
ansd Nov 18, 2022
6e527fb
Replace existing subscription
ansd Nov 21, 2022
c3779d9
Implement message consuming counters in mqtt
ChunyiLyu Nov 21, 2022
5739342
Consume from queue once
ansd Nov 21, 2022
16fa122
Avoid exceptions in mixed version cluster
ansd Nov 23, 2022
075bc06
Handle messages_delivered_consume_*_ack counters at delivery
ChunyiLyu Nov 24, 2022
65bc0c3
Fix global counters
ansd Nov 24, 2022
14b3b93
Make ff_SUITE less flaky
ansd Nov 24, 2022
76f4598
Send last will if client did not DISCONNECT
ansd Nov 25, 2022
6815ceb
Fix mixed version reader_SUITE will test
ansd Nov 25, 2022
7bc8208
Remove local record definitions from header files
ansd Nov 28, 2022
1493cbe
Rename message_id to packet_id
ansd Nov 28, 2022
6605a16
Fix rabbitmq_web_mqtt:system_SUITE-mixed
ansd Nov 28, 2022
30a9ea5
Use connect helper func in more mqtt tests
ChunyiLyu Nov 29, 2022
f842ffd
Add feature flag rabbit_mqtt_qos0_queue
ansd Nov 29, 2022
46e8a65
Check if state.stats_timer is undefined to avoid crashing
ChunyiLyu Nov 30, 2022
7782142
Reduce memory usage of reader processes
ansd Dec 1, 2022
1180429
Terminate connection if mqtt not found in web sup protocol
ChunyiLyu Dec 1, 2022
86de0a1
Reduce memory usage of MQTT connection process
ansd Dec 2, 2022
3980c28
Allow higher load on Mnesia by default
ansd Dec 3, 2022
15636fd
Rename frame to packet
ansd Dec 4, 2022
61f6ca7
Support iodata() when sending message to MQTT client
ansd Dec 4, 2022
b3215eb
Set close header when close connection in web_mqtt init/2
ChunyiLyu Dec 6, 2022
2a7f22f
Web mqtt: close connection when receive invalid data
ChunyiLyu Dec 6, 2022
aa8d16f
Web mqtt: return hibernate in handle_credits
ChunyiLyu Dec 6, 2022
d5e497f
Web mqtt: backfill test for connecting with duplicated id
ChunyiLyu Dec 6, 2022
aea7ff8
Use helper to connect to node in mqtt cluster suite
ChunyiLyu Dec 8, 2022
de28560
Extract connect to node helper in rmq mqtt tests
ChunyiLyu Dec 8, 2022
97fefff
Add overflow drop-head to rabbit_mqtt_qos_queue type
ansd Dec 12, 2022
bd0acb3
Remove test helper util:connect_to_node/3
ansd Dec 12, 2022
fb91300
Add func specs for mqtt process_packet and process_request
ChunyiLyu Dec 13, 2022
482bef5
Stop rabbit_mqtt_packet:parse from crashing in web mqtt
ChunyiLyu Dec 14, 2022
4ca12b7
Fix func spec for mqtt process_request
ChunyiLyu Dec 15, 2022
340e930
Web mqtt returns 1002 with mqtt parsing error
ChunyiLyu Dec 15, 2022
4fa8e83
Allow undefined in some mqtt record type fields
ChunyiLyu Dec 16, 2022
cb68e48
Resolve some dialyzer issues
ChunyiLyu Dec 16, 2022
56e97a9
Fix MQTT in management plugin
ansd Dec 19, 2022
1720aa0
Allow CLI listing rabbit_mqtt_qos0_queue queues
ansd Dec 22, 2022
a8b69b4
Fix dialyzer issues and add function specs
ansd Dec 27, 2022
fb6c8da
Block Web MQTT connection if memory or disk alarm
ansd Dec 28, 2022
c9df098
Handle topic, username, password as binaries
ansd Dec 28, 2022
7c1aa49
Increase MQTT test coverage and fix edge cases
ansd Dec 29, 2022
d651f87
Share tests between MQTT and Web MQTT
ansd Jan 2, 2023
6ba2dc4
Switch to Logger macros
ansd Jan 4, 2023
fb93a3c
Block only publishing (Web) MQTT connections
ansd Jan 4, 2023
9283b4f
Add test AMQP 0.9.1 to MQTT with QoS 0
ansd Jan 5, 2023
a341912
Expand clean_session=false test
ansd Jan 10, 2023
35afeff
Eliminate bindings query CPU bottleneck
ansd Jan 10, 2023
863b7ea
Include non-AMQP connections in connection count
ansd Jan 12, 2023
f46f054
Fix "Clean Session" state for QoS 0 subscriptions
ansd Jan 13, 2023
b5febb8
Hold subsriptions in process state
ansd Jan 13, 2023
dcb00f1
Reduce test load
ansd Jan 14, 2023
a4db85d
Make pipeline fail when there are dialyzer warnings
ansd Jan 14, 2023
63ccf3e
Reduce inter-node traffic for MQTT QoS 0 queue type
ansd Jan 16, 2023
437cbb7
Use delegate for stateless deliveries
ansd Jan 16, 2023
3f85f8d
Do not log message payload
ansd Jan 17, 2023
d4cfbdd
Parse at most maximum packet length of 256MB
ansd Jan 18, 2023
d86ce70
Add missing type definitions in mqtt records
ChunyiLyu Jan 19, 2023
cd8962b
Remove optional rabbit_queue_type callbacks
ansd Jan 19, 2023
8a2a82e
Remove feature flag no_queue_name_in_classic_queue_client
ansd Jan 19, 2023
9db8626
Re-enable dialyzer option Wunmatched_returns
ansd Jan 23, 2023
6cc65ec
Export opaque types for event and mqtt_packet state
ChunyiLyu Jan 24, 2023
9c2f597
Support tracing in Native MQTT
ansd Jan 24, 2023
ec137bc
Add nested config record for rarely changing fields
ansd Jan 25, 2023
1f106fc
Fix wrong and add missing type specs
ansd Jan 25, 2023
50e2577
Adding missing function specs
ChunyiLyu Jan 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,13 @@ set_default_config() ->
{schedule_ms_limit, 0},
{heap_word_limit, 0},
{busy_port, false},
{busy_dist_port, true}]}
| OsirisConfig
{busy_dist_port, true}]},
{mnesia,
[
{dump_log_write_threshold, 5000},
{dump_log_time_threshold, 90000}
]}
| OsirisConfig
],
apply_erlang_term_based_config(Config).

Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/docs/rabbitmq.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@
# stomp.tcp_listen_options.nodelay = true
#
# stomp.tcp_listen_options.exit_on_close = true
# stomp.tcp_listen_options.send_timeout = 120
# stomp.tcp_listen_options.send_timeout = 120000

## Proxy protocol support
##
Expand Down Expand Up @@ -838,7 +838,7 @@
# mqtt.tcp_listen_options.nodelay = true
#
# mqtt.tcp_listen_options.exit_on_close = true
# mqtt.tcp_listen_options.send_timeout = 120
# mqtt.tcp_listen_options.send_timeout = 120000

## TLS listener settings
## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.
Expand Down
9 changes: 5 additions & 4 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ status() ->
true ->
[{virtual_host_count, rabbit_vhost:count()},
{connection_count,
length(rabbit_networking:connections_local())},
length(rabbit_networking:connections_local()) +
length(rabbit_networking:local_non_amqp_connections())},
{queue_count, total_queue_count()}];
false ->
[]
Expand Down Expand Up @@ -1163,12 +1164,12 @@ config_locations() ->
% This event is necessary for the stats timer to be initialized with
% the correct values once the management agent has started
force_event_refresh(Ref) ->
% direct connections, e.g. MQTT, STOMP
% direct connections, e.g. STOMP
ok = rabbit_direct:force_event_refresh(Ref),
% AMQP connections
ok = rabbit_networking:force_connection_event_refresh(Ref),
% "external" connections, which are not handled by the "AMQP core",
% e.g. connections to the stream plugin
% non-AMQP connections, which are not handled by the "AMQP core",
% e.g. connections to the stream and MQTT plugins
ok = rabbit_networking:force_non_amqp_connection_event_refresh(Ref),
ok = rabbit_channel:force_event_refresh(Ref),
ok = rabbit_amqqueue:force_event_refresh(Ref).
Expand Down
10 changes: 5 additions & 5 deletions deps/rabbit/src/rabbit_access_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

-export_type([permission_atom/0]).

-type permission_atom() :: 'configure' | 'read' | 'write'.
-type permission_atom() :: 'configure' | 'write' | 'read'.

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -194,8 +194,8 @@ check_resource_access(User = #user{username = Username,
check_access(
fun() -> Module:check_resource_access(
auth_user(User, Impl), Resource, Permission, Context) end,
Module, "access to ~ts refused for user '~ts'",
[rabbit_misc:rs(Resource), Username]);
Module, "~s access to ~s refused for user '~s'",
[Permission, rabbit_misc:rs(Resource), Username]);
(_, Else) -> Else
end, ok, Modules).

Expand All @@ -207,8 +207,8 @@ check_topic_access(User = #user{username = Username,
check_access(
fun() -> Module:check_topic_access(
auth_user(User, Impl), Resource, Permission, Context) end,
Module, "access to topic '~ts' in exchange ~ts refused for user '~ts'",
[maps:get(routing_key, Context), rabbit_misc:rs(Resource), Username]);
Module, "~s access to topic '~s' in exchange ~s refused for user '~s'",
[Permission, maps:get(routing_key, Context), rabbit_misc:rs(Resource), Username]);
(_, Else) -> Else
end, ok, Modules).

Expand Down
6 changes: 6 additions & 0 deletions deps/rabbit/src/rabbit_alarm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

-export([remote_conserve_resources/3]). %% Internal use only

-export_type([resource_alarm_source/0,
resource_alert/0]).

-define(SERVER, ?MODULE).

-define(FILE_DESCRIPTOR_RESOURCE, <<"file descriptors">>).
Expand All @@ -46,6 +49,9 @@
-type resource_alarm_source() :: 'disk' | 'memory'.
-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
-type alarm() :: local_alarm() | resource_alarm().
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
NodeForWhichAlarmWasSetOrCleared :: node()}.

%%----------------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1763,7 +1763,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
non_neg_integer(), rabbit_types:ctag(), boolean(),
rabbit_framing:amqp_table(), any(), rabbit_types:username(),
rabbit_queue_type:state()) ->
{ok, rabbit_queue_type:state(), rabbit_queue_type:actions()} |
{ok, rabbit_queue_type:state()} |
{error, term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_consume(Q, NoAck, ChPid, LimiterPid,
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@

-define(CREATION_EVENT_KEYS,
[name,
type,
durable,
auto_delete,
arguments,
Expand All @@ -129,7 +130,7 @@
user_who_performed_action
]).

-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name, type]]).

%%----------------------------------------------------------------------------

Expand Down
65 changes: 35 additions & 30 deletions deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/2, remove/3, remove/4]).
-export([list/1, list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2, list_explicit/0,
list_between/2, has_any_between/2]).
list_for_source_and_destination/2, list_for_source_and_destination/3,
list_explicit/0, list_between/2, has_any_between/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
process_deletions/2, binding_action/3]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
Expand Down Expand Up @@ -253,26 +253,18 @@ list(VHostPath) ->
list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
implicit_bindings(VHostPath);
list_for_source(SrcName) ->
mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{source = SrcName, _ = '_'}},
[B || #route{binding = B}
<- mnesia:match_object(rabbit_route, Route, read)]
end).
Route = #route{binding = #binding{source = SrcName, _ = '_'}},
Fun = list_for_route(Route, false),
mnesia:async_dirty(Fun).

-spec list_for_destination
(rabbit_types:binding_destination()) -> bindings().

list_for_destination(DstName = #resource{}) ->
ExplicitBindings = mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{destination = DstName,
_ = '_'}},
[reverse_binding(B) ||
#reverse_route{reverse_binding = B} <-
mnesia:match_object(rabbit_reverse_route,
reverse_route(Route), read)]
end),
Route = #route{binding = #binding{destination = DstName,
_ = '_'}},
Fun = list_for_route(Route, true),
ExplicitBindings = mnesia:async_dirty(Fun),
implicit_for_destination(DstName) ++ ExplicitBindings.

-spec list_between(
Expand Down Expand Up @@ -316,27 +308,40 @@ implicit_for_destination(DstQueue = #resource{kind = queue,
implicit_for_destination(_) ->
[].

-spec list_for_source_and_destination
(rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
bindings().
-spec list_for_source_and_destination(rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
bindings().
list_for_source_and_destination(SrcName, DstName) ->
list_for_source_and_destination(SrcName, DstName, false).

-spec list_for_source_and_destination(rabbit_types:binding_source(), rabbit_types:binding_destination(), boolean()) ->
bindings().
list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
#resource{kind = queue,
virtual_host = VHostPath,
name = QName} = DstQueue) ->
name = QName} = DstQueue,
_Reverse) ->
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
destination = DstQueue,
key = QName,
args = []}];
list_for_source_and_destination(SrcName, DstName) ->
mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{source = SrcName,
destination = DstName,
_ = '_'}},
[B || #route{binding = B} <- mnesia:match_object(rabbit_route,
Route, read)]
end).
list_for_source_and_destination(SrcName, DstName, Reverse) ->
Route = #route{binding = #binding{source = SrcName,
destination = DstName,
_ = '_'}},
Fun = list_for_route(Route, Reverse),
mnesia:async_dirty(Fun).

list_for_route(Route, false) ->
fun() ->
[B || #route{binding = B} <- mnesia:match_object(rabbit_route, Route, read)]
end;
list_for_route(Route, true) ->
fun() ->
[reverse_binding(B) ||
#reverse_route{reverse_binding = B} <-
mnesia:match_object(rabbit_reverse_route,
reverse_route(Route), read)]
end.

-spec info_keys() -> rabbit_types:info_keys().

Expand Down
Loading