Sha256: fba09b47878001c69319dbb492811907c6c3487cd02b38843f0411fb130568de

Contents?: true

Size: 1.5 KB

Versions: 1

Compression:

Stored size: 1.5 KB

Contents

require 'helper'

class Fluent::MqttInput 
  def emit topic, message , time = Fluent::Engine.now
    if message.class == Array
      message.each do |data|
        $log.debug "#{topic}: #{data}"
        Fluent::Engine.emit(topic, message["t"], data)
      end
    else
      Fluent::Engine.emit(topic, message["t"], message)
    end
  end

end

class MqttInputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end


 CONFIG = %[
  ]

  def create_driver(conf = CONFIG) 
    Fluent::Test::InputTestDriver.new(Fluent::MqttInput).configure(conf)
  end
  
  def test_configure
    d = create_driver(
      %[ bind 127.0.0.1
         port 1300 ] 
    )
    assert_equal '127.0.0.1', d.instance.bind
    assert_equal 1300, d.instance.port
  end


  def sub_client
    connect = MQTT::Client.connect
    connect.subscribe('#')
    return connect
  end


  def test_client
    d = create_driver
    time = Time.parse("2011-01-02 13:14:15 UTC").to_i    
    d.expect_emit "tag1", time, {"t" => time, "v" => {"a"=>1}}
    d.expect_emit "tag2", time, {"t" => time, "v" => {"a"=>2}}
    d.expect_emit "tag3", time, {"t" => time, "v" => {"a"=>31}}
    d.expect_emit "tag3", time, {"t" => time, "v" => {"a"=>32}}

    d.run do
      d.expected_emits.each {|tag,time,record|
        send_data tag, time, record
      }
      send_data "tag3", time , [{"t" => time, "v" => {"a"=>31}} , {"t" => time, "v" => {"a"=>32}}] 
      sleep 0.5
    end

  end

  def send_data tag, time, record
    sub_client.publish(tag, record.to_json)
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-mqtt-0.0.3 test/plugin/test_in_mqtt.rb