lib/upperkut/processor.rb in upperkut-0.8.1 vs lib/upperkut/processor.rb in upperkut-1.0.0.rc
- old
+ new
@@ -1,58 +1,63 @@
-require_relative 'batch_execution'
+require_relative 'logging'
module Upperkut
class Processor
- def initialize(manager)
- @manager = manager
- @worker = @manager.worker
- @logger = @manager.logger
- @strategy = @worker.strategy
-
- @sleeping_time = 0
+ def initialize(worker, logger = Logging.logger)
+ @worker = worker
+ @strategy = worker.strategy
+ @worker_instance = worker.new
+ @logger = logger
end
- def run
- @thread ||= Thread.new do
- begin
- process
- rescue Exception => e
- @logger.debug(
- action: :processor_killed,
- reason: e,
- stacktrace: e.backtrace
- )
+ def process
+ items = @worker.fetch_items.freeze
- @manager.notify_killed_processor(self)
- end
+ @worker.server_middlewares.invoke(@worker, items) do
+ @worker_instance.perform(items)
end
- end
- def kill
- return unless @thread
+ nacked_items, pending_ack_items = items.partition(&:nacked?)
+ @strategy.nack(nacked_items) if nacked_items.any?
+ @strategy.ack(pending_ack_items) if pending_ack_items.any?
+ rescue StandardError => error
+ @logger.error(
+ action: :handle_execution_error,
+ ex: error.to_s,
+ backtrace: error.backtrace.join("\n"),
+ item_size: Array(items).size
+ )
- @thread.raise Upperkut::Shutdown
- @thread.value # wait
+ if items
+ if @worker_instance.respond_to?(:handle_error)
+ @worker_instance.handle_error(error, items)
+ return
+ end
+
+ @strategy.nack(items)
+ end
+
+ raise error
end
- private
+ def blocking_process
+ sleeping_time = 0
- def process
loop do
- next if @manager.stopped
+ break if @stopped
if @strategy.process?
- @sleeping_time = 0
- process_batch
+ sleeping_time = 0
+ process
next
end
- @sleeping_time += sleep(@worker.setup.polling_interval)
- @logger.debug(sleeping_time: @sleeping_time)
+ sleeping_time += sleep(@worker.setup.polling_interval)
+ @logger.debug(sleeping_time: sleeping_time)
end
end
- def process_batch
- BatchExecution.new(@worker, @logger).execute
+ def stop
+ @stopped = true
end
end
end