Sha256: 758e4a70f69b2f619c198e2804368f93dd0e1527e242c8400ffb597f1e414019

Contents?: true

Size: 1.39 KB

Versions: 4

Compression:

Stored size: 1.39 KB

Contents

module WorkerRoulette
  # Worker-side consumer. It waits for a notification on a Redis pub/sub
  # channel and then drains every queued work order belonging to the sender
  # that is currently first on the job board (a Redis sorted set).
  class Tradesman
    # Board member selected by the most recent #work_orders! call: nil before
    # the first call, "" when the job board was empty.
    attr_reader :sender

    # client_pool - pool used for regular commands; must respond to #with and
    #               yield a Redis connection.
    # pubsub_pool - pool used for subscribe/unsubscribe; same #with contract.
    # namespace   - optional key prefix. When given it is also used as the
    #               pub/sub channel name instead of
    #               WorkerRoulette::JOB_NOTIFICATIONS.
    def initialize(client_pool, pubsub_pool, namespace = nil)
      @client_pool = client_pool
      @pubsub_pool = pubsub_pool
      @namespace   = namespace
      @channel     = namespace || WorkerRoulette::JOB_NOTIFICATIONS
    end

    # Redis key of the job board sorted set, prefixed with the namespace when
    # one was given. Memoized because neither part changes after construction.
    def job_board_key
      @job_board_key ||= "#{key_prefix}#{WorkerRoulette::JOB_BOARD}"
    end

    # Redis key of the work-order list for the currently selected sender.
    # Deliberately not memoized: @sender changes on every #work_orders! call.
    def sender_key
      "#{key_prefix}#{@sender}"
    end

    # Subscribes to the notification channel. On the first message it
    # unsubscribes and, if a block was given, passes the result of
    # #work_orders! to it.
    #
    # on_subscribe_callback - optional callable invoked (with no arguments)
    #                         from the subscribe handler.
    def wait_for_work_orders(on_subscribe_callback = nil, &block)
      @pubsub_pool.with do |redis|
        redis.subscribe(@channel) do |on|
          on.subscribe { on_subscribe_callback.call if on_subscribe_callback }
          on.message do
            # Unsubscribe first so a single notification is handled once.
            unsubscribe
            block.call(work_orders!) if block
          end
        end
      end
    end

    # Picks the first sender on the job board, then atomically reads and
    # deletes that sender's work-order list and removes it from the board.
    #
    # Returns an Array of work orders decoded with Oj.load; empty when the
    # board is empty or the transaction yields no list.
    def work_orders!
      @client_pool.with do |redis|
        get_sender_for_next_job(redis)
        # Empty board: nothing to drain, so skip the MULTI round-trip instead
        # of running LRANGE/DEL/ZREM against a blank (or prefix-only) key.
        next [] if @sender.empty?

        list_key  = sender_key
        board_key = job_board_key
        # NOTE(review): the member read from the board is @sender, but the
        # member removed below is the namespace-prefixed list_key. With a
        # namespace set these differ -- confirm against the producer side
        # which form is stored on the board.
        #
        # Queue commands on the object yielded by #multi. Older redis-rb
        # yields the connection itself; newer versions yield a dedicated
        # transaction object and no longer queue commands issued on the outer
        # connection, which would leave `results` without the list contents.
        results = redis.multi do |transaction|
          transaction.lrange(list_key, 0, -1)
          transaction.del(list_key)
          transaction.zrem(board_key, list_key)
        end
        ((results || []).first || []).map { |work_order| Oj.load(work_order) }
      end
    end

    # Unsubscribes the pub/sub connection from the notification channel.
    def unsubscribe
      @pubsub_pool.with { |redis| redis.unsubscribe(@channel) }
    end

  private

    # Reads the first member of the job board and stores it (as a String)
    # in @sender; stores "" when the board is empty.
    def get_sender_for_next_job(redis)
      @sender = (redis.zrange(job_board_key, 0, 0) || []).first.to_s
    end

    # "<namespace>:" when a namespace was given, otherwise "".
    def key_prefix
      @namespace ? "#{@namespace}:" : ''
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
worker_roulette-0.1.0 lib/worker_roulette/tradesman.rb
worker_roulette-0.0.12 lib/worker_roulette/tradesman.rb
worker_roulette-0.0.11 lib/worker_roulette/tradesman.rb
worker_roulette-0.0.10 lib/worker_roulette/tradesman.rb