lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.3 vs lib/fluent/plugin/out_indicative.rb in fluent-plugin-indicative-0.1.4
- old
+ new
@@ -3,10 +3,13 @@
require 'net/https'
require 'uri'
require 'fluent/plugin/output'
+BATCH_SIZE = 100
+
+
def flatten_hash(hash)
hash.each_with_object({}) do |(k, v), h|
if v.is_a? Hash
flatten_hash(v).map do |h_k, h_v|
h["#{k}.#{h_k}"] = h_v
@@ -26,27 +29,40 @@
config_param :event_name_key, :string
config_param :event_time_key, :string
config_param :event_unique_id_keys, :array, value_type: :string
def process(tag, es)
- es.each do |time, record|
- send_event(record)
+ es.each_slice(BATCH_SIZE) do |events|
+ send_events(events.map {|time, record| record})
end
end
- def send_event(data)
+ def write(chunk)
+ records = []
+ chunk.each do |time, record|
+ records << record
+ end
+ records.each_slice(BATCH_SIZE) do |events|
+ send_events(events)
+ end
+ end
+
+ def send_events(events)
uri = URI.parse(@api_url)
headers = {'Content-Type' => 'application/json'}
- unique_id_key = @event_unique_id_keys.find {|k| data[k]}
-
payload = {
apiKey: @api_key,
- eventName: data[@event_name_key],
- eventUniqueId: unique_id_key && data[unique_id_key],
- properties: flatten_hash(data),
- eventTime: DateTime.parse(data[@event_time_key]).rfc3339
+ events: events.map do |data|
+ unique_id_key = @event_unique_id_keys.find {|k| data[k]}
+ {
+ eventName: data[@event_name_key],
+ eventUniqueId: unique_id_key && data[unique_id_key],
+ properties: flatten_hash(data),
+ eventTime: DateTime.parse(data[@event_time_key]).rfc3339
+ }
+ end
}
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
request = Net::HTTP::Post.new(uri.request_uri, headers)