Sha256: 58c9ccea150a9a01f478961e033fcc669b7c18c0abc7094176ef9f63f829ac6c

Contents?: true

Size: 1.14 KB

Versions: 1

Compression:

Stored size: 1.14 KB

Contents

module WorkerRoulette
  class Tradesman
    attr_reader :sender
    def initialize(client_pool, pubsub_pool)
      @client_pool = client_pool
      @pubsub_pool = pubsub_pool
    end

    def job_board_key
      WorkerRoulette::JOB_BOARD
    end

    def wait_for_work_orders(on_subscribe_callback = nil, &block)
      @pubsub_pool.with do |redis|
        redis.subscribe(WorkerRoulette::JOB_NOTIFICATIONS) do |on|
          on.subscribe {on_subscribe_callback.call if on_subscribe_callback}
          on.message   {block.call(work_orders!) if block}
        end
      end
    end

    def work_orders!
      @client_pool.with do |redis|
        get_sender_for_next_job(redis)
        results = redis.multi do
          redis.lrange(sender, 0, -1)
          redis.del(sender)
          redis.zrem(WorkerRoulette::JOB_BOARD, sender)
        end
        ((results || []).first || []).map {|work_order| Oj.load(work_order)}
      end
    end

    def unsubscribe
      @pubsub_pool.with {|redis| redis.unsubscribe}
    end

  private
    def get_sender_for_next_job(redis)
      @sender = (redis.zrange(WorkerRoulette::JOB_BOARD, 0, 0) || []).first.to_s
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
worker_roulette-0.0.3 lib/worker_roulette/tradesman.rb