lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.3 vs lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.4

- old
+ new

@@ -3,10 +3,13 @@ require 'net/https' require 'uri' require 'fluent/plugin/output' +BATCH_SIZE = 100 + + def flatten_hash(hash) hash.each_with_object({}) do |(k, v), h| if v.is_a? Hash flatten_hash(v).map do |h_k, h_v| h["#{k}.#{h_k}"] = h_v @@ -26,27 +29,40 @@ config_param :event_name_key, :string config_param :event_time_key, :string config_param :event_unique_id_keys, :array, value_type: :string def process(tag, es) - es.each do |time, record| - send_event(record) + es.each_slice(BATCH_SIZE) do |events| + send_events(events.map {|time, record| record}) end end - def send_event(data) + def write(chunk) + records = [] + chunk.each do |time, record| + records << record + end + records.each_slice(BATCH_SIZE) do |events| + send_events(events) + end + end + + def send_events(events) uri = URI.parse(@api_url) headers = {'Content-Type' => 'application/json'} - unique_id_key = @event_unique_id_keys.find {|k| data[k]} - payload = { apiKey: @api_key, - eventName: data[@event_name_key], - eventUniqueId: unique_id_key && data[unique_id_key], - properties: flatten_hash(data), - eventTime: DateTime.parse(data[@event_time_key]).rfc3339 + events: events.map do |data| + unique_id_key = @event_unique_id_keys.find {|k| data[k]} + { + eventName: data[@event_name_key], + eventUniqueId: unique_id_key && data[unique_id_key], + properties: flatten_hash(data), + eventTime: DateTime.parse(data[@event_time_key]).rfc3339 + } + end } http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = true request = Net::HTTP::Post.new(uri.request_uri, headers)