lib/toiler/actor/fetcher.rb in toiler-0.5.1.pre7 vs lib/toiler/actor/fetcher.rb in toiler-0.5.1.pre9
- old
+ new
@@ -8,21 +8,22 @@
include Utils::ActorLogging
FETCH_LIMIT = 10
attr_accessor :queue, :wait, :visibility_timeout, :free_processors,
- :executing, :waiting_messages
+ :executing, :waiting_messages, :concurrency
def initialize(queue, client, count)
debug "Initializing Fetcher for queue #{queue}..."
@queue = Toiler::Aws::Queue.new queue, client
@wait = Toiler.options[:wait] || 60
@free_processors = count
@batch = Toiler.worker_class_registry[queue].batch?
@visibility_timeout = @queue.visibility_timeout
@executing = false
@waiting_messages = 0
+ @concurrency = count
debug "Finished initializing Fetcher for queue #{queue}"
tell :poll_messages
end
def default_executor
@@ -73,10 +74,12 @@
def poll_messages
return unless should_poll?
max_number_of_messages = max_messages
+ return if waiting_messages > 0 && !full_batch?(max_number_of_messages)
+
@waiting_messages += max_number_of_messages
debug "Fetcher #{queue.name} polling messages..."
future = poll_future max_number_of_messages
future.on_rejection! do
@@ -87,14 +90,18 @@
tell [:assign_messages, msgs] if !msgs.nil? && !msgs.empty?
tell [:release_messages, max_number_of_messages]
tell :poll_messages
end
- tell :poll_messages if should_poll?
+ poll_messages if should_poll?
end
def should_poll?
free_processors / 2 > waiting_messages
+ end
+
+ def full_batch?(max_number_of_messages)
+ max_number_of_messages == FETCH_LIMIT || max_number_of_messages >= concurrency * 0.1
end
def processor_pool
@processor_pool ||= Toiler.processor_pool queue.name
end