benchmark/socket.rb in fluq-0.7.5 vs benchmark/socket.rb in fluq-0.8.0
- old
+ new
@@ -6,47 +6,35 @@
require 'fluq'
BATCH_SIZE = 100_000
BATCHES = 50
ROOT = FluQ.root.join("log/benchmark")
-EVENT = FluQ::Event.new("a.b.c.d", Time.now.to_i, "k1" => "value", "k2" => "value", "k3" => "value").to_msgpack
+EVENT = MessagePack.pack("k1" => "value", "k2" => "value", "k3" => "value")
FileUtils.rm_rf ROOT.to_s
FileUtils.mkdir_p ROOT.to_s
+FluQ.logger.level = Logger::ERROR
-class FluQ::Handler::Counter < FluQ::Handler::Base
- attr_reader :count
- def initialize(*)
- super
- @count = 0
+puts "--> Preparing"
+Thread.new do
+ FluQ::Runner.run do |run|
+ run.feed :test do |feed|
+ feed.register FluQ::Handler::Noop
+ feed.listen FluQ::Input::Socket, bind: "tcp://127.0.0.1:8765", format: :msgpack
+ end
end
- def on_events(events)
- @count += events.size
- EM.stop if @count >= BATCHES * BATCH_SIZE
- end
end
-
-puts "--> Preparing"
BATCHES.times do |i|
ROOT.join("batch.#{i}").open("wb:ASCII-8BIT") do |file|
BATCH_SIZE.times { file.write(EVENT) }
end
end
-processed = 0
-handler = nil
-start = Time.now
-FluQ::Reactor.run do |reactor|
- reactor.listen FluQ::Input::Socket, bind: "tcp://127.0.0.1:8765"
- handler = reactor.register FluQ::Handler::Counter
-
- sleep(0.1)
+puts "--> Started benchmark"
+2.times do
start = Time.now
- puts "--> Started benchmark"
BATCHES.times do |i|
file = ROOT.join("batch.#{i}")
- spawn("nc 127.0.0.1 8765 < #{file}")
+ system "nc 127.0.0.1 8765 < #{file}"
end
+ puts "--> Processed : #{BATCHES * BATCH_SIZE} in #{(Time.now - start).round(1)}s"
end
-
-puts "--> Accepted : #{BATCHES * BATCH_SIZE} in #{(Time.now - start).round(1)}s"
-puts "--> Processed : #{handler.count} events"