examples/advanced/batch_client.rb in jruby-hornetq-0.3.1 vs examples/advanced/batch_client.rb in jruby-hornetq-0.3.2

- old
+ new

@@ -8,108 +8,65 @@ # if say 80% of the first say 100 requests fail. # # This sample sends a total of 100 requests in batches of 10. # One thread sends requests and the other processes replies. # Once 80% of the replies are back, it will send the next batch +# +# Before running this sample, start server.rb first +# # Allow examples to be run in-place without requiring a gem install $LOAD_PATH.unshift File.dirname(__FILE__) + '/../../lib' require 'rubygems' require 'yaml' require 'hornetq' -require 'sync' +require 'batch_requestor_pattern' -total_count = (ARGV[0] || 100).to_i -batching_size = (ARGV[1] || 10).to_i +batch_size = (ARGV[0] || 100).to_i +window_size = (ARGV[1] || 10).to_i -request_address = 'ServerAddress' +server_address = 'ServerAddress' + config = YAML.load_file(File.dirname(__FILE__) + '/hornetq.yml')['development'] -class BatchClientPattern - def initialize(connection, request_address) - @session = connection.create_session - @producer = @session.create_producer(request_address) - @reply_queue = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}" - @session.create_temporary_queue(@reply_queue, @reply_queue) - @counter_sync = Sync.new - @counter = 0 - - # Start consuming replies - connection.on_message(:queue_name => @reply_queue) {|message| process_reply(message) } - end +# Create a HornetQ session +HornetQ::Client::Connection.connection(config[:connection]) do |connection| + window_size = batch_size if window_size > batch_size + # * :batch_size Total size of batch + # * :window_size Size of the sliding window. This also indicates the maximimum + # number of outstanding requests to have open at any time + # Default: 0.8 + # (:max_outstanding_responses) - # Increment Message Counter - def inc_counter(total_count=1) - @counter_sync.synchronize(:EX) { @counter += total_count } - end - - # Decrement Message counter - def dec_counter(total_count=1) - @counter_sync.synchronize(:EX) { @counter -= total_count } - end + pattern_config = { + :connection => connection, + :server_address => server_address, + :completion_ratio => 0.8 + } - # Return the current message count - def counter - @counter_sync.synchronize(:SH) { @counter } - end - - # Send x messages - def send(total_count) - #print "Sending #{total_count} messages" - start_time = Time.now - total_count.times do |i| - message = @session.create_message(HornetQ::Client::Message::TEXT_TYPE,true) - message.reply_to_queue_name = @reply_queue - message.body = "Request Current Time. #{i}" - @producer.send(message) - print "." - #puts "Sent:#{message}" - end - duration = Time.now - start_time - #printf "\nSend %5d msg, %5.2f s, %10.2f msg/s\n", total_count, duration, total_count/duration - end - - # Receive Reply messages - def process_reply(message) + requestor = BatchRequestorPattern.new(connection, server_address) do |message| + # Display an @ symbol for every reply received print '@' - inc_counter(1) - message.acknowledge end - def close - @producer.close - @session.close - end -end - -# Create a HornetQ session -HornetQ::Client::Connection.connection(config[:connection]) do |connection| - batching_size = total_count if batching_size > total_count - - client = BatchClientPattern.new(connection, request_address) - - times = (total_count/batching_size).to_i + times = (batch_size/window_size).to_i puts "Performing #{times} loops" - count = 0 + times.times do |i| - client.send(batching_size) - count += batching_size - # Wait for at least 80% of responses - loop do - #puts "Waiting for receive" - sleep 0.1 - received_count = client.counter - #puts "\nReceived #{received_count} messages" - if received_count >= 0.8 * count - puts "" - break - end + + window_size.times do |i| + message = @session.create_message(true) + message.type = :text + message.body = "Request Current Time. #{i}" + requestor.send(message) + print "." end + + # Wait for at least 80% of responses before sending more requests + requestor.wait_for_outstanding_replies + end - - while client.counter < total_count - sleep 0.1 - print "*" - end - client.close + puts "Done sending requests, waiting for remaining replies" + requestor.wait_for_all_outstanding_replies + requestor.close end