Sha256: feff86995ab7436edd260b01d89b0565c669fb1d04dc9efec6ca27c2cd9e2c34
Contents?: true
Size: 1.36 KB
Versions: 1
Compression:
Stored size: 1.36 KB
Contents
require 'net/http' require 'net/https' require 'uri' require 'fluent/plugin/output' def flatten_hash(hash) hash.each_with_object({}) do |(k, v), h| if v.is_a? Hash flatten_hash(v).map do |h_k, h_v| h["#{k}.#{h_k}"] = h_v end else h[k] = v end end end class Fluent::Plugin::IndicativeOutput < Fluent::Plugin::Output Fluent::Plugin.register_output('indicative', self) config_param :api_key, :string, secret: true config_param :api_url, :string, default: 'https://api.indicative.com/service/event' config_param :event_name_key, :string config_param :event_time_key, :string config_param :event_unique_id_keys, :array, value_type: :string def process(tag, es) es.each do |time, record| send_event(record) end end def send_event(data) uri = URI.parse(@api_url) headers = {'Content-Type' => 'application/json'} unique_id_key = @event_unique_id_keys.find {|k| data[k]} payload = { apiKey: @api_key, eventName: data[@event_name_key], eventUniqueId: unique_id_key && data[unique_id_key], properties: flatten_hash(data), eventTime: data[@event_time_key] } http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = true request = Net::HTTP::Post.new(uri.request_uri, headers) request.body = payload.to_json response = http.request(request) end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-indicative-0.1.0 | lib/fluent/plugin/out_indicative.rb |