Sha256: 45bf4c11d9595d66686e10e3d11a5de7c34774cdb588eadc2bbfd913d26e48a5
Contents?: true
Size: 1.91 KB
Versions: 2
Compression:
Stored size: 1.91 KB
Contents
require 'fluent/output' require 'fluent/plugin/gcloud_pubsub/client' module Fluent class GcloudPubSubOutput < BufferedOutput Fluent::Plugin.register_output('gcloud_pubsub', self) config_param :project, :string, :default => nil config_param :key, :string, :default => nil config_param :topic, :string config_param :autocreate_topic, :bool, :default => false config_param :max_messages, :integer, :default => 1000 config_param :max_total_size, :integer, :default => 10000000 # 10MB config_param :format, :string, :default => 'json' unless method_defined?(:log) define_method("log") { $log } end unless method_defined?(:router) define_method("router") { Fluent::Engine } end def configure(conf) super @formatter = Plugin.new_formatter(@format) @formatter.configure(conf) end def start super @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @autocreate_topic log.debug "connected topic:#{@topic} in project #{@project}" end def format(tag, time, record) [tag, time, record].to_msgpack end def write(chunk) messages = [] size = 0 chunk.msgpack_each do |tag, time, record| msg = @formatter.format(tag, time, record) if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size publish messages messages = [] size = 0 end messages << msg size += msg.bytesize end if messages.length > 0 publish messages end rescue => e log.error "unexpected error", :error=>$!.to_s log.error_backtrace raise e end private def publish(messages) log.debug "send message topic:#{@topic} length:#{messages.length.to_s}" @publisher.publish messages end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-gcloud-pubsub-custom-0.1.1 | lib/fluent/plugin/out_gcloud_pubsub.rb |
fluent-plugin-gcloud-pubsub-custom-0.1.0 | lib/fluent/plugin/out_gcloud_pubsub.rb |