Sha256: 47ecb8c99a817b4a62cf54b5c8d2498810ad2f9eff1a040d4c4b64fa1f2a9afe

Contents?: true

Size: 1.79 KB

Versions: 1

Compression:

Stored size: 1.79 KB

Contents

require 'fluent/plugin/output'
require 'yajl'
require 'faye/websocket'

module Fluent::Plugin
  class SakuraIOOutput < Output
    Fluent::Plugin.register_output('sakuraio', self)

    config_param :url, :string, secret: true
    config_param :modules, :array, value_type: :string, secret: true
    # channels {"channel_number": ["key", "type"]}
    config_param :channels, :hash

    def configure(conf)
      super

      ensure_reactor_running
      thread_create(:out_sakuraio, &method(:run))
    end

    def ensure_reactor_running
      return if EM.reactor_running?
      thread_create(:out_sakuraio_reactor) do
        EM.run
      end
    end

    def run()
      @client = Faye::WebSocket::Client.new(@url)
      EM.next_tick do
        @client.on :open do
          log.info "sakuraio: starting websocket connection for #{@url}."
        end

        @client.on :message do |event|
          log.debug "sakuraio: received message #{event.data}"
        end

        @client.on :error do |event|
          log.warn "sakuraio: #{event.message}"
        end

        @client.on :close do
          @client = nil
        end
      end
    end

    def process(_tag, es)
      es.each do |_time, record|
        log.debug "sakuraio: process record #{record}"
        modules.each do |m|
          s = encode_record(m, record)
          log.debug "sakuraio: encoded json #{s}"
          @client.send(s)
        end
      end
    end

    def encode_record(m, record)
      data = []
      @channels.each do |ch, v|
        key, type = v
        data.push('channel' => ch.to_i,
                  'type' => type,
                  'value' => record[key])
      end
      hash = { 'type' => 'channels',
               'module' => m,
               'payload' => { 'channels' => data } }
      Yajl::Encoder.encode(hash)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-sakuraio-0.1.2 lib/fluent/plugin/out_sakuraio.rb