Sha256: 246bb8b4565f6d4e5e86f51c4ef02b247f4573c6c28c9c2f5f4ea169fe7b06ac

Contents?: true

Size: 1.88 KB

Versions: 1

Compression:

Stored size: 1.88 KB

Contents

require 'fluent/input'
require 'yajl'
require 'faye/websocket'
require 'eventmachine'
require 'time'

module Fluent::Plugin
  class SakuraIOInput < Input
    Fluent::Plugin.register_input('sakuraio', self)

    helpers :thread

    config_param :url, :string, default: nil

    def configure(conf)
      super

    end

    def start
      super

      thread_create(:in_sakuraio) do
        run
      end
    end

    def shutdown
      super
    end

    def run
      EM.run do
        client = Faye::WebSocket::Client.new(@url)
        client.on :open do
          log.info "sakuraio: starting websocket connection for #{@url}."
        end

        client.on :message do |event|
          log.debug "sakuraio: received message #{event.data}"
          records = parse(event.data)
          unless records.empty?
            records.each do |r|
              router.emit(r['tag'], r['time'], r['record'])
            end
          end
        end

        client.on :error do |event|
          log.warn "sakuraio: #{event.message}"
        end

        client.on :close do
          client = nil
        end
      end
    end

    def parse(text)
      parser = Yajl::Parser.new
      j = parser.parse(text)
      records = []
      case j['type']
      when 'channels' then
        parse_channels(records, j)
      else
        log.debug "unknown type: #{j['type']}: #{text}"
      end
      records
    end

    def parse_channels(records, j)
      message_time = Time.parse(j['datetime']).to_i
      tag = j['module']
      j['payload']['channels'].each do |c|
        record = {
          'tag' => tag + '.' + c['channel'].to_s,
          'record' => {
            'channel' => c['channel'],
            'type' => c['type'],
            'value' => c['value']
          },
          'time' => Time.parse(c['datetime']).to_i || message_time
        }
        records.push(record)
      end
      records
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-sakuraio-0.0.2 lib/fluent/plugin/in_sakuraio.rb