#!/usr/bin/env ruby

# == Imports ================================================================

require 'optparse'
require 'securerandom'
require 'thread'

require_relative '../lib/skein'

# == Support Classes ========================================================

# Worker used by the `echo_server` command. It exposes a single `echo`
# method that hands back whatever text it was given.
class EchoWorker < Skein::Client::Worker
  # queue_name - Name of the queue, passed through to the parent class.
  # debug      - When truthy, every echoed text is also printed to stdout.
  # options    - Any remaining keyword options, forwarded to the parent class.
  def initialize(queue_name, debug: false, **options)
    # Forward as keywords (not as a positional Hash) so this keeps working
    # under Ruby 3's keyword argument separation.
    super(queue_name, **options)

    @debug = debug
  end

  # Returns `text` unchanged, printing it first when debug mode is on.
  def echo(text)
    puts text if @debug

    text
  end
end

# == Support Methods ========================================================

# Runs the given block and converts any failure into a one-line report on
# stderr followed by exit status -1. The backtrace is printed as well when
# options[:trace] is set.
def rescue_safely(options)
  yield
rescue SystemExit
  # An exit request is not an error: it may come from the block itself or
  # from a worker thread that already reported its failure. Let it through
  # instead of printing a spurious "[SystemExit] exit" line.
  raise
rescue Exception => e
  # Deliberately broad: this is the top-level handler of a CLI tool, so
  # Interrupt and friends should also produce a concise message.
  $stderr.puts('[%s] %s' % [ e.class, e ])

  $stderr.puts(e.backtrace) if options[:trace]

  exit(-1)
end

# Runs the given block in a new Thread, wrapped in rescue_safely so that a
# failure inside the thread is reported and terminates the program.
# Returns the Thread.
def in_thread(options)
  Thread.abort_on_exception = true

  Thread.new do
    rescue_safely(options) do
      yield
    end
  end
end

# == Main ===================================================================

options = {
  count: 1,
  threads: 1,
  queue_name: 'test_echo',
  exchange_name: nil
}

parser = OptionParser.new do |opts|
  opts.on('-v', '--verbose') do
    options[:verbose] = true
  end
  opts.on('-n', '--count=n') do |n|
    options[:count] = n.to_i
  end
  opts.on('-c', '--threads=n') do |n|
    options[:threads] = n.to_i
  end
  opts.on('-t', '--trace') do
    options[:trace] = true
  end
  opts.on('-d', '--debug') do
    options[:debug] = true
  end
  opts.on('-e', '--exchange=s') do |s|
    options[:exchange_name] = s
  end
  opts.on('-q', '--queue=s') do |s|
    options[:queue_name] = s
  end
  opts.on('-h', '--help') do
    puts opts
    exit(0)
  end
end

Skein::RabbitMQ.force_require!

# Whatever is left after option parsing: the command name comes first.
args = parser.parse(ARGV)

case (command = args.shift)
when 'config'
  # Dump the active configuration, one formatted line at a time.
  Skein::Support.hash_format(Skein.config).each do |line|
    puts line
  end
when 'test'
  # Connectivity check only.
  rescue_safely(options) do
    Skein::RabbitMQ.connect

    puts '[OK] Connection succeeded.'
  end
when 'publish'
  # Publish a timestamped message once per second until interrupted.
  rescue_safely(options) do
    publisher = Skein::Client.publisher('test_pubsub')

    loop do
      publisher << { test: Time.now.to_f }

      sleep(1)
    end
  end
when 'subscribe'
  # Print every message (and its metadata) received on the test channel.
  rescue_safely(options) do
    subscriber = Skein::Client.subscriber('test_pubsub')

    subscriber.listen do |message, metadata|
      puts metadata.inspect
      puts message.inspect
    end
  end
when 'echo'
  # Fire options[:count] echo RPC calls from each of options[:threads]
  # threads and report how many round-tripped correctly.
  rescue_safely(options) do
    results = Queue.new
    # Monotonic clock: immune to wall-clock adjustments while measuring.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    count = Hash.new(0)

    # Single consumer tallying true/false outcomes; a nil marks the end.
    tabulator = Thread.new do
      loop do
        outcome = results.pop

        break if outcome.nil?

        count[outcome] += 1
      end
    end

    if options[:threads] > 1
      # Warm up RabbitMQ driver, force all autoloads
      Skein::Client.new.close
    end

    workers = options[:threads].times.map do
      in_thread(options) do
        client = Skein::Client.new
        rpc = client.rpc(options[:exchange_name], routing_key: options[:queue_name])

        options[:count].times do |i|
          test_data = SecureRandom.uuid

          if options[:verbose]
            print '[%s] ??? (%d/%d)' % [ '---', i + 1, options[:count] ]
          end

          response = rpc.echo(test_data)
          matched = (response == test_data)

          results << matched

          if options[:verbose]
            puts "\r[%s] %s (%d/%d)" % [
              (matched ? 'OK' : 'ERR'),
              response.inspect,
              i + 1,
              options[:count]
            ]
          end
        end

        rpc.close
        client.close
      end
    end

    workers.each(&:join)

    # All producers are done: tell the tabulator to stop, then wait for it.
    results << nil
    tabulator.join

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

    puts 'Success: %d Failed: %d in %.1fms [%d mps]' % [
      count[true],
      count[false],
      elapsed * 1000,
      count[true] > 0 ? (count[true].to_f / elapsed) : 0
    ]
  end
when 'echo_server'
  # Start one EchoWorker per requested thread and wait on all of them.
  rescue_safely(options) do
    echo_workers = options[:threads].times.map do
      EchoWorker.new(
        options[:queue_name],
        exchange_name: options[:exchange_name],
        debug: options[:debug]
      )
    end

    echo_workers.each(&:join)
  end
else
  $stderr.puts('Unknown command: %s' % command)
  exit(-1)
end