lib/upperkut/processor.rb in upperkut-0.1.4 vs lib/upperkut/processor.rb in upperkut-0.3.0

- old
+ new

@@ -1,22 +1,36 @@ +require_relative 'batch_execution' + module Upperkut class Processor def initialize(manager) @manager = manager @worker = @manager.worker + @logger = @manager.logger + @sleeping_time = 0 end def run @thread ||= Thread.new do - process + begin + process + rescue Exception => e + @logger.debug( + action: :processor_killed, + reason: e + ) + + @manager.notify_killed_processor(self) + end end end def kill return unless @thread @thread.raise Upperkut::Shutdown + @thread.value # wait end private def process @@ -26,10 +40,11 @@ process_batch next end @sleeping_time += sleep(@worker.setup.polling_interval) + @logger.debug(sleeping_time: @sleeping_time) end end def should_process? buffer_size = @worker.size @@ -41,15 +56,9 @@ buffer_size >= @worker.setup.batch_size || @sleeping_time >= @worker.setup.max_wait end def process_batch - @sleeping_time = 0 - @worker.new.process - rescue Exception => ex - # Add to retry_queue - # if retry_limit is reached - # send to dead - raise ex + BatchExecution.new(@worker, @logger).execute end end end