lib/queuel/iron_mq/poller.rb in queuel-0.0.1 vs lib/queuel/iron_mq/poller.rb in queuel-0.1.0
- old
+ new
@@ -1,123 +1,20 @@
-require 'timeout'
module Queuel
module IronMq
- class Poller
- def initialize(queue, options, block)
- self.queue = queue
- self.options = options || {}
- self.block = block
- self.tries = 0
- self.continue_looping = true
- end
-
- def poll
- choose_looper do |msg|
- if msg.nil?
- tried
- quit_looping! if break_if_nil? || maxed_tried?
- sleep(sleep_time)
- else
- reset_tries
- block.call msg
- msg.delete
- end
- !msg.nil?
- end
- end
-
- protected
- attr_accessor :tries
-
+ class Poller < Base::Poller
+ # Public: poll
private
- attr_accessor :queue
- attr_accessor :args
- attr_accessor :options
- attr_accessor :block
- attr_accessor :continue_looping
- def choose_looper(&loop_block)
- timeout? ? timeout_looper(loop_block) : looper(loop_block)
- end
-
- def timeout_looper(loop_block)
- Timeout.timeout(timeout) { looper(loop_block) }
- rescue Timeout::Error
- false
- end
-
- def looper(loop_block)
- while continue_looping? do
- loop_block.call(pop_new_message)
- end
- end
-
- def continue_looping?
- !!continue_looping
- end
-
- def quit_looping!
- self.continue_looping = false
- end
-
- def timeout
- options[:poll_timeout].to_i
- end
-
- def timeout?
- timeout > 0
- end
-
- def pop_new_message
- queue.pop built_options
- end
-
- def start_sleep_time
- 0.1
- end
-
- def sleep_time
- tries < 30 ? (start_sleep_time * tries) : 3
- end
-
- def reset_tries
- self.tries = 0
- end
-
- def maxed_tried?
- tries >= max_fails if max_fails_given?
- end
-
- def max_fails_given?
- max_fails > 0
- end
-
- def max_fails
- options[:max_consecutive_fails].to_i
- end
-
- def tried
- self.tries += 1
- end
-
- def break_if_nil?
- !!options.fetch(:break_if_nil, false)
- end
-
- def option_keys
- %w[break_if_nil poll_timeout max_consecutive_fails]
- end
-
- def my_options
- options.select { |key,_| option_keys.include? key.to_s }
- end
-
def built_options
- options.merge default_args # intentional direction, force defaults
+ options.merge default_options # intentional direction, force defaults
end
- def default_args
+ def default_options
{ n: 1 }
+ end
+
+ def peek_options
+ { n: self.workers }
end
end
end
end