examples/advanced/batch_client.rb in jruby-hornetq-0.3.1 vs examples/advanced/batch_client.rb in jruby-hornetq-0.3.2
- old
+ new
@@ -8,108 +8,65 @@
# if say 80% of the first say 100 requests fail.
#
# This sample sends a total of 100 requests in batches of 10.
# One thread sends requests and the other processes replies.
# Once 80% of the replies are back, it will send the next batch
+#
+# Before running this sample, start server.rb first
+#
# Allow examples to be run in-place without requiring a gem install
$LOAD_PATH.unshift File.dirname(__FILE__) + '/../../lib'
require 'rubygems'
require 'yaml'
require 'hornetq'
-require 'sync'
+require 'batch_requestor_pattern'
-total_count = (ARGV[0] || 100).to_i
-batching_size = (ARGV[1] || 10).to_i
+batch_size = (ARGV[0] || 100).to_i
+window_size = (ARGV[1] || 10).to_i
-request_address = 'ServerAddress'
+server_address = 'ServerAddress'
+
config = YAML.load_file(File.dirname(__FILE__) + '/hornetq.yml')['development']
-class BatchClientPattern
- def initialize(connection, request_address)
- @session = connection.create_session
- @producer = @session.create_producer(request_address)
- @reply_queue = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}"
- @session.create_temporary_queue(@reply_queue, @reply_queue)
- @counter_sync = Sync.new
- @counter = 0
-
- # Start consuming replies
- connection.on_message(:queue_name => @reply_queue) {|message| process_reply(message) }
- end
+# Create a HornetQ session
+HornetQ::Client::Connection.connection(config[:connection]) do |connection|
+ window_size = batch_size if window_size > batch_size
+ # * :batch_size Total size of batch
+ # * :window_size Size of the sliding window. This also indicates the maximimum
+ # number of outstanding requests to have open at any time
+ # Default: 0.8
+ # (:max_outstanding_responses)
- # Increment Message Counter
- def inc_counter(total_count=1)
- @counter_sync.synchronize(:EX) { @counter += total_count }
- end
-
- # Decrement Message counter
- def dec_counter(total_count=1)
- @counter_sync.synchronize(:EX) { @counter -= total_count }
- end
+ pattern_config = {
+ :connection => connection,
+ :server_address => server_address,
+ :completion_ratio => 0.8
+ }
- # Return the current message count
- def counter
- @counter_sync.synchronize(:SH) { @counter }
- end
-
- # Send x messages
- def send(total_count)
- #print "Sending #{total_count} messages"
- start_time = Time.now
- total_count.times do |i|
- message = @session.create_message(HornetQ::Client::Message::TEXT_TYPE,true)
- message.reply_to_queue_name = @reply_queue
- message.body = "Request Current Time. #{i}"
- @producer.send(message)
- print "."
- #puts "Sent:#{message}"
- end
- duration = Time.now - start_time
- #printf "\nSend %5d msg, %5.2f s, %10.2f msg/s\n", total_count, duration, total_count/duration
- end
-
- # Receive Reply messages
- def process_reply(message)
+ requestor = BatchRequestorPattern.new(connection, server_address) do |message|
+ # Display an @ symbol for every reply received
print '@'
- inc_counter(1)
- message.acknowledge
end
- def close
- @producer.close
- @session.close
- end
-end
-
-# Create a HornetQ session
-HornetQ::Client::Connection.connection(config[:connection]) do |connection|
- batching_size = total_count if batching_size > total_count
-
- client = BatchClientPattern.new(connection, request_address)
-
- times = (total_count/batching_size).to_i
+ times = (batch_size/window_size).to_i
puts "Performing #{times} loops"
- count = 0
+
times.times do |i|
- client.send(batching_size)
- count += batching_size
- # Wait for at least 80% of responses
- loop do
- #puts "Waiting for receive"
- sleep 0.1
- received_count = client.counter
- #puts "\nReceived #{received_count} messages"
- if received_count >= 0.8 * count
- puts ""
- break
- end
+
+ window_size.times do |i|
+ message = @session.create_message(true)
+ message.type = :text
+ message.body = "Request Current Time. #{i}"
+ requestor.send(message)
+ print "."
end
+
+ # Wait for at least 80% of responses before sending more requests
+ requestor.wait_for_outstanding_replies
+
end
-
- while client.counter < total_count
- sleep 0.1
- print "*"
- end
- client.close
+ puts "Done sending requests, waiting for remaining replies"
+ requestor.wait_for_all_outstanding_replies
+ requestor.close
end