# frozen_string_literal: true

module Karafka
  module Processing
    # Workers are used to run jobs in separate threads.
    # Workers are the main processing units of the Karafka framework.
    #
    # Each job runs in three stages:
    #   - prepare - here we can run any code that we would need to run blocking before we allow
    #               the job to run fully async (non blocking). This will always run in a blocking
    #               way and can be used to make sure all the resources and external dependencies
    #               are satisfied before going async.
    #
    #   - call - actual processing logic that can run sync or async
    #
    #   - teardown - it should include any code that we want to run after we executed the user
    #                code. This can be used to unlock certain resources or do other things that are
    #                not user code but need to run after user code base is executed.
    class Worker
      extend Forwardable

      # Thread lifecycle operations are forwarded to the underlying worker thread
      def_delegators :@thread, :join, :terminate, :alive?

      # Starts a thread that keeps processing jobs for as long as #process returns a truthy value
      #
      # @param jobs_queue [JobsQueue] queue from which this worker fetches its jobs
      # @return [Worker]
      def initialize(jobs_queue)
        @jobs_queue = jobs_queue
        @thread = Thread.new do
          # If anything goes wrong in this worker thread, it means something went really wrong and
          # we should terminate.
          Thread.current.abort_on_exception = true
          loop { break unless process }
        end
      end

      private

      # Fetches a single job, processes it and marks it as completed.
      #
      # @return [Boolean] true when the worker loop should go on and fetch the next job, false
      #   when the queue returned no job and the worker loop should stop
      #
      # @note Any exception raised while fetching or running a job is rescued here and published
      #   via the 'error.occurred' event (type 'worker.process.error'). The worker then keeps
      #   running, so a single failing job does not stop the jobs that follow it.
      #
      # @note Errors raised by the instrumentation call itself are not rescued and will propagate
      #   out of the worker thread.
      #
      # @note Upon closing the jobs queue, worker will close its thread
      def process
        job = @jobs_queue.pop

        # No job means the queue has nothing more for us (it is being closed), so we signal the
        # worker loop to stop. The ensure section below skips completion for a nil job.
        return false unless job

        job.prepare

        # If a job is marked as non blocking, we can run a tick in the job queue and if there
        # are no other blocking factors, the job queue will be unlocked.
        # If this does not run, all the things will be blocking and job queue won't allow to
        # pass it until done.
        @jobs_queue.tick(job.group_id) if job.non_blocking?

        job.call

        job.teardown

        true
      # We signal critical exceptions, notify and do not allow worker to fail
      # rubocop:disable Lint/RescueException
      rescue Exception => e
        # rubocop:enable Lint/RescueException
        Karafka.monitor.instrument(
          'error.occurred',
          caller: self,
          error: e,
          type: 'worker.process.error'
        )

        # Explicitly keep the worker loop alive. Without this, the result of the instrumentation
        # call would become the return value of this method and a falsy one would silently
        # terminate the worker thread.
        true
      ensure
        # job can be nil when the queue is being closed
        @jobs_queue.complete(job) if job
      end
    end
  end
end