Sha256: 77cd46e6c2a592def611e49128c7db8fcdb53d6f349c11976ee79df7c586b5a2
Contents?: true
Size: 1.58 KB
Versions: 3
Compression:
Stored size: 1.58 KB
Contents
require_relative './tradesman'

module WorkerRoulette
  # Evented variant of Tradesman: listens on its channel through a dedicated
  # Redis pub/sub connection and hands drained work orders to a caller-supplied
  # block, with a periodic poll re-armed every time a message arrives.
  class ATradesman < Tradesman
    # Subscribes to @channel and installs the subscribe/message handlers.
    #
    # @param on_subscribe_callback [#call, nil] optional; called with
    #   (channel, subscription_count) when the subscription is confirmed
    # @yield [work_orders, message, channel] called with the drained work
    #   orders for each published message; the periodic poll calls it with
    #   (work_orders, nil, nil)
    # @return whatever the pub/sub client's #subscribe returns
    def wait_for_work_orders(on_subscribe_callback = nil, &on_message_callback)
      # Cannot use the connection pool because redis expects each object to own
      # its own pubsub connection for the life of the subscription.
      @redis_pubsub ||= WorkerRoulette.new_redis_pubsub

      @redis_pubsub.on(:subscribe) do |channel, subscription_count|
        on_subscribe_callback.call(channel, subscription_count) if on_subscribe_callback
      end

      @redis_pubsub.on(:message) do |channel, message|
        set_timer(on_message_callback)
        get_messages(message, channel, on_message_callback)
      end

      @redis_pubsub.subscribe(@channel)
    end

    # Unsubscribes from @channel, closes the pub/sub connection and forgets it,
    # whether the unsubscribe request succeeds or fails.
    #
    # Safe to call without a block and safe to call when no subscription was
    # ever opened (the block, if any, is then invoked immediately).
    #
    # NOTE(review): the polling timer started by #set_timer is not cancelled
    # here, so it keeps firing after unsubscribe — confirm whether intended.
    #
    # @yield called with no arguments once the connection has been closed
    def unsubscribe(&callback)
      pubsub = @redis_pubsub
      unless pubsub
        callback.call if callback
        return
      end

      # A proc (not a lambda) so any arguments the deferrable passes to its
      # handlers are ignored, exactly like the original bare blocks.
      finish = proc do
        pubsub.close_connection
        @redis_pubsub = nil
        callback.call if callback
      end

      deferable = pubsub.unsubscribe(@channel)
      deferable.callback(&finish)
      deferable.errback(&finish)
    end

    private

    attr_reader :timer

    # Drains the work orders twice in sequence and passes the concatenated
    # result to the message callback along with the triggering message.
    # Presumably the second drain picks up orders enqueued while the first was
    # in flight — TODO confirm against Tradesman#work_orders!.
    def get_messages(message, channel, on_message_callback)
      return unless on_message_callback

      work_orders! do |work_orders_1|
        work_orders! do |work_orders|
          on_message_callback.call(work_orders_1 + work_orders, message, channel)
        end
      end
    end

    # (Re)starts the periodic poll: cancels any previous timer and creates a
    # new one with a random integer interval in 20..25 that drains work orders
    # and calls the message callback with nil message and channel.
    def set_timer(on_message_callback)
      return unless on_message_callback

      @timer && @timer.cancel
      @timer = EM::PeriodicTimer.new(rand(20..25)) do
        work_orders! { |work_orders| on_message_callback.call(work_orders, nil, nil) }
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
worker_roulette-0.1.5 | lib/worker_roulette/a_tradesman.rb |
worker_roulette-0.1.4 | lib/worker_roulette/a_tradesman.rb |
worker_roulette-0.1.3 | lib/worker_roulette/a_tradesman.rb |