lib/toiler/actor/fetcher.rb in toiler-0.5.1.pre7 vs lib/toiler/actor/fetcher.rb in toiler-0.5.1.pre9

- old
+ new

@@ -8,21 +8,22 @@ include Utils::ActorLogging FETCH_LIMIT = 10 attr_accessor :queue, :wait, :visibility_timeout, :free_processors, - :executing, :waiting_messages + :executing, :waiting_messages, :concurrency def initialize(queue, client, count) debug "Initializing Fetcher for queue #{queue}..." @queue = Toiler::Aws::Queue.new queue, client @wait = Toiler.options[:wait] || 60 @free_processors = count @batch = Toiler.worker_class_registry[queue].batch? @visibility_timeout = @queue.visibility_timeout @executing = false @waiting_messages = 0 + @concurrency = count debug "Finished initializing Fetcher for queue #{queue}" tell :poll_messages end def default_executor @@ -73,10 +74,12 @@ def poll_messages return unless should_poll? max_number_of_messages = max_messages + return if waiting_messages > 0 && !full_batch?(max_number_of_messages) + @waiting_messages += max_number_of_messages debug "Fetcher #{queue.name} polling messages..." future = poll_future max_number_of_messages future.on_rejection! do @@ -87,14 +90,18 @@ tell [:assign_messages, msgs] if !msgs.nil? && !msgs.empty? tell [:release_messages, max_number_of_messages] tell :poll_messages end - tell :poll_messages if should_poll? + poll_messages if should_poll? end def should_poll? free_processors / 2 > waiting_messages + end + + def full_batch?(max_number_of_messages) + max_number_of_messages == FETCH_LIMIT || max_number_of_messages >= concurrency * 0.1 end def processor_pool @processor_pool ||= Toiler.processor_pool queue.name end