Sha256: 7957250ad890e8ab34a833e486b694ef8631d2b4c2e1a5facdd07a0cffdb1860

Contents?: true

Size: 1.22 KB

Versions: 8

Compression:

Stored size: 1.22 KB

Contents

#!/usr/bin/env ruby

require "rubygems"
require "bundler"

Bundler.setup(:default)

$:.unshift(File.dirname(__FILE__) + "/../lib")

require "messaging"

require_relative "consumer_processor"
require_relative "producer_processor"
require_relative "duplex_processor"

# Configure messaging: one publish endpoint and three consume endpoints,
# all pointing at the same local AMQP URI.
Messaging::Configuration.setup do |settings|
  settings.publish_to   = "amqp://localhost"
  settings.consume_from = %w[amqp://localhost amqp://localhost amqp://localhost]
end

EM.run do
  # One processor of each kind: consume-only, publish-only, and both.
  subscriber = ConsumerProcessor.new
  publisher  = ProducerProcessor.new
  two_way    = DuplexProcessor.new

  # Begin consuming on every processor that consumes.
  [subscriber, two_way].each(&:consume)

  # Publish one payload per publishing processor every half second.
  # The timer handle is kept so the interrupt handler can cancel it.
  publish_timer = EM.add_periodic_timer(0.5) do
    [
      [publisher, "a_producer_payload"],
      [two_way, "a_duplex_payload"]
    ].each do |processor, payload|
      processor.publish("exchange", "direct", "key", payload)
    end
  end

  # On Ctrl-C: stop publishing, disconnect everything, then stop the loop.
  Signal.trap("INT") do
    puts "Stopping..."

    # No further publishes once the timer is cancelled.
    EM.cancel_timer(publish_timer)

    # Disconnect in the same order the processors were created.
    [subscriber, publisher, two_way].each(&:disconnect)

    # Stop the EM loop one second later (presumably to give the
    # disconnects time to complete -- confirm against the processors).
    EM.add_timer(1) do
      EM.stop
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygem

Version Path
amqp-subscribe-many-0.2.1 examples/run
amqp-subscribe-many-0.2.0 examples/run
amqp-subscribe-many-0.1.7 examples/run
amqp-subscribe-many-0.1.6 examples/run
amqp-subscribe-many-0.1.5 examples/run
amqp-subscribe-many-0.1.4 examples/run
amqp-subscribe-many-0.1.3 examples/run
amqp-subscribe-many-0.1.2 examples/run