Sha256: f0917c6d02f30c41d2bdca57d2c17d0fda6947194e5df3657595f0bcf14315c2
Contents?: true
Size: 1.69 KB
Versions: 1
Compression:
Stored size: 1.69 KB
Contents
require 'gcloud'

module Fluent
  # Buffered Fluentd output plugin, registered as 'gcloud_pubsub', that
  # publishes every buffered record as a JSON string to a single Pub/Sub
  # topic obtained through the gcloud gem.
  #
  # Required configuration parameters (validated in #configure):
  #   project - passed as the first argument to Gcloud.new
  #   topic   - name handed to pubsub.topic
  #   key     - passed as the second argument to Gcloud.new
  #             (presumably a keyfile path -- confirm against the gcloud gem)
  class GcloudPubSubOutput < Fluent::BufferedOutput
    Fluent::Plugin.register_output('gcloud_pubsub', self)

    # Buffering defaults for this plugin.
    # NOTE(review): 'lightening' and buffer_chunk_records_limit are presumably
    # provided by a separate buffer plugin -- confirm it is installed wherever
    # these defaults are relied upon.
    config_set_default :buffer_type,                'lightening'
    config_set_default :flush_interval,             1
    config_set_default :try_flush_interval,         0.05
    config_set_default :buffer_chunk_records_limit, 900
    config_set_default :buffer_chunk_limit,         9437184
    config_set_default :buffer_queue_limit,         64

    config_param :project, :string, :default => nil
    config_param :topic,   :string, :default => nil
    config_param :key,     :string, :default => nil

    # Fallbacks used only when the base class does not already define
    # these accessors.
    unless method_defined?(:log)
      define_method("log") { $log }
    end

    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    # Validates that all three required parameters were supplied.
    #
    # Raises Fluent::ConfigError if project, topic or key is missing.
    def configure(conf)
      super

      raise Fluent::ConfigError, "'project' must be specified." unless @project
      raise Fluent::ConfigError, "'topic' must be specified." unless @topic
      raise Fluent::ConfigError, "'key' must be specified." unless @key
    end

    # Builds the Pub/Sub handle and stores the configured topic in @client.
    def start
      super

      pubsub = Gcloud.new(@project, @key).pubsub
      @client = pubsub.topic @topic
    end

    # Serializes one event as a msgpack-encoded [tag, time, record] triple.
    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    # Publishes every record in the chunk, JSON-encoded, in a single batch
    # publish call on the topic. Tag and time are not sent.
    #
    # Any error is logged and then re-raised. Previously the error was
    # swallowed, which made a failed publish look like a successful flush
    # and so dropped the chunk's records. Re-raising lets the caller see the
    # failure (presumably Fluentd's buffered-output retry -- confirm against
    # the Fluentd version in use).
    def write(chunk)
      messages = []
      chunk.msgpack_each do |_tag, _time, record|
        messages << record.to_json
      end

      # Nothing to send; skip the publish call entirely.
      return if messages.empty?

      @client.publish do |batch|
        messages.each { |message| batch.publish message }
      end
    rescue => e
      log.error "unexpected error", :error => e.to_s
      log.error_backtrace
      raise
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-gcloud-pubsub-0.0.1 | lib/fluent/plugin/out_gcloud_pubsub.rb |