#!/usr/bin/env ruby # == Imports ================================================================ require 'optparse' require 'thread' require_relative '../lib/skein' # == Support Classes ======================================================== class EchoWorker < Skein::Client::Worker def initialize(queue_name, options = nil) super(queue_name, options || { }) @debug = options && options[:debug] end def echo(text) if (@debug) puts text end text end end # == Support Methods ======================================================== def rescue_safely(options) yield rescue Object => e $stderr.puts('[%s] %s' % [ e.class, e ]) if (options[:trace]) $stderr.puts(e.backtrace) end exit(-1) end def in_thread(options) Thread.new do begin Thread.abort_on_exception = true rescue_safely(options) do yield end end end end # == Main =================================================================== options = { count: 1, threads: 1 } parser = OptionParser.new do |parser| parser.on('-v', '--verbose') do options[:verbose] = true end parser.on('-n', '--count=n') do |n| options[:count] = n.to_i end parser.on('-c', '--threads=n') do |n| options[:threads] = n.to_i end parser.on('-t', '--trace') do options[:trace] = true end parser.on('-d', '--debug') do options[:debug] = true end parser.on('-h', '--help') do puts parser exit(0) end end Skein::RabbitMQ.force_require! args = parser.parse(*ARGV) case (command = args.shift) when 'config' Skein::Support.hash_format(Skein.config).each do |line| puts line end when 'test' rescue_safely(options) do Skein::RabbitMQ.connect puts '[OK] Connection succeeded.' end when 'publish' rescue_safely(options) do publisher = Skein::Client.publisher('test_pubsub') loop do publisher << { test: Time.now.to_f } sleep(1) end end when 'subscribe' rescue_safely(options) do subscriber = Skein::Client.subscriber('test_pubsub') subscriber.listen do |message, metadata| puts metadata.inspect puts message.inspect end end when 'echo' rescue_safely(options) do results = Queue.new start = Time.now count = Hash.new(0) tabulator = Thread.new do loop do v = results.pop break if (v.nil?) count[v] += 1 end end options[:threads].times.map do in_thread(options) do client = Skein::Client.new rpc = client.rpc('test_echo') options[:count].times do |i| test_data = SecureRandom.uuid response = rpc.echo(test_data) results << (response == test_data) if (options[:verbose]) puts '[%s] %s (%d/%d)' % [ (response == test_data ? 'OK' : 'ERR'), response.inspect, i + 1, options[:count] ] end end rpc.close client.close end end.each(&:join) results << nil tabulator.join elapsed = Time.now - start puts 'Success: %d Failed: %d in %.1fms [%d mps]' % [ count[true], count[false], elapsed.to_f * 1000, count[true] > 0 ? (count[true].to_f / elapsed.to_f) : 0 ] end when 'echo_server' rescue_safely(options) do options[:threads].times.map do EchoWorker.new('test_echo') end.each(&:join) end else $stderr.puts('Unknown command: %s' % command) exit(-1) end