Sha256: 16cdabe2eb596291ebdad8ce0c8e361589ae80466b435d1ae60fef5abfbfb55a
Contents?: true
Size: 1.99 KB
Versions: 28
Compression:
Stored size: 1.99 KB
Contents
# encoding: utf-8 $:.unshift('.') $:.unshift(File.expand_path(File.dirname(__FILE__)) + '/../lib') require 'zmq' require 'pp' class ConsumerHandler < ZMQ::Handler def initialize(pollable, consumer) super @consumer = consumer end def on_readable @consumer.perform recv end end class ProducerHandler < ZMQ::Handler def initialize(pollable, producer) super @producer = producer end def on_writable @producer.work end end class Consumer attr_reader :thread def initialize(ctx, endpoint, topic = "") @socket = ctx.socket(:SUB) @socket.subscribe("") @socket.connect(endpoint) # verbose output @socket.verbose = true @socket.subscribe(topic) @jobs, @working = 0, 0.0 end def start ZL.register_readable(@socket, ConsumerHandler, self) self end def stop ZL.remove(@socket) stats end def perform(work) # Random hot loop to simulate CPU intensive work start = Time.now work.to_i.times{} @jobs += 1 @working += (Time.now - start).to_f end private def stats puts "Processed #{@jobs} jobs in %.4f seconds" % @working $stdout.flush end end class Producer def initialize(ctx, endpoint, topic = "") @ctx, @endpoint, @topic, @consumers = ctx, endpoint, topic, [] @socket = ctx.socket(:PUB) # verbose output @socket.verbose = true @socket.bind(endpoint) @socket.linger = 1 @interrupted = false end def spawn_consumers(count = 10) count.times do @consumers << Consumer.new(@ctx, @endpoint, @topic).start end end def start ZL.register_writable(@socket, ProducerHandler, self) end def stop @consumers.each{|c| c.stop } ZL.remove(@socket) ZL.stop @ctx.destroy end def work work = "#{@topic}#{rand(100_000).to_s}" @socket.send(work) end end ZL.run do ctx = ZMQ::Context.new producer = Producer.new(ctx, 'inproc://example.loop') producer.spawn_consumers trap(:INT) do producer.stop end producer.start end
Version data entries
28 entries across 28 versions & 1 rubygems