Sha256: 073558c6f070d52210fec052466a6ac2f469b9bb6df4d2ab7233730c9bf13543

Contents?: true

Size: 1.28 KB

Versions: 1

Compression:

Stored size: 1.28 KB

Contents

class Fluent::GeoBlipperOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('geoblipper', self)

  def initialize
    super
    require 'geoip'
    require 'pubnub'
  end

  config_param :pubnub_channel, :string
  config_param :pubnub_publish_key, :string
  config_param :pubnub_subscribe_key, :string
  config_param :geodata_location, :string
  config_param :max_entries, :integer, :default => -1
  config_param :ip_key, :string, :default => 'ip'

  def start
    super
    @geodata = GeoIP.new(@geodata_location)
    @pubnub = Pubnub.new( publish_key: @pubnub_publish_key, subscribe_key: @pubnub_subscribe_key, logger: Logger.new(STDOUT) )
  end

  def format(tag, time, record)
    address = record.delete(@ip_key)
    loc = @geodata.city(address)
    extra = {}
    if loc
      record.merge({latitude: loc.latitude, longitude: loc.longitude}).to_json + "\n"
    else
       # Fluent::Engine.emit('debug.livemap', Time.now, {message: "geodata not found for #{address}"})
       ''
    end
  end

  def write(chunk)
    chunk.open do |io|
      items = io.read.split("\n")
      entries = items.slice(0..@max_entries).map {|item| JSON.parse(item) }
      unless entries.empty?
        @pubnub.publish(http_sync: true, message: entries, channel: @pubnub_channel)
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-geoblipper-0.0.4.4 lib/fluent/plugin/out_geoblipper.rb