Sha256: 2f2f30669d5d4104ac36c1cadc84bbf09b5f0d091cd947cf222dd88801a5d178

Contents?: true

Size: 1.62 KB

Versions: 5

Compression:

Stored size: 1.62 KB

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    # Workers are used to run jobs in separate threads.
    # Workers are the main processing units of the Karafka framework.
    #
    # Each worker owns a single thread that is started upon initialization. This thread keeps
    # popping jobs from the jobs queue and running them until the queue yields no job.
    class Worker
      extend Forwardable

      # Thread lifecycle operations are forwarded directly to the underlying worker thread
      def_delegators :@thread, :join, :terminate, :alive?

      # @param jobs_queue [JobsQueue] queue from which the worker takes jobs. It needs to respond
      #   to `#pop` (returning a job or nil) and `#complete` (accepting a finished job)
      # @return [Worker]
      def initialize(jobs_queue)
        @jobs_queue = jobs_queue
        @thread = Thread.new do
          # If anything goes wrong in this worker thread, it means something went really wrong and
          # we should terminate.
          Thread.current.abort_on_exception = true
          loop { break unless process }
        end
      end

      private

      # Fetches a single job, processes it and marks as completed.
      #
      # @return [Boolean] true if the worker should continue processing, false if it should stop
      #
      # @note Errors raised while running a job are not re-raised. They are published via the
      #   monitor as an `error.occurred` event and the worker keeps on processing further jobs.
      #   Only an error raised by the error reporting itself can propagate out of this method.
      #
      # @note Upon closing the jobs queue, worker will close its thread
      def process
        job = @jobs_queue.pop

        # No job means that the queue is being closed and there is nothing more to do
        return false unless job

        job.call
        true
      # We signal critical exceptions, notify and do not allow worker to fail
      # rubocop:disable Lint/RescueException
      rescue Exception => e
        # rubocop:enable Lint/RescueException
        Karafka.monitor.instrument(
          'error.occurred',
          caller: self,
          error: e,
          type: 'worker.process.error'
        )

        # The result needs to be explicit. Otherwise whatever the monitor returns would decide
        # whether this worker keeps running. We continue when a job has failed, but we stop when
        # we could not even obtain a job, as retrying a broken queue would end up in a busy loop.
        !job.nil?
      ensure
        # job can be nil when the queue is being closed
        @jobs_queue.complete(job) if job
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
karafka-2.0.0.alpha6 lib/karafka/processing/worker.rb
karafka-2.0.0.alpha5 lib/karafka/processing/worker.rb
karafka-2.0.0.alpha4 lib/karafka/processing/worker.rb
karafka-2.0.0.alpha3 lib/karafka/processing/worker.rb
karafka-2.0.0.alpha2 lib/karafka/processing/worker.rb