Sha256: 4bff438f8b36b2c7bd5ef523f3b00b450ba727949637310feffb25bc90c860c3
Contents?: true
Size: 1.44 KB
Versions: 1
Compression:
Stored size: 1.44 KB
Contents
# frozen_string_literal: true module Boatload # A worker that will run in the background, batching up and processing messages. class Worker def initialize(queue:, max_backlog_size: 0, logger:, context: nil, &block) @backlog = [] @context = context @incoming_queue = queue @logger = logger @max_backlog_size = max_backlog_size @process_proc = block end def run @logger.info 'Starting Worker in the background...' loop do operation, payload = @incoming_queue.pop case operation when :item @backlog.push payload process if threshold_reached? when :process process when :shutdown begin process rescue StandardError => e @logger.error "Failed to process backlog during shutdown: #{e.full_message}" end break else raise "Unknown operation: #{operation.inspect}" end end rescue StandardError => e @logger.error "Worker thread encountered an unexpected error:\n#{e.full_message}" end private def process @process_proc.call @backlog, @logger, @context @backlog.clear rescue StandardError => e @logger.error "Error encountered while processing backlog:\n#{e.full_message}" end def threshold_reached? @max_backlog_size.positive? && @backlog.length >= @max_backlog_size end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
boatload-0.1.0 | lib/boatload/worker.rb |