Sha256: f0917c6d02f30c41d2bdca57d2c17d0fda6947194e5df3657595f0bcf14315c2

Contents?: true

Size: 1.69 KB

Versions: 1

Compression:

Stored size: 1.69 KB

Contents

require 'gcloud'

module Fluent
  # Buffered Fluentd output plugin, registered as 'gcloud_pubsub', that
  # publishes each buffered record as a JSON string to a Google Cloud Pub/Sub
  # topic through the `gcloud` gem.
  #
  # Required configuration parameters:
  #   project - passed as the first argument to Gcloud.new
  #   topic   - name handed to pubsub.topic
  #   key     - passed as the second argument to Gcloud.new
  #             (presumably a path to a service-account keyfile -- confirm
  #             against the gcloud gem version in use)
  class GcloudPubSubOutput < Fluent::BufferedOutput
    Fluent::Plugin.register_output('gcloud_pubsub', self)

    # Buffer defaults. 'lightening' is not a Fluentd built-in buffer type;
    # NOTE(review): it presumably comes from a separately installed buffer
    # plugin, which would also be what understands
    # buffer_chunk_records_limit -- confirm it is a declared dependency.
    config_set_default :buffer_type,                'lightening'
    config_set_default :flush_interval,             1
    config_set_default :try_flush_interval,         0.05
    config_set_default :buffer_chunk_records_limit, 900
    config_set_default :buffer_chunk_limit,         9437184 # 9 * 1024 * 1024 bytes
    config_set_default :buffer_queue_limit,         64

    config_param :project,            :string,  :default => nil
    config_param :topic,              :string,  :default => nil
    config_param :key,                :string,  :default => nil

    # Fallback for Fluentd versions whose plugin base class has no #log:
    # use the global logger instead.
    unless method_defined?(:log)
      define_method("log") { $log }
    end

    # Fallback for Fluentd versions whose plugin base class has no #router.
    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    # Validates the plugin configuration after the parent class has parsed it.
    #
    # @param conf the configuration element supplied by Fluentd
    # @raise [Fluent::ConfigError] if project, topic or key is missing
    def configure(conf)
      super

      raise Fluent::ConfigError, "'project' must be specified." unless @project
      raise Fluent::ConfigError, "'topic' must be specified." unless @topic
      raise Fluent::ConfigError, "'key' must be specified." unless @key
    end

    # Starts the plugin and resolves the Pub/Sub topic handle that #write
    # publishes to.
    def start
      super

      pubsub = Gcloud.new(@project, @key).pubsub
      # NOTE(review): depending on the gcloud gem version, #topic may return
      # nil for a topic that does not exist -- confirm; #write would then fail
      # with NoMethodError on every flush.
      @client = pubsub.topic @topic
    end

    # Serializes one event into the buffer as a msgpack-encoded
    # [tag, time, record] triple.
    #
    # @return [String] msgpack bytes appended to the current chunk
    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    # Publishes every record of a buffer chunk to the topic in a single batch
    # publish call. Only the record is sent (as JSON); tag and time are
    # discarded.
    #
    # Failures are logged and then re-raised. Returning normally from #write
    # signals a successful flush, so swallowing the exception here would make
    # Fluentd drop the chunk instead of retrying it.
    #
    # @param chunk the buffer chunk to flush
    # @raise [StandardError] whatever the decode or publish step raised
    def write(chunk)
      messages = []

      chunk.msgpack_each do |_tag, _time, record|
        messages << record.to_json
      end

      # Skip the API call entirely when the chunk decoded to no records.
      return if messages.empty?

      @client.publish do |batch|
        messages.each { |message| batch.publish message }
      end
    rescue => e
      log.error "unexpected error", :error=>e.to_s
      log.error_backtrace
      # Propagate so the buffered-output machinery keeps the chunk and retries.
      raise
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-gcloud-pubsub-0.0.1 lib/fluent/plugin/out_gcloud_pubsub.rb