Sha256: 2231d705247467807e7a162b4d7d0925f931cd7b2cbb8a4df14c50e7cbd85c4c

Contents?: true

Size: 1.9 KB

Versions: 1

Compression:

Stored size: 1.9 KB

Contents

require 'date'
require 'net/http'
require 'net/https'
require 'uri'

require 'fluent/plugin/output'

BATCH_SIZE = 100


def flatten_hash(hash)
  hash.each_with_object({}) do |(k, v), h|
    if v.is_a? Hash
      flatten_hash(v).map do |h_k, h_v|
        h["#{k}.#{h_k}"] = h_v
      end
    elsif !v.is_a? Array
      h[k] = v
    end
   end
end


class Fluent::Plugin::IndicativeOutput < Fluent::Plugin::Output
  Fluent::Plugin.register_output('indicative', self)

  config_param :api_key, :string, secret: true
  config_param :api_url, :string, default: 'https://api.indicative.com/service/event/batch'
  config_param :event_name_key, :string
  config_param :event_time_key, :string
  config_param :event_unique_id_keys, :array, value_type: :string

  def process(tag, es)
    es.each_slice(BATCH_SIZE) do |events|
      send_events(events.map {|time, record| record})
    end
  end

  def write(chunk)
    records = []
    chunk.each do |time, record|
      records << record
    end
    records.each_slice(BATCH_SIZE) do |events|
      send_events(events)
    end
  end

  def send_events(events)
    uri = URI.parse(@api_url)

    headers = {'Content-Type' => 'application/json'}

    payload = {
      apiKey: @api_key,
      events: events.map do |data|
        unique_id_key = @event_unique_id_keys.find {|k| data[k]}
        {
          eventName: data[@event_name_key],
          eventUniqueId: unique_id_key && data[unique_id_key],
          properties: flatten_hash(data),
          eventTime: DateTime.parse(data[@event_time_key]).rfc3339
        }
      end
    }

    http = Net::HTTP.new(uri.host, uri.port)
    http.use_ssl = true
    request = Net::HTTP::Post.new(uri.request_uri, headers)
    request.body = payload.to_json
    response = http.request(request)

    if response.code != "200"
        log.warn("Indicative responded with error (code: #{response.code}): #{payload.to_json} -> #{response.body}")
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-indicative-0.1.5 lib/fluent/plugin/out_indicative.rb