Sha256: 45bf4c11d9595d66686e10e3d11a5de7c34774cdb588eadc2bbfd913d26e48a5

Contents?: true

Size: 1.91 KB

Versions: 2

Compression:

Stored size: 1.91 KB

Contents

require 'fluent/output'

require 'fluent/plugin/gcloud_pubsub/client'

module Fluent
  class GcloudPubSubOutput < BufferedOutput
    Fluent::Plugin.register_output('gcloud_pubsub', self)

    config_param :project,            :string,  :default => nil
    config_param :key,                :string,  :default => nil
    config_param :topic,              :string
    config_param :autocreate_topic,   :bool,    :default => false
    config_param :max_messages,       :integer, :default => 1000
    config_param :max_total_size,     :integer, :default => 10000000  # 10MB
    config_param :format,             :string,  :default => 'json'

    unless method_defined?(:log)
      define_method("log") { $log }
    end

    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    def configure(conf)
      super
      @formatter = Plugin.new_formatter(@format)
      @formatter.configure(conf)
    end

    def start
      super
      @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @autocreate_topic
      log.debug "connected topic:#{@topic} in project #{@project}"
    end

    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    def write(chunk)
      messages = []
      size = 0

      chunk.msgpack_each do |tag, time, record|
        msg = @formatter.format(tag, time, record)
        if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
          publish messages
          messages = []
          size = 0
        end
        messages << msg
        size += msg.bytesize
      end

      if messages.length > 0
        publish messages
      end
    rescue => e
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
      raise e
    end

    private

    def publish(messages)
      log.debug "send message topic:#{@topic} length:#{messages.length.to_s}"
      @publisher.publish messages
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluent-plugin-gcloud-pubsub-custom-0.1.1 lib/fluent/plugin/out_gcloud_pubsub.rb
fluent-plugin-gcloud-pubsub-custom-0.1.0 lib/fluent/plugin/out_gcloud_pubsub.rb