Sha256: 33a0a662815e119d195944f7c9cd3c01a68e6a2a86fd84920a5b623fd5872d62

Contents?: true

Size: 881 Bytes

Versions: 1

Compression:

Stored size: 881 Bytes

Contents

module Afterparty
  # Pulls jobs off a queue and runs them, either one at a time on the calling
  # thread (#consume_next, #consume_sync) or continuously on a background
  # thread (#consume).
  #
  # #next_valid_job and #run are not defined in this class; presumably they
  # are supplied by the QueueHelpers mixin -- confirm against that module.
  class Worker
    include QueueHelpers

    # Builds a worker, filling in defaults for any option that is missing or nil.
    #
    # options - Hash of settings (default: {}):
    #           :adapter   - defaults to :redis (not read in this class).
    #           :namespace - defaults to :default (not read in this class).
    #           :sleep     - value handed to Kernel#sleep when no job is
    #                        available in #consume_sync; defaults to 10.
    #           :logger    - object used to announce job execution; defaults
    #                        to Logger.new($stderr).
    def initialize(options = {})
      # Work on a copy so the defaults are not written back into the caller's
      # hash (and so a frozen hash can be passed in).
      @options = options.dup
      @options[:adapter] ||= :redis
      @options[:namespace] ||= :default
      @options[:sleep] ||= 10
      @options[:logger] ||= Logger.new($stderr)
    end

    # Starts the polling loop (#consume_sync) on a new thread.
    #
    # Returns the Thread running the loop.
    def consume
      @stopped = false
      @thread = Thread.new { consume_sync }
    end

    # Runs the next valid job, if there is one, on the calling thread.
    #
    # Returns whatever #run returns, or nil when no job was available.
    def consume_next
      job = next_valid_job
      run job if job
    end

    # Polls for jobs on the calling thread until #stop sets the stop flag.
    # Runs each job found; when none is available, sleeps for
    # @options[:sleep] before polling again.
    def consume_sync
      until @stopped
        job = next_valid_job
        if job
          # Report through the configured logger rather than a bare puts so
          # the destination honours the :logger option.
          @options[:logger].info "Executing job: #{job.id}"
          run job
        else
          sleep(@options[:sleep])
        end
      end
    end

    # Asks the polling loop to exit.
    #
    # The flag is only checked at the top of each loop iteration, so a job or
    # sleep already in progress finishes first. join(0) does not wait for
    # that: it returns immediately.
    #
    # Returns the Thread if it has already finished, otherwise nil.
    def stop
      @stopped = true
      @thread.join(0) if @thread
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
afterparty-0.1.0 lib/afterparty/worker.rb