lib/upperkut/processor.rb in upperkut-0.1.4 vs lib/upperkut/processor.rb in upperkut-0.3.0
- old
+ new
@@ -1,22 +1,36 @@
+require_relative 'batch_execution'
+
module Upperkut
class Processor
def initialize(manager)
@manager = manager
@worker = @manager.worker
+ @logger = @manager.logger
+
@sleeping_time = 0
end
def run
@thread ||= Thread.new do
- process
+ begin
+ process
+ rescue Exception => e
+ @logger.debug(
+ action: :processor_killed,
+ reason: e
+ )
+
+ @manager.notify_killed_processor(self)
+ end
end
end
def kill
return unless @thread
@thread.raise Upperkut::Shutdown
+ @thread.value # wait
end
private
def process
@@ -26,10 +40,11 @@
process_batch
next
end
@sleeping_time += sleep(@worker.setup.polling_interval)
+ @logger.debug(sleeping_time: @sleeping_time)
end
end
def should_process?
buffer_size = @worker.size
@@ -41,15 +56,9 @@
buffer_size >= @worker.setup.batch_size ||
@sleeping_time >= @worker.setup.max_wait
end
def process_batch
- @sleeping_time = 0
- @worker.new.process
- rescue Exception => ex
- # Add to retry_queue
- # if retry_limit is reached
- # send to dead
- raise ex
+ BatchExecution.new(@worker, @logger).execute
end
end
end