lib/toiler/actor/fetcher.rb in toiler-0.3.1.beta1 vs lib/toiler/actor/fetcher.rb in toiler-0.3.1.beta2

- old
+ new

@@ -1,104 +1,120 @@ -require 'toiler/actor/utils/actor_logging' -require 'toiler/aws/queue' - -module Toiler - module Actor - # Actor polling for messages only when processors are ready, otherwise idle - class Fetcher < Concurrent::Actor::RestartingContext - include Utils::ActorLogging - - FETCH_LIMIT = 10 - - attr_accessor :queue, :wait, :visibility_timeout, :free_processors, - :scheduled, :executing - - def initialize(queue, client) - debug "Initializing Fetcher for queue #{queue}..." - @queue = Toiler::Aws::Queue.new queue, client - @wait = Toiler.options[:wait] || 20 - @free_processors = Concurrent::AtomicFixnum.new(0) - @batch = Toiler.worker_class_registry[queue].batch? - @visibility_timeout = @queue.visibility_timeout - @scheduled = Concurrent::AtomicBoolean.new - @executing = Concurrent::AtomicBoolean.new - debug "Finished initializing Fetcher for queue #{queue}" - end - - def default_executor - Concurrent.global_io_executor - end - - def on_message(msg) - method, *args = msg - send(method, *args) - rescue StandardError => e - error "Fetcher #{queue.name} raised exception #{e.class}" - end - - def executing? - executing.value - end - - def scheduled? - scheduled.value - end - - def get_free_processors - free_processors.value - end - - private - - def batch? - @batch - end - - def processor_finished - debug "Fetcher #{queue.name} received processor finished signal..." - free_processors.increment - schedule_poll - end - - def max_messages - batch? ? FETCH_LIMIT : [FETCH_LIMIT, free_processors.value].min - end - - def poll_future - Concurrent.future do - @executing.make_true - queue.receive_messages message_attribute_names: %w(All), - wait_time_seconds: wait, - max_number_of_messages: max_messages - @executing.make_false - end - end - - def poll_messages - poll_future.on_completion! do |_success, msgs| - tell [:assign_messages, msgs] unless msgs.nil? || msgs.empty? - scheduled.make_false - tell :schedule_poll - end - end - - def schedule_poll - return unless free_processors.value > 0 && scheduled.make_true - debug "Fetcher #{queue.name} scheduling polling..." - tell :poll_messages - end - - def processor_pool - @processor_pool ||= Toiler.processor_pool queue.name - end - - def assign_messages(messages) - messages = [messages] if batch? - messages.each do |m| - processor_pool.tell [:process, visibility_timeout, m] - free_processors.decrement - end - debug "Fetcher #{queue.name} assigned #{messages.count} messages" - end - end - end -end +require 'toiler/actor/utils/actor_logging' +require 'toiler/aws/queue' + +module Toiler + module Actor + # Actor polling for messages only when processors are ready, otherwise idle + class Fetcher < Concurrent::Actor::RestartingContext + include Utils::ActorLogging + + FETCH_LIMIT = 10 + + attr_accessor :queue, :wait, :visibility_timeout, :free_processors, + :scheduled, :executing + + def initialize(queue, client) + debug "Initializing Fetcher for queue #{queue}..." + @queue = Toiler::Aws::Queue.new queue, client + @wait = Toiler.options[:wait] || 20 + @free_processors = Concurrent::AtomicFixnum.new(0) + @batch = Toiler.worker_class_registry[queue].batch? + @visibility_timeout = @queue.visibility_timeout + @scheduled = Concurrent::AtomicBoolean.new + @executing = Concurrent::AtomicBoolean.new + @polling = Concurrent::AtomicBoolean.new + debug "Finished initializing Fetcher for queue #{queue}" + end + + def default_executor + Concurrent.global_io_executor + end + + def on_message(msg) + executing.make_true + method, *args = msg + send(method, *args) + rescue StandardError => e + error "Fetcher #{queue.name} raised exception #{e.class}" + ensure + executing.make_false + end + + def executing? + executing.value + end + + def polling? + polling.value + end + + def scheduled? + scheduled.value + end + + def get_free_processors + free_processors.value + end + + private + + def batch? + @batch + end + + def processor_started + debug "Fetcher #{queue.name} received processor started signal..." + free_processors.decrement + end + + def processor_finished + debug "Fetcher #{queue.name} received processor finished signal..." + free_processors.increment + schedule_poll + end + + def max_messages + batch? ? FETCH_LIMIT : [FETCH_LIMIT, free_processors.value].min + end + + def poll_future + Concurrent.future do + queue.receive_messages message_attribute_names: %w(All), + wait_time_seconds: wait, + max_number_of_messages: max_messages + end + end + + def poll_messages + polling.make_true + poll_future.on_completion! do |success, msgs, error| + polling.make_false + scheduled.make_false + if success && !msgs.nil? && !msgs.empty? + tell [:assign_messages, msgs] + else + tell :schedule_poll + end + end + end + + def schedule_poll + return unless free_processors.value > 0 && scheduled.make_true + debug "Fetcher #{queue.name} scheduling polling..." + tell :poll_messages + end + + def processor_pool + @processor_pool ||= Toiler.processor_pool queue.name + end + + def assign_messages(messages) + messages = [messages] if batch? + messages.each do |m| + processor_pool.tell [:process, visibility_timeout, m] + end + debug "Fetcher #{queue.name} assigned #{messages.count} messages" + tell :schedule_poll + end + end + end +end