lib/legion/transport/queue.rb in legion-transport-0.1.0 vs lib/legion/transport/queue.rb in legion-transport-1.0.0

- old
+ new

@@ -1,13 +1,70 @@ module Legion module Transport - if RUBY_ENGINE == 'jruby' - require 'legion/transport/queues/marchhare' - class Queue < Legion::Transport::Queues::Marchhare + class Queue < Legion::Transport::CONNECTOR::Queue + include Legion::Transport::Common + + def initialize(queue = queue_name, options = {}) + retries ||= 0 + @options = options + super(channel, queue, options_builder(default_options, queue_options, options)) + rescue ::Bunny::PreconditionFailed + retries.zero? ? retries = 1 : raise + recreate_queue(channel, queue) + retry end - else - require 'legion/transport/queues/bunny' - class Queue < Legion::Transport::Queues::Bunny + + def recreate_queue(channel, queue) + Legion::Logging.warn "Queue:#{queue} exists with wrong parameters, deleting and creating" + queue = ::Bunny::Queue.new(channel, queue, no_declare: true, passive: true) + queue.delete(if_empty: true) end + + def default_options + hash = {} + hash[:manual_ack] = true + hash[:durable] = true + hash[:exclusive] = false + hash[:block] = false + hash[:auto_delete] = false + hash[:arguments] = { 'x-max-priority': 255, + 'x-overflow': 'reject-publish', + 'x-dead-letter-exchange': "#{self.class.ancestors.first.to_s.split('::')[2].downcase}.dlx" } + hash + end + + def queue_options + {} + end + + def queue_name + ancestor = self.class.ancestors.first.to_s.split('::') + name = if ancestor[5].scan(/[A-Z]/).length > 1 + ancestor[5].gsub!(/(.)([A-Z])/, '\1_\2').downcase! + else + ancestor[5].downcase! + end + "#{ancestor[2].downcase}.#{name}" + end + + def delete(options = { if_unused: true, if_empty: true }) + super(options) + true + rescue ::Bunny::PreconditionFailed + false + end + + def acknowledge(delivery_tag) + channel.acknowledge(delivery_tag) + end + + def reject(delivery_tag, requeue = false) + channel.reject(delivery_tag, requeue) + end end end end + +require_relative 'queues/node' +require_relative 'queues/node_status' +require_relative 'queues/task_log' +require_relative 'queues/task_update'