lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.6 vs lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.7

- old
+ new

@@ -3,20 +3,27 @@ require 'net/https' require 'uri' require 'fluent/plugin/output' -BATCH_SIZE = 15 - def flatten_hash(hash) hash.each_with_object({}) do |(k, v), h| if v.is_a? Hash flatten_hash(v).map do |h_k, h_v| h["#{k}.#{h_k}"] = h_v end - elsif !v.is_a? Array + elsif v.is_a? Array + # Indicative doesn't support arrays so we use the value of the array as a key and set it to true + v.each do |item| + if item.is_a?(Hash) && item.has_key?("key") && item.has_key?("value") + h["#{k}.#{item["key"]}"] = item["value"] + else + h["#{k}.#{item}"] = true + end + end + else h[k] = v end end end @@ -24,25 +31,26 @@ class Fluent::Plugin::IndicativeOutput < Fluent::Plugin::Output Fluent::Plugin.register_output('indicative', self) config_param :api_key, :string, secret: true config_param :api_url, :string, default: 'https://api.indicative.com/service/event/batch' + config_param :batch_size, :integer, default: 15 config_param :event_name_key, :string config_param :event_time_key, :string config_param :event_unique_id_keys, :array, value_type: :string def process(tag, es) - es.each_slice(BATCH_SIZE) do |events| + es.each_slice(@batch_size) do |events| send_events(events.map {|time, record| record}) end end def write(chunk) records = [] chunk.each do |time, record| records << record end - records.each_slice(BATCH_SIZE) do |events| + records.each_slice(@batch_size) do |events| send_events(events) end end def send_events(events)