lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.6 vs lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.7
- old
+ new
@@ -3,20 +3,27 @@
require 'net/https'
require 'uri'
require 'fluent/plugin/output'
-BATCH_SIZE = 15
-
def flatten_hash(hash)
hash.each_with_object({}) do |(k, v), h|
if v.is_a? Hash
flatten_hash(v).map do |h_k, h_v|
h["#{k}.#{h_k}"] = h_v
end
- elsif !v.is_a? Array
+ elsif v.is_a? Array
+ # Indicative doesn't support arrays so we use the value of the array as a key and set it to true
+ v.each do |item|
+ if item.is_a?(Hash) && item.has_key?("key") && item.has_key?("value")
+ h["#{k}.#{item["key"]}"] = item["value"]
+ else
+ h["#{k}.#{item}"] = true
+ end
+ end
+ else
h[k] = v
end
end
end
@@ -24,25 +31,26 @@
class Fluent::Plugin::IndicativeOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('indicative', self)
config_param :api_key, :string, secret: true
config_param :api_url, :string, default: 'https://api.indicative.com/service/event/batch'
+ config_param :batch_size, :integer, default: 15
config_param :event_name_key, :string
config_param :event_time_key, :string
config_param :event_unique_id_keys, :array, value_type: :string
def process(tag, es)
- es.each_slice(BATCH_SIZE) do |events|
+ es.each_slice(@batch_size) do |events|
send_events(events.map {|time, record| record})
end
end
def write(chunk)
records = []
chunk.each do |time, record|
records << record
end
- records.each_slice(BATCH_SIZE) do |events|
+ records.each_slice(@batch_size) do |events|
send_events(events)
end
end
def send_events(events)