lib/fluent/plugin/out_mixpanel.rb in fluent-plugin-mixpanel-0.0.8 vs lib/fluent/plugin/out_mixpanel.rb in fluent-plugin-mixpanel-0.0.9
- old
+ new
@@ -1,20 +1,22 @@
+require_relative "mixpanel_ruby_error_handler.rb"
class Fluent::MixpanelOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('mixpanel', self)
include Fluent::HandleTagNameMixin
- config_param :project_token, :string
- config_param :api_key, :string, :default => ''
+ config_param :project_token, :string, :secret => true
+ config_param :api_key, :string, :default => '', :secret => true
config_param :use_import, :bool, :default => nil
config_param :distinct_id_key, :string
config_param :event_key, :string, :default => nil
config_param :ip_key, :string, :default => nil
config_param :event_map_tag, :bool, :default => false
#NOTE: This will be removed in a future release. Please specify the '.' on any prefix
config_param :use_legacy_prefix_behavior, :default => true
+ config_param :discard_event_on_send_error, :default => false
class MixpanelError < StandardError
end
def initialize
@@ -30,19 +32,21 @@
@ip_key = conf['ip_key']
@event_map_tag = conf['event_map_tag']
@api_key = conf['api_key']
@use_import = conf['use_import']
@use_legacy_prefix_behavior = conf['use_legacy_prefix_behavior']
+ @discard_event_on_send_error = conf['discard_event_on_send_error']
if @event_key.nil? and !@event_map_tag
raise Fluent::ConfigError, "'event_key' must be specifed when event_map_tag == false."
end
end
def start
super
- @tracker = Mixpanel::Tracker.new(@project_token)
+ error_handler = Fluent::MixpanelOutputErrorHandler.new(log)
+ @tracker = Mixpanel::Tracker.new(@project_token, error_handler)
end
def shutdown
super
end
@@ -66,23 +70,23 @@
data['event'] = tag
elsif record[@event_key]
data['event'] = record[@event_key]
prop.delete(@event_key)
else
- log.warn('no event')
- return
+ log.warn("no event, tag: #{tag}, time: #{time.to_s}, record: #{record.to_json}")
+ next
end
# Ignore browswer only special event
- return if data['event'].start_with?('mp_')
+ next if data['event'].start_with?('mp_')
if record[@distinct_id_key]
data['distinct_id'] = record[@distinct_id_key]
prop.delete(@distinct_id_key)
else
- log.warn('no distinct_id')
- return
+ log.warn("no distinct_id, tag: #{tag}, time: #{time.to_s}, record: #{record.to_json}")
+ next
end
if !@ip_key.nil? and record[@ip_key]
prop['ip'] = record[@ip_key]
prop.delete(@ip_key)
@@ -96,15 +100,28 @@
send_to_mixpanel(records)
end
def send_to_mixpanel(records)
+ log.debug("sending #{records.length} to mixpanel")
+
records.each do |record|
- success = if @use_import
- @tracker.import(@api_key, record['distinct_id'], record['event'], record['properties'])
- else
- @tracker.track(record['distinct_id'], record['event'], record['properties'])
- end
- raise MixpanelError.new("Failed to track event to mixpanel") unless success
+ success = true
+
+ if @use_import
+ success = @tracker.import(@api_key, record['distinct_id'], record['event'], record['properties'])
+ else
+ success = @tracker.track(record['distinct_id'], record['event'], record['properties'])
+ end
+
+ unless success
+ if @discard_event_on_send_error
+ msg = "Failed to track event to mixpanel:\n"
+ msg += "\tRecord: #{record.to_json}"
+ log.info(msg)
+ else
+ raise MixpanelError.new("Failed to track event to mixpanel")
+ end
+ end
end
end
end