lib/adaptation/druby_subscriber.rb in adaptation-0.1.3 vs lib/adaptation/druby_subscriber.rb in adaptation-0.1.4
- old
+ new
@@ -5,77 +5,71 @@
module Mom
class DrubySubscriber
+ # constructor, called from the subscribe command
def initialize subscriber_uri, mom_uri, topics
@subscriber_uri = subscriber_uri
@mom_uri = mom_uri
@topics = topics
- @adaptors_list = Array.new
+ @messages = []
end
- def process message, topic
- if ( (@topics.include?(topic)) or (@topics.include?("all")) )
- system("ruby public/dispatch.rb '#{message}'")
- end
- puts "#{topic} => #{message}"
+ # method to receive messages, called from the mom
+ def send_message message, topic
+ # Insert message into messages buffer, and awake
+ # message processor (@sleeper) if paused
+ puts "-----------------------------------"
+ puts "Received message in topic: #{topic}"
+ puts "#{message}"
+ puts "-----------------------------------"
+ @messages << {:message => message, :topic => topic}
+ @sleeper.run if @sleeper.stop?
end
-
+
def subscription_result subscribed
if subscribed
puts "Subscribed to mom (#{@mom_uri}). Listening at #{@subscriber_uri}"
end
end
- # TODO: Think what exactly is this subscriber:
- # - path to file
- # - combination of a path and a set of conditions <- looks better...
- # - ...
- def note_me_down adaptor
- unless @adaptors_list.include? adaptor
- @adaptors_list << adaptor
- puts "Added adaptor: #{adaptor}"
- end
- end
-
def start
- uri_owner = false
begin
# try to start the subscriber service,
# using the uri specified in config/mom.yml
DRb.start_service(@subscriber_uri, self)
# subscribe that uri to the mom
mom = DRbObject.new(nil, @mom_uri)
mom.subscribe @subscriber_uri
- uri_owner = true
rescue Exception => e
- # desired uri already in use...
- # if the process using it is a subscriber, this
- # shouldn't be a problem
- uri_owner = false
+ # desired uri already in use
+ puts "Couldn't start subscriber at #{@subscriber_uri}. Address already in use?"
+ return
end
+
+ @sleeper = Thread.new{
+ loop do
- # try to tell to the subscriber using the uri
- # (may be this self instance of DrubySubscriber),
- # that we want to be executed when the MOM
- # calls its process method
- begin
- subscriber = DRbObject.new(nil, @subscriber_uri)
- routes_file = File.expand_path(File.dirname(__FILE__) + '/../config/routes.rb')
- subscriber.note_me_down routes_file
- rescue Exception => e
- # the process using @subscriber_uri is not
- # an instance of DrubySubsciber...
- puts "Couldn't start or find subscriber at #{@subscriber_uri}:"
- puts "#{e}"
- return
- end
+ # process all messages
+ while !@messages.empty?
+ @messages.each do |message|
+ if ( (@topics.include?(message[:topic])) or (@topics.include?("all")) )
+ system("ruby public/dispatch.rb '#{message[:message]}'")
+ end
+ @messages.delete message
+ end
+ end
- if uri_owner
- DRb.thread.join # Don't exit just yet!
- end
+ # go to sleep
+ Thread.stop
+
+ end
+ }
+
+ @sleeper.join
+ Drb.thread.join
end
end