examples/mq/clock.rb in amqp-0.7.0.pre vs examples/mq/clock.rb in amqp-0.7.0
- old
+ new
@@ -1,42 +1,42 @@
+# encoding: utf-8
+
$:.unshift File.dirname(__FILE__) + '/../../lib'
require 'mq'
-AMQP.start(:host => 'localhost', port: 5673) do
+AMQP.start(:host => 'localhost') do |connection|
- def log *args
+ # Send Connection.Close on Ctrl+C
+ trap(:INT) do
+ unless connection.closing?
+ connection.close { exit! }
+ end
+ end
+
+ def log(*args)
p args
end
# AMQP.logging = true
clock = MQ.new.fanout('clock')
- clock2 = MQ.new.fanout('clock2')
-
- EM.add_periodic_timer(1){
+ EM.add_periodic_timer(1) {
puts
log :publishing, time = Time.now
clock.publish(Marshal.dump(time))
}
- EM.add_periodic_timer(1){
- puts
-
- log 2, :publishing, time = Time.now
- clock.publish(Marshal.dump(time))
- }
-
amq = MQ.new
- q = amq.queue('every second')
- q.bind(amq.fanout('clock')).subscribe{ |time|
+ amq.queue('every second').bind(amq.fanout('clock')).subscribe { |time|
log 'every second', :received, Marshal.load(time)
}
- amq.queue!('every second').bind(amq.fanout('clock2')).subscribe{ |time|
- log 2, 'every second', :received, Marshal.load(time)
+ amq = MQ.new
+ amq.queue('every 5 seconds').bind(amq.fanout('clock')).subscribe { |time|
+ time = Marshal.load(time)
+ log 'every 5 seconds', :received, time if time.strftime('%S').to_i % 5 == 0
}
-
end
__END__