Sha256: 3c2344fc95e3c28d60cf0cce4d6ce5db629d73c23a1a99297c79122b8232538d

Contents?: true

Size: 1.38 KB

Versions: 6

Compression:

Stored size: 1.38 KB

Contents

module Eventboss
  # Worker is one member of a pool of workers and handles the UnitOfWork
  # lifecycle: it pops work items off the bus, runs each one through the
  # server middleware, and tells the launcher when its loop has ended.
  class Worker
    include Logging
    include SafeThread

    attr_reader :id

    # @param launcher [#worker_stopped] notified whenever this worker's loop ends
    # @param id identifier of this worker; handed to safe_thread and reported
    #   as worker_id when an exception is handled
    # @param bus [#pop, #<<] source of work items; a falsey item ends the loop
    # @param restart_on [Array<Class>] exception classes that cause the worker
    #   to report itself as stopped with restart: true
    def initialize(launcher, id, bus, restart_on: [Exception])
      @launcher = launcher
      @id = id
      @bus = bus
      @restart_on = restart_on
      @thread = nil
    end

    # Hands #run to safe_thread and keeps what it returns so that #terminate
    # and #kill can act on it later (presumably a Thread, given the #raise and
    # #value calls made on it below).
    def start
      entry_point = method(:run)
      @thread = safe_thread(id, &entry_point)
    end

    # Main loop: keeps popping from the bus and running the popped work until
    # a falsey value is popped, then reports to the launcher.
    #
    # An Eventboss::Shutdown is treated as a plain stop. Any exception matching
    # the configured restart_on classes is passed to handle_exception and the
    # launcher is asked to restart the worker.
    def run
      while (job = @bus.pop)
        run_work(job)
      end
      @launcher.worker_stopped(self)
    rescue Eventboss::Shutdown
      @launcher.worker_stopped(self)
    rescue *@restart_on => error
      handle_exception(error, worker_id: id)
      # Restart the worker in case of a hard exception.
      # The message won't be deleted from SQS and will be visible again later.
      @launcher.worker_stopped(self, restart: true)
    end

    # Runs a single work item inside the configured server middleware.
    #
    # @param work [#run] the unit of work to execute
    def run_work(work)
      server_middleware.invoke(work) { work.run }
    end

    # Asks the loop to stop by enqueuing a stop token on the bus.
    #
    # @param wait [Boolean] when truthy and the worker was started, block on
    #   the thread's value
    def terminate(wait = false)
      stop_token
      @thread.value if @thread && wait
    end

    # Interrupts the worker by raising Eventboss::Shutdown inside its thread.
    # Does nothing when the worker was never started.
    #
    # @param wait [Boolean] when truthy, block on the thread's value afterwards
    def kill(wait = false)
      return unless @thread

      @thread.raise(Eventboss::Shutdown)
      @thread.value if wait
    end

    private

    # Stops the loop by enqueuing a falsey value.
    def stop_token
      @bus << nil
    end

    # Server middleware taken from the global Eventboss configuration.
    def server_middleware
      Eventboss.configuration.server_middleware
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
eventboss-1.9.2 lib/eventboss/worker.rb
eventboss-1.9.1 lib/eventboss/worker.rb
eventboss-1.9.0 lib/eventboss/worker.rb
eventboss-1.8.1 lib/eventboss/worker.rb
eventboss-1.8.0 lib/eventboss/worker.rb
eventboss-1.7.0 lib/eventboss/worker.rb