lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.4.6 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.0.0

- old
+ new

@@ -1,20 +1,18 @@ -require 'fluent/output' +require 'fluent/plugin/output' require 'fluent/plugin/gcloud_pubsub/client' -module Fluent - class GcloudPubSubOutput < BufferedOutput +module Fluent::Plugin + class GcloudPubSubOutput < Output Fluent::Plugin.register_output('gcloud_pubsub', self) - class << self - unless method_defined?(:desc) - def desc(description) - end - end - end + helpers :compat_parameters, :formatter + DEFAULT_BUFFER_TYPE = "memory" + DEFAULT_FORMATTER_TYPE = "json" + desc 'Set your GCP project.' config_param :project, :string, :default => nil desc 'Set your credential file path.' config_param :key, :string, :default => nil desc 'Set topic name to publish.' @@ -28,31 +26,39 @@ desc 'Limit bytesize per message.' config_param :max_message_size, :integer, :default => 4000000 # 4MB desc 'Set output format.' config_param :format, :string, :default => 'json' - unless method_defined?(:log) - define_method("log") { $log } + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE end - unless method_defined?(:router) - define_method("router") { Fluent::Engine } + config_section :format do + config_set_default :@type, DEFAULT_FORMATTER_TYPE end def configure(conf) + compat_parameters_convert(conf, :buffer, :formatter) super - @formatter = Plugin.new_formatter(@format) - @formatter.configure(conf) + @formatter = formatter_create end def start super @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @autocreate_topic log.debug "connected topic:#{@topic} in project #{@project}" end def format(tag, time, record) @formatter.format(tag, time, record).to_msgpack + end + + def formatted_to_msgpack_binary? + true + end + + def multi_workers_ready? + true end def write(chunk) messages = [] size = 0