Sha256: 861acbe36f4955227779edde59c4592a8879852d0c708166eda4817ec03e4207

Contents?: true

Size: 1.1 KB

Versions: 1

Compression:

Stored size: 1.1 KB

Contents

require 'helper'

# Opens Fluent::MqttInput inside the test file and defines #emit so that
# events are handed directly to Fluent::Engine, using the timestamp carried
# in the record itself.
class Fluent::MqttInput
  # Emits +message+ to Fluent::Engine under the tag +topic+.
  #
  # topic   - tag passed to Fluent::Engine.emit
  # message - the record; its "t" entry, when present, becomes the event time
  # time    - fallback event time, used only when the record has no "t" entry
  #           (defaults to Fluent::Engine.now)
  def emit(topic, message, time = Fluent::Engine.now)
    # Prefer the payload timestamp so tests can pin a deterministic time;
    # fall back to +time+ instead of emitting a nil timestamp.
    Fluent::Engine.emit(topic, message["t"] || time, message)
  end
end

# Tests for Fluent::MqttInput, driven through Fluent::Test::InputTestDriver.
# NOTE(review): test_client publishes through MQTT::Client.connect with
# default arguments — presumably a broker must be reachable at the client's
# default address; confirm against the test environment.
class MqttInputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end

  # Default driver configuration; contains only whitespace.
  CONFIG = %[
  ]

  # Builds an input test driver for Fluent::MqttInput configured with +conf+.
  def create_driver(conf = CONFIG)
    Fluent::Test::InputTestDriver.new(Fluent::MqttInput).configure(conf)
  end

  # The bind and port settings must be readable from the plugin instance.
  def test_configure
    d = create_driver(
      %[ bind 127.0.0.1
         port 1300 ]
    )
    assert_equal '127.0.0.1', d.instance.bind
    assert_equal 1300, d.instance.port
  end

  # Connects an MQTT client, subscribes it to every topic ('#') and returns
  # it. The caller owns the connection and is responsible for disconnecting.
  def sub_client
    client = MQTT::Client.connect
    client.subscribe('#')
    client
  end

  # Publishes every expected event over MQTT while the driver is running.
  def test_client
    d = create_driver
    time = Time.parse("2011-01-02 13:14:15 UTC").to_i
    d.expect_emit "tag1", time, {"t" => time, "v" => {"a"=>1}}
    d.expect_emit "tag2", time, {"t" => time, "v" => {"a"=>2}}
    d.run do
      # Block parameters are named distinctly so they do not shadow the
      # outer +time+ local.
      d.expected_emits.each do |tag, emit_time, record|
        send_data tag, emit_time, record
      end
      # Pause before the run block ends; presumably this gives the
      # published messages time to reach the plugin — confirm.
      sleep 0.5
    end
  end

  # Publishes +record+ as JSON on the topic +tag+ through a fresh client,
  # then disconnects that client so connections are not leaked between
  # publishes. +time+ is accepted to match the expected-emit tuples but is
  # not sent separately (the record carries its own "t" entry).
  def send_data(tag, time, record)
    client = sub_client
    client.publish(tag, record.to_json)
  ensure
    client.disconnect if client
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-mqtt-0.0.2 test/plugin/test_in_mqtt.rb