lib/upperkut/processor.rb in upperkut-0.8.1 vs lib/upperkut/processor.rb in upperkut-1.0.0.rc

- old
+ new

@@ -1,58 +1,63 @@ -require_relative 'batch_execution' +require_relative 'logging' module Upperkut class Processor - def initialize(manager) - @manager = manager - @worker = @manager.worker - @logger = @manager.logger - @strategy = @worker.strategy - - @sleeping_time = 0 + def initialize(worker, logger = Logging.logger) + @worker = worker + @strategy = worker.strategy + @worker_instance = worker.new + @logger = logger end - def run - @thread ||= Thread.new do - begin - process - rescue Exception => e - @logger.debug( - action: :processor_killed, - reason: e, - stacktrace: e.backtrace - ) + def process + items = @worker.fetch_items.freeze - @manager.notify_killed_processor(self) - end + @worker.server_middlewares.invoke(@worker, items) do + @worker_instance.perform(items) end - end - def kill - return unless @thread + nacked_items, pending_ack_items = items.partition(&:nacked?) + @strategy.nack(nacked_items) if nacked_items.any? + @strategy.ack(pending_ack_items) if pending_ack_items.any? + rescue StandardError => error + @logger.error( + action: :handle_execution_error, + ex: error.to_s, + backtrace: error.backtrace.join("\n"), + item_size: Array(items).size + ) - @thread.raise Upperkut::Shutdown - @thread.value # wait + if items + if @worker_instance.respond_to?(:handle_error) + @worker_instance.handle_error(error, items) + return + end + + @strategy.nack(items) + end + + raise error end - private + def blocking_process + sleeping_time = 0 - def process loop do - next if @manager.stopped + break if @stopped if @strategy.process? - @sleeping_time = 0 - process_batch + sleeping_time = 0 + process next end - @sleeping_time += sleep(@worker.setup.polling_interval) - @logger.debug(sleeping_time: @sleeping_time) + sleeping_time += sleep(@worker.setup.polling_interval) + @logger.debug(sleeping_time: sleeping_time) end end - def process_batch - BatchExecution.new(@worker, @logger).execute + def stop + @stopped = true end end end