lib/adaptation/druby_subscriber.rb in adaptation-0.1.3 vs lib/adaptation/druby_subscriber.rb in adaptation-0.1.4

- old
+ new

@@ -5,77 +5,71 @@ module Mom class DrubySubscriber + # constructor, called from the subscribe command def initialize subscriber_uri, mom_uri, topics @subscriber_uri = subscriber_uri @mom_uri = mom_uri @topics = topics - @adaptors_list = Array.new + @messages = [] end - def process message, topic - if ( (@topics.include?(topic)) or (@topics.include?("all")) ) - system("ruby public/dispatch.rb '#{message}'") - end - puts "#{topic} => #{message}" + # method to receive messages, called from the mom + def send_message message, topic + # Insert message into messages buffer, and awake + # message processor (@sleeper) if paused + puts "-----------------------------------" + puts "Received message in topic: #{topic}" + puts "#{message}" + puts "-----------------------------------" + @messages << {:message => message, :topic => topic} + @sleeper.run if @sleeper.stop? end - + def subscription_result subscribed if subscribed puts "Subscribed to mom (#{@mom_uri}). Listening at #{@subscriber_uri}" end end - # TODO: Think what exactly is this subscriber: - # - path to file - # - combination of a path and a set of conditions <- looks better... - # - ... - def note_me_down adaptor - unless @adaptors_list.include? adaptor - @adaptors_list << adaptor - puts "Added adaptor: #{adaptor}" - end - end - def start - uri_owner = false begin # try to start the subscriber service, # using the uri specified in config/mom.yml DRb.start_service(@subscriber_uri, self) # subscribe that uri to the mom mom = DRbObject.new(nil, @mom_uri) mom.subscribe @subscriber_uri - uri_owner = true rescue Exception => e - # desired uri already in use... - # if the process using it is a subscriber, this - # shouldn't be a problem - uri_owner = false + # desired uri already in use + puts "Couldn't start subscriber at #{@subscriber_uri}. Address already in use?" + return end + + @sleeper = Thread.new{ + loop do - # try to tell to the subscriber using the uri - # (may be this self instance of DrubySubscriber), - # that we want to be executed when the MOM - # calls its process method - begin - subscriber = DRbObject.new(nil, @subscriber_uri) - routes_file = File.expand_path(File.dirname(__FILE__) + '/../config/routes.rb') - subscriber.note_me_down routes_file - rescue Exception => e - # the process using @subscriber_uri is not - # an instance of DrubySubsciber... - puts "Couldn't start or find subscriber at #{@subscriber_uri}:" - puts "#{e}" - return - end + # process all messages + while !@messages.empty? + @messages.each do |message| + if ( (@topics.include?(message[:topic])) or (@topics.include?("all")) ) + system("ruby public/dispatch.rb '#{message[:message]}'") + end + @messages.delete message + end + end - if uri_owner - DRb.thread.join # Don't exit just yet! - end + # go to sleep + Thread.stop + + end + } + + @sleeper.join + Drb.thread.join end end