Sha256: 5121e0cb6ab734ddf7bc27f7e68daa3dbfc8d9a3222ffb03ea8a4cb1e041d482
Contents?: true
Size: 1.39 KB
Versions: 2
Compression:
Stored size: 1.39 KB
Contents
module WorkerRoulette class Tradesman attr_reader :sender def initialize(client_pool, pubsub_pool, namespace = nil) @client_pool = client_pool @pubsub_pool = pubsub_pool @namespace = namespace @channel = namespace || WorkerRoulette::JOB_NOTIFICATIONS end def job_board_key @job_board_key ||= "#{@namespace + ':' if @namespace}#{WorkerRoulette::JOB_BOARD}" end def sender_key @sender_key ||= "#{@namespace + ':' if @namespace}#{@sender}" end def wait_for_work_orders(on_subscribe_callback = nil, &block) @pubsub_pool.with do |redis| redis.subscribe(@channel) do |on| on.subscribe {on_subscribe_callback.call if on_subscribe_callback} on.message {self.unsubscribe; block.call(work_orders!) if block} end end end def work_orders! @client_pool.with do |redis| get_sender_for_next_job(redis) results = redis.multi do redis.lrange(sender_key, 0, -1) redis.del(sender_key) redis.zrem(job_board_key, sender_key) end ((results || []).first || []).map {|work_order| Oj.load(work_order)} end end def unsubscribe @pubsub_pool.with {|redis| redis.unsubscribe(@channel)} end private def get_sender_for_next_job(redis) @sender = (redis.zrange(job_board_key, 0, 0) || []).first.to_s end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
worker_roulette-0.0.9 | lib/worker_roulette/tradesman.rb |
worker_roulette-0.0.8 | lib/worker_roulette/tradesman.rb |