Sha256: 7957250ad890e8ab34a833e486b694ef8631d2b4c2e1a5facdd07a0cffdb1860
Contents?: true
Size: 1.22 KB
Versions: 8
Compression:
Stored size: 1.22 KB
Contents
#!/usr/bin/env ruby require "rubygems" require "bundler" Bundler.setup(:default) $:.unshift(File.dirname(__FILE__) + "/../lib") require "messaging" require_relative "consumer_processor" require_relative "producer_processor" require_relative "duplex_processor" # Setup configuration Messaging::Configuration.setup do |config| config.publish_to = "amqp://localhost" config.consume_from = [ "amqp://localhost", "amqp://localhost", "amqp://localhost" ] end EM.run do # Instantiate the processors consumer = ConsumerProcessor.new producer = ProducerProcessor.new duplex = DuplexProcessor.new # Start the consumers consumer.consume duplex.consume # Create a handle to the publish timer, to cancel later timer = EM.add_periodic_timer(0.5) do producer.publish("exchange", "direct", "key", "a_producer_payload") duplex.publish("exchange", "direct", "key", "a_duplex_payload") end # Handle Ctrl-C interrupt trap("INT") do puts "Stopping..." # Cancel the publisher timer EM.cancel_timer(timer) # Disconnect the producer/consumer connections consumer.disconnect producer.disconnect duplex.disconnect # Shutdown the EM loop EM.add_timer(1) { EM.stop } end end
Version data entries
8 entries across 8 versions & 1 rubygems