Sha256: 4bff438f8b36b2c7bd5ef523f3b00b450ba727949637310feffb25bc90c860c3

Contents?: true

Size: 1.44 KB

Versions: 1

Compression:

Stored size: 1.44 KB

Contents

# frozen_string_literal: true

module Boatload
  # A worker that will run in the background, batching up and processing messages.
  #
  # The worker pops +[operation, payload]+ pairs from the incoming queue and reacts to them:
  #
  # * +:item+     - appends the payload to the backlog, then processes the backlog if it has
  #   reached +max_backlog_size+ (only when that size is positive).
  # * +:process+  - processes the backlog immediately.
  # * +:shutdown+ - processes the backlog one last time and leaves the run loop.
  #
  # Any other operation raises inside the loop; the error is logged and {#run} returns.
  class Worker
    # @param queue [#pop] source of +[operation, payload]+ pairs
    # @param logger [#info, #error] receives the start-up message and all error reports
    # @param max_backlog_size [Integer] backlog length that triggers processing on +:item+;
    #   zero (the default) or a negative value disables the size trigger
    # @param context [Object, nil] passed through unchanged as the third block argument
    # @yield [backlog, logger, context] called to process the collected items
    # @yieldparam backlog [Array] the worker's own backlog array; it is emptied right after the
    #   block returns, so the block must copy it if it needs the items afterwards
    # @yieldparam logger [#info, #error] the logger given to the constructor
    # @yieldparam context [Object, nil] the context given to the constructor
    def initialize(queue:, logger:, max_backlog_size: 0, context: nil, &block)
      @backlog = []
      @context = context
      @incoming_queue = queue
      @logger = logger
      @max_backlog_size = max_backlog_size
      @process_proc = block
    end

    # Consumes messages from the queue until a +:shutdown+ message arrives or an unexpected
    # error (such as an unknown operation) occurs. Errors are logged rather than raised.
    #
    # @return [void]
    def run
      @logger.info 'Starting Worker in the background...'

      loop do
        operation, payload = @incoming_queue.pop

        case operation
        when :item
          @backlog.push payload
          process if threshold_reached?
        when :process
          process
        when :shutdown
          # `process` logs and swallows its own failures, so no extra rescue is needed here.
          process
          break
        else
          raise "Unknown operation: #{operation.inspect}"
        end
      end
    rescue StandardError => e
      @logger.error "Worker thread encountered an unexpected error:\n#{e.full_message}"
    end

    private

    # Hands the backlog to the processing block and empties it on success.
    #
    # Nothing happens when the backlog is empty, so the block never receives an empty batch.
    # If the block raises, the error is logged and the backlog is left untouched, which means
    # the same items are offered again on the next processing attempt.
    def process
      return if @backlog.empty?

      @process_proc.call @backlog, @logger, @context
      @backlog.clear
    rescue StandardError => e
      @logger.error "Error encountered while processing backlog:\n#{e.full_message}"
    end

    # @return [Boolean] true when a positive size limit is configured and the backlog has reached it
    def threshold_reached?
      @max_backlog_size.positive? && @backlog.length >= @max_backlog_size
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
boatload-0.1.0 lib/boatload/worker.rb