spec/inputs/zeromq_spec.rb in logstash-input-zeromq-2.0.4 vs spec/inputs/zeromq_spec.rb in logstash-input-zeromq-3.0.0
- old
+ new
@@ -1,10 +1,27 @@
# encoding: utf-8
require_relative "../spec_helper"
require "logstash/plugin"
require "logstash/event"
+require "securerandom"
+def send_mock_messages(messages, &block)
+ socket = double("socket")
+ expect(socket).to receive(:recv_strings) do |arr|
+ messages.each do |msg|
+ msg.each do |frame|
+ arr << frame
+ end
+ end
+ 0
+ end
+ plugin.instance_variable_set(:@zsocket, socket)
+ q = []
+ plugin.send(:handle_message, q)
+ q
+end
+
describe LogStash::Inputs::ZeroMQ, :zeromq => true do
context "when register and close" do
let(:plugin) { LogStash::Plugin.lookup("input", "zeromq").new({ "topology" => "pushpull" }) }
@@ -21,7 +38,42 @@
subject.close
end
end
end
+ end
+
+ context "pubsub" do
+ topic_field = SecureRandom.hex
+ let(:plugin) { LogStash::Plugin.lookup("input", "zeromq").new({"topology" => "pubsub", "topic_field" => topic_field}) }
+
+ before do
+ allow(plugin).to receive(:init_socket)
+ plugin.register
+ end
+
+ it "should set the topic field with multiple message frames" do
+ events = send_mock_messages([["topic", '{"message": "message"}', '{"message": "message2"}']])
+ expect(events.first.get(topic_field)).to eq("topic")
+ expect(events.first.get("message")).to eq("message")
+ expect(events[1].get("message")).to eq("message2")
+ expect(events[1].get(topic_field)).to eq("topic")
+ expect(events.length).to eq(2)
+ end
+ end
+
+ context "pushpull" do
+ let(:plugin) { LogStash::Plugin.lookup("input", "zeromq").new({ "topology" => "pushpull" }) }
+
+ before do
+ allow(plugin).to receive(:init_socket)
+ plugin.register
+ end
+
+ it "should receive multiple frames" do
+ events = send_mock_messages([['{"message": "message"}', '{"message": "message2"}']])
+ expect(events.first.get("message")).to eq("message")
+ expect(events[1].get("message")).to eq("message2")
+ expect(events.length).to eq(2)
+ end
end
end