Sha256: 33866ef9506d629d529b8c99b64f57ffdff88049e32762e6ffee657bca072942

Contents?: true

Size: 1.91 KB

Versions: 28

Compression:

Stored size: 1.91 KB

Contents

# encoding: utf-8

$:.unshift('.')
$:.unshift(File.expand_path(File.dirname(__FILE__)) + '/../lib')
require 'zmq'
require 'pp'

# REQ / REP topology

Thread.abort_on_exception = true

class Server
  def initialize(ctx, endpoint)
    @thread = nil
    @connect = Proc.new do
      @socket = ctx.socket(:REP)
      # verbose output
      @socket.verbose = true
      @socket.bind(endpoint)
      @socket.linger = 1
    end
    @jobs, @working = 0, 0.0
  end

  def start
    @thread = Thread.new do
      @connect.call
      loop do
        perform(@socket.recv)
        break if Thread.current[:interrupted]
      end
    end
    self
  end

  def stop
    return unless @thread
    @thread[:interrupted] = true
    @thread.join(0.1) unless @thread.stop?
    stats
  end

  def perform(work)
    # Random hot loop to simulate CPU intensive work
    start = Time.now
    work.to_i.times{}
    @jobs += 1
    @working += (Time.now - start).to_f
    @socket.send "done"
  end

  private
  def stats
    puts "Processed #{@jobs} jobs in %.4f seconds" % @working
    $stdout.flush
  end
end

class Client
  def initialize(ctx, endpoint)
    @ctx, @endpoint, @server, @interrupted = ctx, endpoint, nil, false
    @socket = ctx.socket(:REQ)
    # verbose output
    @socket.verbose = true
  end

  def spawn_server
    @server = Server.new(@ctx, @endpoint).start
    sleep 0.01 # give each thread time to spin up
    connect
  end

  def start(messages = 100)
    messages.to_i.times do
      request = "#{@topic}#{rand(100_000).to_s}"
      @socket.send(request)
      response = @socket.recv
      break if @interrupted
    end
    @server.stop
    @ctx.destroy
  end

  def stop
    @interrupted = true
  end
  private
  def connect
    @socket.connect(@endpoint)
    @socket.linger = 1
  end
end

ctx = ZMQ::Context.new
client = Client.new(ctx, 'inproc://example.req_rep')
client.spawn_server
trap(:INT) do
  client.stop
end
client.start(ENV['MESSAGES'] || 1000)

Version data entries

28 entries across 28 versions & 1 rubygems

Version Path
rbczmq-1.7.9 examples/req_rep.rb
rbczmq-1.7.8 examples/req_rep.rb
rbczmq-1.7.7 examples/req_rep.rb
rbczmq-1.7.6 examples/req_rep.rb
rbczmq-1.7.5 examples/req_rep.rb
rbczmq-1.7.4 examples/req_rep.rb
rbczmq-1.7.3 examples/req_rep.rb
rbczmq-1.7.2 examples/req_rep.rb
rbczmq-1.7.1 examples/req_rep.rb
rbczmq-1.7.0 examples/req_rep.rb
rbczmq-1.6.4 examples/req_rep.rb
rbczmq-1.6.2 examples/req_rep.rb
rbczmq-1.6 examples/req_rep.rb
rbczmq-1.5 examples/req_rep.rb
rbczmq-1.4 examples/req_rep.rb
rbczmq-1.3 examples/req_rep.rb
rbczmq-1.2 examples/req_rep.rb
rbczmq-1.1 examples/req_rep.rb
rbczmq-1.0 examples/req_rep.rb
rbczmq-0.9 examples/req_rep.rb