Sha256: 861acbe36f4955227779edde59c4592a8879852d0c708166eda4817ec03e4207
Contents?: true
Size: 1.1 KB
Versions: 1
Compression:
Stored size: 1.1 KB
Contents
require 'helper'

# Test-only reopening of the MQTT input plugin.
class Fluent::MqttInput
  # Forwards a received message to the Fluentd engine.
  #
  # The event time is taken from the record's own "t" field; the +time+
  # argument is accepted for signature compatibility but deliberately ignored
  # (presumably so the emitted time is deterministic for the assertions below).
  #
  # @param topic [String] MQTT topic, used as the Fluentd tag
  # @param message [Hash] decoded record; must carry its timestamp under "t"
  # @param time [Integer] unused
  def emit(topic, message, time = Fluent::Engine.now)
    Fluent::Engine.emit(topic, message["t"], message)
  end
end

# Tests for Fluent::MqttInput.
#
# NOTE(review): test_client connects with MQTT::Client's default settings, so
# it presumably needs a broker reachable on those defaults -- confirm.
class MqttInputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end

  # Default (empty) plugin configuration.
  CONFIG = %[ ]

  # Builds an input test driver for the plugin configured with +conf+.
  #
  # @param conf [String] Fluentd configuration text
  def create_driver(conf = CONFIG)
    Fluent::Test::InputTestDriver.new(Fluent::MqttInput).configure(conf)
  end

  # The plugin must expose the configured bind address and port.
  def test_configure
    # One directive per line: on a single line the value of `bind` would
    # swallow the `port` directive.
    d = create_driver(%[
      bind 127.0.0.1
      port 1300
    ])

    assert_equal '127.0.0.1', d.instance.bind
    assert_equal 1300, d.instance.port
  end

  # Opens a new MQTT connection subscribed to every topic ('#').
  # The caller owns the connection and is responsible for disconnecting it.
  #
  # @return [MQTT::Client] the connected client
  def sub_client
    MQTT::Client.connect.tap { |client| client.subscribe('#') }
  end

  # Publishes each expected record over MQTT while the driver is running.
  def test_client
    d = create_driver

    time = Time.parse("2011-01-02 13:14:15 UTC").to_i
    d.expect_emit("tag1", time, {"t" => time, "v" => {"a" => 1}})
    d.expect_emit("tag2", time, {"t" => time, "v" => {"a" => 2}})

    d.run do
      # Block parameter is named emit_time so it does not shadow the outer `time`.
      d.expected_emits.each do |tag, emit_time, record|
        send_data tag, emit_time, record
      end
      # Give the published messages time to arrive before the driver stops.
      sleep 0.5
    end
  end

  # Publishes +record+ as JSON on topic +tag+ using a fresh connection, and
  # always closes that connection afterwards so sockets are not leaked.
  #
  # @param tag [String] MQTT topic to publish on
  # @param time [Integer] unused; the timestamp travels inside record["t"]
  # @param record [Hash] payload, serialized with #to_json
  def send_data(tag, time, record)
    client = sub_client
    begin
      client.publish(tag, record.to_json)
    ensure
      client.disconnect
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-mqtt-0.0.2 | test/plugin/test_in_mqtt.rb |