Sha256: deae393ce4c978066c8c169498625880f272e3e43e3791308e5533b314575e40

Contents?: true

Size: 1.3 KB

Versions: 2

Compression:

Stored size: 1.3 KB

Contents

module WorkerRoulette
  class Foreman
    attr_reader :sender
    COUNTER_KEY = 'counter_key'

    def initialize(sender, redis_pool)
      @sender = sender
      @redis_pool = redis_pool
    end

    def job_board_key
      WorkerRoulette::JOB_BOARD
    end

    def counter_key
      COUNTER_KEY
    end

    def enqueue_work_order_without_headers(work_order)
      # Caveat emptor: there is a race condition here, but it is not serious.
      # The count may be incremented again by another process before the sender
      # is added to the job board. This is not a big deal because it just means
      # that the sender's queue will be processed one slot behind its rightful
      # place. This does not affect work_order ordering.
      @redis_pool.with do |redis|
        @count = redis.incr(COUNTER_KEY)
        # Queue the commands on the object yielded by MULTI so that they are
        # sent as one transaction.
        redis.multi do |multi|
          multi.zadd(WorkerRoulette::JOB_BOARD, @count, sender)
          multi.rpush(sender, Oj.dump(work_order))
          multi.publish(WorkerRoulette::JOB_NOTIFICATIONS, WorkerRoulette::JOB_NOTIFICATIONS)
        end
      end
    end

    def enqueue_work_order(work_order, headers = {})
      work_order = {headers: default_headers.merge(headers), payload: work_order}
      enqueue_work_order_without_headers(work_order)
    end

    def default_headers
      { sender: sender }
    end
  end
end
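
The race described in the comment inside enqueue_work_order_without_headers can be illustrated without a Redis server. The sketch below is only one reading of that comment: FakeBoard is a hypothetical in-memory stand-in for the counter, the job board sorted set, and the per-sender lists, and take_next assumes that a worker picks the sender with the lowest score, which this file does not show.

```ruby
# Hypothetical in-memory stand-in for the Redis structures Foreman touches.
class FakeBoard
  def initialize
    @counter = 0
    @job_board = {}                          # sender => score, like a sorted set
    @queues = Hash.new { |h, k| h[k] = [] }  # sender => list of work orders
  end

  # Like INCR: returns the new counter value.
  def incr
    @counter += 1
  end

  # Like the ZADD + RPUSH pair that Foreman sends inside MULTI.
  def post(sender, score, work_order)
    @job_board[sender] = score
    @queues[sender] << work_order
  end

  # Assumed worker behaviour: take the lowest-scored sender and drain its queue.
  def take_next
    sender, _score = @job_board.min_by { |_s, score| score }
    return nil unless sender

    @job_board.delete(sender)
    [sender, @queues.delete(sender)]
  end
end

# Interleave two foremen so that B posts before A, although A counted first.
def simulate_race
  board = FakeBoard.new
  processed = []

  count_a = board.incr                       # process A reads the counter first
  count_b = board.incr                       # process B increments it again

  board.post('sender_b', count_b, 'b-1')     # B reaches its MULTI block first
  processed << board.take_next               # a worker runs before A has posted

  board.post('sender_a', count_a, 'a-1')     # A posts late with the lower score
  board.post('sender_a', board.incr, 'a-2')  # a second work order from A
  processed << board.take_next

  processed
end

p simulate_race
```

In this interleaving sender_b is taken before sender_a even though sender_a drew the lower count, while the work orders for sender_a still come out in the order they were pushed.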

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
worker_roulette-0.0.3 lib/worker_roulette/foreman.rb
worker_roulette-0.0.2 lib/worker_roulette/foreman.rb