lib/legion/transport/queue.rb in legion-transport-0.1.0 vs lib/legion/transport/queue.rb in legion-transport-1.0.0
- old
+ new
@@ -1,13 +1,70 @@
module Legion
module Transport
- if RUBY_ENGINE == 'jruby'
- require 'legion/transport/queues/marchhare'
- class Queue < Legion::Transport::Queues::Marchhare
+ class Queue < Legion::Transport::CONNECTOR::Queue
+ include Legion::Transport::Common
+
+ def initialize(queue = queue_name, options = {})
+ retries ||= 0
+ @options = options
+ super(channel, queue, options_builder(default_options, queue_options, options))
+ rescue ::Bunny::PreconditionFailed
+ retries.zero? ? retries = 1 : raise
+ recreate_queue(channel, queue)
+ retry
end
- else
- require 'legion/transport/queues/bunny'
- class Queue < Legion::Transport::Queues::Bunny
+
+ def recreate_queue(channel, queue)
+ Legion::Logging.warn "Queue:#{queue} exists with wrong parameters, deleting and creating"
+ queue = ::Bunny::Queue.new(channel, queue, no_declare: true, passive: true)
+ queue.delete(if_empty: true)
end
+
+ def default_options
+ hash = {}
+ hash[:manual_ack] = true
+ hash[:durable] = true
+ hash[:exclusive] = false
+ hash[:block] = false
+ hash[:auto_delete] = false
+ hash[:arguments] = { 'x-max-priority': 255,
+ 'x-overflow': 'reject-publish',
+ 'x-dead-letter-exchange': "#{self.class.ancestors.first.to_s.split('::')[2].downcase}.dlx" }
+ hash
+ end
+
+ def queue_options
+ {}
+ end
+
+ def queue_name
+ ancestor = self.class.ancestors.first.to_s.split('::')
+ name = if ancestor[5].scan(/[A-Z]/).length > 1
+ ancestor[5].gsub!(/(.)([A-Z])/, '\1_\2').downcase!
+ else
+ ancestor[5].downcase!
+ end
+ "#{ancestor[2].downcase}.#{name}"
+ end
+
+ def delete(options = { if_unused: true, if_empty: true })
+ super(options)
+ true
+ rescue ::Bunny::PreconditionFailed
+ false
+ end
+
+ def acknowledge(delivery_tag)
+ channel.acknowledge(delivery_tag)
+ end
+
+ def reject(delivery_tag, requeue = false)
+ channel.reject(delivery_tag, requeue)
+ end
end
end
end
+
+require_relative 'queues/node'
+require_relative 'queues/node_status'
+require_relative 'queues/task_log'
+require_relative 'queues/task_update'