Sha256: 5cf691f7b5f2cf494236d3a8579bc9191f428d0b330b442f6331ea2b257c6b87

Contents?: true

Size: 1.75 KB

Versions: 4

Compression:

Stored size: 1.75 KB

Contents

module Sequel
  # A Worker is a thread that accepts jobs off a work queue and
  # processes them in the background.  It accepts an optional
  # database where it wruns all of its work inside a transaction.
  class Worker < Thread
    attr_reader :queue
    attr_reader :errors
  
    # Setup the interal variables.  If a database is given,
    # run the thread inside a database transaction. Continue
    # to work until #join is called.
    def initialize(db = nil)
      @queue = Queue.new
      @errors = []
      t = self
      t.abort_on_exception = true
      @transaction = !db.nil?
      db ? super {db.transaction {t.work}} : super {t.work}
    end
    
    # Add a job to the queue, specified either as a proc argument
    # or as a block.
    def async(proc = nil, &block)
      @queue << (proc || block)
      self
    end
    alias_method :add, :async
    alias_method :<<, :async
  
    # Whether the worker is actively working and/or has work scheduled
    def busy?
      @cur || !@queue.empty?
    end
  
    # Wait until the worker is no longer busy and then stop working.
    def join
      sleep(0.1) while busy?
      self.raise Error::WorkerStop
      super
    end

    # Continually get jobs from the work queue and process them.
    def work
      begin
        loop {next_job}
      rescue Sequel::Error::WorkerStop # signals the worker thread to stop
      ensure
        raise Sequel::Error::Rollback if @transaction && !@errors.empty?
      end
    end
    
    private

    # Get the next job from the work queue and process it.
    def next_job
      begin
        @cur = @queue.pop
        @cur.call
      rescue Error::WorkerStop => e
        raise e
      rescue Exception => e
        @errors << e
      ensure
        @cur = nil
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
sequel_core-2.1.0 lib/sequel_core/worker.rb
sequel_core-2.0.0 lib/sequel_core/worker.rb
sequel_core-2.0.1 lib/sequel_core/worker.rb
sequel_core-2.2.0 lib/sequel_core/worker.rb