Sha256: 77cd46e6c2a592def611e49128c7db8fcdb53d6f349c11976ee79df7c586b5a2

Contents?: true

Size: 1.58 KB

Versions: 3

Compression:

Stored size: 1.58 KB

Contents

require_relative './tradesman'
module WorkerRoulette
  class ATradesman < Tradesman
    def wait_for_work_orders(on_subscribe_callback = nil, &on_message_callback)
      @redis_pubsub ||= WorkerRoulette.new_redis_pubsub #cannot use connection pool bc redis expects each obj to own its own pubsub connection for the life of the subscription
      @redis_pubsub.on(:subscribe) {|channel, subscription_count| on_subscribe_callback.call(channel, subscription_count) if on_subscribe_callback}
      @redis_pubsub.on(:message)   {|channel, message| set_timer(on_message_callback); get_messages(message, channel, on_message_callback)}
      @redis_pubsub.subscribe(@channel)
    end

    def unsubscribe(&callback)
      deferable = @redis_pubsub.unsubscribe(@channel)
      deferable.callback do
        @redis_pubsub.close_connection
        @redis_pubsub = nil
        callback.call
      end
      deferable.errback do
        @redis_pubsub.close_connection
        @redis_pubsub = nil
        callback.call
      end
    end

  private
    attr_reader :timer
    def get_messages(message, channel, on_message_callback)
      return unless on_message_callback
      work_orders! do |work_orders_1|
        work_orders! do |work_orders|
          on_message_callback.call(work_orders_1 + work_orders, message, channel)
        end
      end
    end

    def set_timer(on_message_callback)
      return unless on_message_callback
      @timer && @timer.cancel
      @timer = EM::PeriodicTimer.new(rand(20..25)) do
        work_orders! {|work_orders| on_message_callback.call(work_orders, nil, nil)}
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
worker_roulette-0.1.5 lib/worker_roulette/a_tradesman.rb
worker_roulette-0.1.4 lib/worker_roulette/a_tradesman.rb
worker_roulette-0.1.3 lib/worker_roulette/a_tradesman.rb