Sha256: a2f6d95dbb742950056da11d4b14dd10260297aa8bd9b5917b42c4aa1b786008

Contents?: true

Size: 1.89 KB

Versions: 1

Compression:

Stored size: 1.89 KB

Contents

require 'gcloud'

module Fluent
  # Buffered Fluentd output that publishes every record of a chunk as a JSON
  # message to a Google Cloud Pub/Sub topic via the `gcloud` gem.
  #
  # Records are grouped into publish requests bounded by +max_req_size+
  # (summed JSON byte size) and +max_msgs_per_req+ (message count).
  class CloudPubSubOutput < BufferedOutput
    # Defaults for the per-request caps; both can be overridden in config.
    MAX_REQ_SIZE = 10 * 1024 * 1024 # 10 MB
    MAX_MSGS_PER_REQ = 1000

    Plugin.register_output('cloud_pubsub', self)

    # project / topic / key are mandatory (enforced in #configure).
    config_param :project,          :string,  :default => nil
    config_param :topic,            :string,  :default => nil
    config_param :key,              :string,  :default => nil
    config_param :max_req_size,     :integer, :default => MAX_REQ_SIZE
    config_param :max_msgs_per_req, :integer, :default => MAX_MSGS_PER_REQ

    # Compatibility shims: only defined when the base class does not already
    # provide them (presumably older Fluentd releases -- confirm before removing).
    unless method_defined?(:log)
      define_method("log") { $log }
    end

    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    # Validates the plugin configuration.
    #
    # @param conf the configuration element handed over by Fluentd
    # @raise [Fluent::ConfigError] when project, topic or key is missing
    def configure(conf)
      super

      raise Fluent::ConfigError, "'project' must be specified." unless @project
      raise Fluent::ConfigError, "'topic' must be specified." unless @topic
      raise Fluent::ConfigError, "'key' must be specified." unless @key
    end

    # Builds the Pub/Sub client and stores the topic handle used by #publish.
    def start
      super

      pubsub = Gcloud.new(@project, @key).pubsub
      @client = pubsub.topic(@topic)
    end

    # Serializes one event for the buffer as a msgpack-encoded
    # [tag, time, record] triple (read back in #write via msgpack_each).
    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    # Sends one batch of already-serialized messages in a single publish call.
    #
    # @param msgs [Array<String>] JSON payloads to publish
    def publish(msgs)
      log.debug "publish #{msgs.length} messages"

      @client.publish do |batch|
        msgs.each { |m| batch.publish m }
      end
    end

    # Publishes all records of a buffer chunk, split into batches that respect
    # the configured size and count caps.
    #
    # A single record larger than +max_req_size+ is still sent, alone in its
    # own batch. Errors are logged and then re-raised so the failure is not
    # hidden from the caller (Fluentd's buffered output retries a chunk whose
    # write raised). Batches published before the failure are sent again on
    # such a retry, so delivery is at-least-once.
    #
    # @param chunk the buffer chunk to flush
    # @raise [StandardError] whatever the serialization or publish step raised
    def write(chunk)
      msgs = []
      msgs_size = 0

      chunk.msgpack_each do |_tag, _time, record|
        # Serialize once; account for the size of the exact payload being sent.
        msg = record.to_json
        size = msg.bytesize

        # Flush first if adding this message would exceed either cap.
        if !msgs.empty? && (msgs_size + size > @max_req_size || msgs.length + 1 > @max_msgs_per_req)
          publish(msgs)
          msgs = []
          msgs_size = 0
        end

        msgs << msg
        msgs_size += size
      end

      publish(msgs) unless msgs.empty?
    rescue => e
      log.error "unexpected error", :error => e.to_s
      log.error_backtrace
      # Re-raise instead of swallowing: returning normally would mark the
      # chunk as delivered and silently drop its records.
      raise
    end
  end

end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-cloud-pubsub-0.0.2 lib/fluent/plugin/out_cloud_pubsub.rb