lib/amqp/client/channel.rb in amqp-client-0.2.3 vs lib/amqp/client/channel.rb in amqp-client-0.3.0
- old
+ new
@@ -4,19 +4,20 @@
module AMQP
# AMQP Channel
class Channel
def initialize(connection, id)
- @replies = ::Queue.new
@connection = connection
@id = id
+ @replies = ::Queue.new
@consumers = {}
- @confirm = nil
- @last_confirmed = 0
@closed = nil
- @on_return = nil
@open = false
+ @on_return = nil
+ @confirm = nil
+ @unconfirmed = ::Queue.new
+ @unconfirmed_empty = ::Queue.new
end
attr_reader :id, :consumers
def open
@@ -122,34 +123,37 @@
end
def basic_publish(body, exchange, routing_key, **properties)
frame_max = @connection.frame_max - 8
id = @id
+ mandatory = properties.delete(:mandatory) || false
if 0 < body.bytesize && body.bytesize <= frame_max
- write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false),
+ write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
FrameBytes.header(id, body.bytesize, properties),
FrameBytes.body(id, body)
- return @confirm ? @confirm += 1 : nil
+ @unconfirmed.push @confirm += 1 if @confirm
+ return
end
- write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false),
+ write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
FrameBytes.header(id, body.bytesize, properties)
pos = 0
while pos < body.bytesize # split body into multiple frame_max frames
len = [frame_max, body.bytesize - pos].min
body_part = body.byteslice(pos, len)
write_bytes FrameBytes.body(id, body_part)
pos += len
end
- @confirm += 1 if @confirm
+ @unconfirmed.push @confirm += 1 if @confirm
+ nil
end
def basic_publish_confirm(body, exchange, routing_key, **properties)
confirm_select(no_wait: true)
- id = basic_publish(body, exchange, routing_key, **properties)
- wait_for_confirm(id)
+ basic_publish(body, exchange, routing_key, **properties)
+ wait_for_confirms
end
# Consume from a queue
# worker_threads: 0 => blocking, messages are executed in the thread calling this method
def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {},
@@ -209,24 +213,36 @@
def confirm_select(no_wait: false)
return if @confirm
write_bytes FrameBytes.confirm_select(@id, no_wait)
expect :confirm_select_ok unless no_wait
- @confirms = ::Queue.new
@confirm = 0
end
- def wait_for_confirm(id)
- raise ArgumentError, "Confirm id has to a positive number" unless id&.positive?
- return true if @last_confirmed >= id
+ # Block until all publishes messages are confirmed
+ def wait_for_confirms
+ return true if @unconfirmed.empty?
+ @unconfirmed_empty.pop
+ end
+
+ def confirm(args)
+ ack_or_nack, delivery_tag, multiple = *args
loop do
- ack, delivery_tag, multiple = @confirms.shift || break
- @last_confirmed = delivery_tag
- return ack if delivery_tag == id || (delivery_tag > id && multiple)
+ tag = @unconfirmed.pop(true)
+ break if tag == delivery_tag
+ next if multiple && tag < delivery_tag
+
+ @unconfirmed << tag # requeue
+ rescue ThreadError
+ break
end
- false
+ return unless @unconfirmed.empty?
+
+ @unconfirmed_empty.num_waiting.times do
+ @unconfirmed_empty << ack_or_nack == :ack
+ end
end
def tx_select
write_bytes FrameBytes.tx_select(@id)
expect :tx_select_ok
@@ -242,13 +258,9 @@
expect :tx_rollback_ok
end
def reply(args)
@replies.push(args)
- end
-
- def confirm(args)
- @confirms.push(args)
end
def message_returned(reply_code, reply_text, exchange, routing_key)
Thread.new do
body_size, properties = expect(:header)