Sha256: 987844f30e48ef3972f60d17ccc064c3af4dba2379aae3c447a1ccf38a7b7d6d
Contents?: true
Size: 1.71 KB
Versions: 1
Compression:
Stored size: 1.71 KB
Contents
require 'snowplow-tracker' class Fluent::SomeOutput < Fluent::TimeSlicedOutput Fluent::Plugin.register_output('snowplow', self) config_param :host, :string config_param :buffer_size, :integer config_param :protocol, :string config_param :method, :string def configure(conf) super end def start super @emitter = SnowplowTracker::Emitter.new(@host, { buffer_size: @buffer_size, protocol: @protocol, method: @method, on_success: ->(_) { log.debug("Flush with success on snowplow") }, on_failure: ->(_, _) { raise "Error when flushing to snowplow" } }) @trackers = {} end def stop @tracker.flush end def format(tag, time, record) [tag, time, record].to_msgpack end def tracker_for(application) @trackers[application] ||= SnowplowTracker::Tracker.new(@emitter, nil, nil, application) @trackers[application] end def write(chunk) application, tracker = nil, nil chunk.msgpack_each do |_, _, record| schema = record['schema'] message = JSON.parse record['message'] true_timestamp = record['true_timestamp'] application = record['application'] contexts = JSON.parse record.fetch('contexts', "[]") tracker = tracker_for(application) contexts = contexts.map do |context| context_schema = context['schema'] context_message = context['message'] SnowplowTracker::SelfDescribingJson.new(context_schema, context_message) end self_describing_json = SnowplowTracker::SelfDescribingJson.new(schema, message) tracker.track_self_describing_event(self_describing_json, contexts, SnowplowTracker::TrueTimestamp.new(true_timestamp.to_i)) end tracker.flush end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-snowplow-0.2.3 | lib/fluent/plugin/out_snowplow.rb |