diff --git a/README.md b/README.md index 9898b02..d995924 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,7 @@ AMQP::Client.start("amqp://guest:guest@localhost") do |c| name, message_count, consumer_count = ch.queue_declare(name: "myqueue", passive: false, durable: true, - exclusive: false, auto_delete: false, - arguments: AMQP::Client::Arguments.new) + exclusive: false, auto_delete: false) q = ch.queue # temporary queue that is deleted when the channel is closed ch.queue_purge("myqueue") ch.queue_bind("myqueue", "amq.topic", "routing-key") diff --git a/examples/streams.cr b/examples/streams.cr index ced7a83..8c36433 100644 --- a/examples/streams.cr +++ b/examples/streams.cr @@ -5,10 +5,10 @@ AMQP::Client.start do |c| # prefetch required when consuming from stream queues ch.prefetch(10) # declare a stream queue using the x-queue-type argument - q = ch.queue("stream1", args: AMQP::Client::Arguments.new({"x-queue-type": "stream"})) + q = ch.queue("stream1", arguments: {"x-queue-type": "stream"}) puts "Waiting for messages. To exit press CTRL+C" # Decide from where to subscribe using the x-stream-offset argument - q.subscribe(block: true, no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg| + q.subscribe(block: true, no_ack: false, arguments: {"x-stream-offset": "first"}) do |msg| puts "Received: #{msg.body_io}" msg.ack end diff --git a/shard.yml b/shard.yml index 04b32c4..616fafe 100644 --- a/shard.yml +++ b/shard.yml @@ -7,8 +7,9 @@ authors: dependencies: amq-protocol: - github: cloudamqp/amq-protocol.cr - version: ~>1 + #github: cloudamqp/amq-protocol.cr + #version: ~>1 + path: ../amq-protocol.cr development_dependencies: ameba: github: crystal-ameba/ameba diff --git a/src/amqp-client/channel.cr b/src/amqp-client/channel.cr index e3f3ab9..2c00651 100644 --- a/src/amqp-client/channel.cr +++ b/src/amqp-client/channel.cr @@ -429,6 +429,12 @@ class AMQP::Client ok.consumer_tag end + def basic_consume(queue, tag = "", no_ack = true, exclusive = false, + block = false, arguments : NamedTuple = NamedTuple.new, work_pool = 1, + &blk : DeliverMessage -> Nil) + basic_consume(queue, tag, no_ack, exclusive, block, Arguments.new(arguments), work_pool, blk) + end + private def consume(consumer_tag, deliveries, done, i, log_errors, blk) Log.context.set channel_id: @id.to_i, consumer: consumer_tag, worker: i while msg = deliveries.receive? @@ -498,11 +504,15 @@ class AMQP::Client end # Declares a queue with a name, by default durable and not auto-deleted - def queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, args arguments = Arguments.new) + def queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, arguments = Arguments.new) q = queue_declare(name, passive, durable, exclusive, auto_delete, false, arguments) Queue.new(self, q[:queue_name]) end + def queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, arguments : NamedTuple = NamedTuple.new) + queue(name, passive, durable, exclusive, auto_delete, Arguments.new(arguments)) + end + # Declare a queue with *name* # *passive* will raise if the queue doesn't already exists, other arguments are ignored # *durable* will make the queue durable on the server (note that messages have have the persistent flag set to make the messages persistent) @@ -532,6 +542,15 @@ class AMQP::Client end end + def queue_declare(name : String, passive = false, + durable = name.empty? ? false : true, + exclusive = name.empty? ? true : false, + auto_delete = name.empty? ? true : false, + no_wait = false, + arguments : NamedTuple = NamedTuple.new) + queue_declare(name, passive, durable, exclusive, auto_delete, no_wait, Arguments.new(arguments)) + end + # Delete a queue def queue_delete(name : String, if_unused = false, if_empty = false, no_wait = false) write Frame::Queue::Delete.new(@id, 0_u16, name, if_unused, if_empty, no_wait) @@ -560,12 +579,20 @@ class AMQP::Client expect Frame::Queue::BindOk unless no_wait end + def queue_bind(queue : String, exchange : String, routing_key : String, no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil + queue_bind(queue, exchange, routing_key, no_wait, Arguments.new(arguments)) + end + # Unbind a *queue* from an *exchange*, with a *routing_key* and optionally some *arguments* def queue_unbind(queue : String, exchange : String, routing_key : String, args arguments = Arguments.new) : Nil write Frame::Queue::Unbind.new(@id, 0_u16, queue, exchange, routing_key, arguments) expect Frame::Queue::UnbindOk end + def queue_unbind(queue : String, exchange : String, routing_key : String, arguments : NamedTuple = NamedTuple.new) : Nil + queue_unbind(queue, exchange, routing_key, Arguments.new(arguments)) + end + def topic_exchange(name = "amq.topic", passive = true) exchange(name, "topic", passive) end @@ -593,6 +620,11 @@ class AMQP::Client Exchange.new(self, name) end + def exchange(name, type, passive = false, durable = true, + internal = false, auto_delete = false, arguments : NamedTuple = NamedTuple.new) + exchange(name, type, passive, durable, internal, auto_delete, Arguments.new(arguments)) + end + # Declares an exchange def exchange_declare(name : String, type : String, passive = false, durable = true, internal = false, auto_delete = false, @@ -604,6 +636,12 @@ class AMQP::Client expect Frame::Exchange::DeclareOk unless no_wait end + def exchange_declare(name : String, type : String, passive = false, + durable = true, internal = false, auto_delete = false, + no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil + exchange_declare(name, type, passive, durable, internal, auto_delete, no_wait, Arguments.new(arguments)) + end + # Deletes an exchange def exchange_delete(name, if_unused = false, no_wait = false) : Nil write Frame::Exchange::Delete.new(@id, 0_u16, name, if_unused, no_wait) @@ -616,12 +654,20 @@ class AMQP::Client expect Frame::Exchange::BindOk unless no_wait end + def exchange_bind(source : String, destination : String, routing_key : String, no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil + exchange_bind(source, destination, routing_key, no_wait, Arguments.new(arguments)) + end + # Unbind an exchange from another exchange def exchange_unbind(source : String, destination : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil write Frame::Exchange::Unbind.new(@id, 0_u16, source, destination, routing_key, no_wait, arguments) expect Frame::Exchange::UnbindOk unless no_wait end + def exchange_unbind(source : String, destination : String, routing_key : String, no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil + exchange_unbind(source, destination, routing_key, no_wait, Arguments.new(arguments)) + end + # Sets the channel in publish confirm mode, each published message will be acked or nacked def confirm_select(no_wait = false) : Nil return if @confirm_mode diff --git a/src/amqp-client/exchange.cr b/src/amqp-client/exchange.cr index d596500..ebe844e 100644 --- a/src/amqp-client/exchange.cr +++ b/src/amqp-client/exchange.cr @@ -16,12 +16,20 @@ class AMQP::Client self end + def bind(exchange : String, routing_key : String, no_wait = false, arguments = NamedTuple.new) + bind(exchange, routing_key, no_wait, Arguments.new(arguments)) + end + # Unbind the exchange from another exchange def unbind(exchange : String, routing_key : String, no_wait = false, args arguments = Arguments.new) @channel.exchange_unbind(@name, exchange, routing_key, no_wait, arguments) self end + def unbind(exchange : String, routing_key : String, no_wait = false, arguments = NamedTuple.new) + unbind(exchange, routing_key, no_wait, Arguments.new(arguments)) + end + # Publish a message to the exchange def publish(message, routing_key : String, mandatory = false, immediate = false, props properties = Properties.new) @channel.basic_publish(message, @name, routing_key, mandatory, immediate, properties) diff --git a/src/amqp-client/queue.cr b/src/amqp-client/queue.cr index 2e1775f..7c6026a 100644 --- a/src/amqp-client/queue.cr +++ b/src/amqp-client/queue.cr @@ -27,12 +27,20 @@ class AMQP::Client self end + def bind(exchange : String, routing_key : String, no_wait = false, arguments = NamedTuple.new) + bind(exchange, routing_key, no_wait, Arguments.new(arguments)) + end + # Unbind the queue from an exchange def unbind(exchange : String, routing_key : String, args arguments = Arguments.new) @channel.queue_unbind(@name, exchange, routing_key, arguments) self end + def unbind(exchange : String, routing_key : String, arguments = NamedTuple.new) + unbind(exchange, routing_key, Arguments.new(arguments)) + end + # Publish a message directly to the queue def publish(message, mandatory = false, immediate = false, props properties = Properties.new) @channel.basic_publish(message, "", @name, mandatory, immediate, properties) @@ -74,10 +82,15 @@ class AMQP::Client # # See `Channel#basic_consume` def subscribe(tag = "", no_ack = true, exclusive = false, block = false, - args arguments = Arguments.new, work_pool = 1, &blk : DeliverMessage -> Nil) + arguments = Arguments.new, work_pool = 1, &blk : DeliverMessage -> Nil) @channel.basic_consume(@name, tag, no_ack, exclusive, block, arguments, work_pool, &blk) end + def subscribe(tag = "", no_ack = true, exclusive = false, block = false, + arguments : NamedTuple = NamedTuple.new, work_pool = 1, &blk : DeliverMessage -> Nil) + subscribe(tag, no_ack, exclusive, block, Arguments.new(arguments), work_pool, &blk) + end + # Unsubscribe from the queue # # See `Channel#basic_cancel` for more details