Sha256: 16cdabe2eb596291ebdad8ce0c8e361589ae80466b435d1ae60fef5abfbfb55a

Contents?: true

Size: 1.99 KB

Versions: 28

Compression:

Stored size: 1.99 KB

Contents

# encoding: utf-8

$:.unshift('.')
$:.unshift(File.expand_path(File.dirname(__FILE__)) + '/../lib')
require 'zmq'
require 'pp'

# Loop handler registered for a Consumer's socket (see Consumer#start).
# Each readable event is turned into one Consumer#perform call.
class ConsumerHandler < ZMQ::Handler
  # pollable - item handed in by the loop; forwarded to ZMQ::Handler.
  # consumer - object whose #perform receives every message read.
  def initialize(pollable, consumer)
    # Same arguments a bare `super` would forward.
    super(pollable, consumer)
    @consumer = consumer
  end

  # Reads one message via #recv (not defined here; presumably inherited
  # from ZMQ::Handler) and hands it to the consumer.
  def on_readable
    message = recv
    @consumer.perform(message)
  end
end

# Loop handler registered for a Producer's socket (see Producer#start).
# Each writable event asks the producer to publish one unit of work.
class ProducerHandler < ZMQ::Handler
  # pollable - item handed in by the loop; forwarded to ZMQ::Handler.
  # producer - object whose #work is invoked on every writable event.
  def initialize(pollable, producer)
    # Same arguments a bare `super` would forward.
    super(pollable, producer)
    @producer = producer
  end

  # Delegates to Producer#work.
  def on_writable
    @producer.work
  end
end

# Subscriber side of the example: connects a SUB socket to the producer's
# endpoint, burns CPU for every message received and keeps simple counters.
class Consumer
  # Never assigned anywhere in this class, so it always reads nil; kept so
  # existing callers of #thread keep working.
  attr_reader :thread

  # ctx      - context object used to create the SUB socket.
  # endpoint - endpoint string to connect to.
  # topic    - subscription prefix; "" (the default) receives everything.
  def initialize(ctx, endpoint, topic = "")
    @topic = topic.to_s
    @socket = ctx.socket(:SUB)
    @socket.connect(endpoint)
    # verbose output
    @socket.verbose = true
    # Subscribe to the requested topic only. An extra subscription to ""
    # would match every message and cancel out a non-empty topic filter,
    # because subscriptions on a SUB socket are additive.
    @socket.subscribe(@topic)
    @jobs, @working = 0, 0.0
  end

  # Registers the socket with the loop for readable events.
  # Returns self so the call can be chained onto Consumer.new.
  def start
    ZL.register_readable(@socket, ConsumerHandler, self)
    self
  end

  # Removes the socket from the loop and prints the collected statistics.
  def stop
    ZL.remove(@socket)
    stats
  end

  # Simulates CPU intensive work with an empty hot loop.
  #
  # work - message payload: an optional topic prefix followed by the number
  #        of iterations to spin.
  #
  # Returns the accumulated working time in seconds.
  def perform(work)
    start = Time.now
    iterations(work).times {}
    @jobs += 1
    @working += (Time.now - start).to_f
  end

  private

  # Extracts the iteration count from a payload. The topic prefix is
  # stripped first; otherwise String#to_i would stop at the first
  # non-numeric character and always yield 0 for prefixed messages.
  def iterations(work)
    payload = work.to_s
    if !@topic.empty? && payload.start_with?(@topic)
      payload = payload[@topic.length..-1]
    end
    payload.to_i
  end

  # Prints the number of processed jobs and the time spent on them.
  def stats
    puts "Processed #{@jobs} jobs in %.4f seconds" % @working
    $stdout.flush
  end
end

# Publisher side of the example: binds a PUB socket, spawns consumers
# against the same endpoint and publishes random work items on demand.
class Producer
  # ctx      - context object used to create sockets (also handed to the
  #            consumers this producer spawns).
  # endpoint - endpoint string the PUB socket binds to.
  # topic    - prefix prepended to every published message.
  def initialize(ctx, endpoint, topic = "")
    @ctx, @endpoint, @topic, @consumers = ctx, endpoint, topic, []
    @socket = ctx.socket(:PUB)
    # verbose output
    @socket.verbose = true
    @socket.bind(endpoint)
    @socket.linger = 1
    # Flipped by #stop so shutdown happens at most once.
    @interrupted = false
  end

  # Creates and starts `count` consumers connected to this producer's
  # endpoint and topic. Returns count.
  def spawn_consumers(count = 10)
    count.times do
      @consumers << Consumer.new(@ctx, @endpoint, @topic).start
    end
  end

  # Registers the socket with the loop for writable events.
  def start
    ZL.register_writable(@socket, ProducerHandler, self)
  end

  # Stops all consumers, removes the socket from the loop, stops the loop
  # and destroys the context.
  #
  # Idempotent: a repeated call (e.g. a second Ctrl-C while shutting down)
  # returns nil without touching the already torn-down socket and context.
  def stop
    return if @interrupted
    @interrupted = true
    @consumers.each { |consumer| consumer.stop }
    ZL.remove(@socket)
    ZL.stop
    @ctx.destroy
  end

  # Publishes one message: the topic followed by a random integer in
  # 0...100_000. Does nothing once #stop has run, so a late writable
  # event cannot send on a socket whose context was destroyed.
  def work
    return if @interrupted
    @socket.send("#{@topic}#{rand(100_000)}")
  end
end

# Script entry point: inside the loop, build one producer on an inproc
# endpoint, spawn the default number of consumers, then start publishing.
ZL.run do
  context = ZMQ::Context.new
  producer = Producer.new(context, 'inproc://example.loop')
  producer.spawn_consumers
  # SIGINT (Ctrl-C) shuts everything down through Producer#stop.
  trap(:INT) { producer.stop }
  producer.start
end

Version data entries

28 entries across 28 versions & 1 rubygems

Version Path
rbczmq-1.7.9 examples/loop.rb
rbczmq-1.7.8 examples/loop.rb
rbczmq-1.7.7 examples/loop.rb
rbczmq-1.7.6 examples/loop.rb
rbczmq-1.7.5 examples/loop.rb
rbczmq-1.7.4 examples/loop.rb
rbczmq-1.7.3 examples/loop.rb
rbczmq-1.7.2 examples/loop.rb
rbczmq-1.7.1 examples/loop.rb
rbczmq-1.7.0 examples/loop.rb
rbczmq-1.6.4 examples/loop.rb
rbczmq-1.6.2 examples/loop.rb
rbczmq-1.6 examples/loop.rb
rbczmq-1.5 examples/loop.rb
rbczmq-1.4 examples/loop.rb
rbczmq-1.3 examples/loop.rb
rbczmq-1.2 examples/loop.rb
rbczmq-1.1 examples/loop.rb
rbczmq-1.0 examples/loop.rb
rbczmq-0.9 examples/loop.rb