Sha256: 073558c6f070d52210fec052466a6ac2f469b9bb6df4d2ab7233730c9bf13543
Contents?: true
Size: 1.28 KB
Versions: 1
Compression:
Stored size: 1.28 KB
Contents
class Fluent::GeoBlipperOutput < Fluent::BufferedOutput Fluent::Plugin.register_output('geoblipper', self) def initialize super require 'geoip' require 'pubnub' end config_param :pubnub_channel, :string config_param :pubnub_publish_key, :string config_param :pubnub_subscribe_key, :string config_param :geodata_location, :string config_param :max_entries, :integer, :default => -1 config_param :ip_key, :string, :default => 'ip' def start super @geodata = GeoIP.new(@geodata_location) @pubnub = Pubnub.new( publish_key: @pubnub_publish_key, subscribe_key: @pubnub_subscribe_key, logger: Logger.new(STDOUT) ) end def format(tag, time, record) address = record.delete(@ip_key) loc = @geodata.city(address) extra = {} if loc record.merge({latitude: loc.latitude, longitude: loc.longitude}).to_json + "\n" else # Fluent::Engine.emit('debug.livemap', Time.now, {message: "geodata not found for #{address}"}) '' end end def write(chunk) chunk.open do |io| items = io.read.split("\n") entries = items.slice(0..@max_entries).map {|item| JSON.parse(item) } unless entries.empty? @pubnub.publish(http_sync: true, message: entries, channel: @pubnub_channel) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-geoblipper-0.0.4.4 | lib/fluent/plugin/out_geoblipper.rb |