lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.4.6 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.0.0
- old
+ new
@@ -1,20 +1,18 @@
-require 'fluent/output'
+require 'fluent/plugin/output'
require 'fluent/plugin/gcloud_pubsub/client'
-module Fluent
- class GcloudPubSubOutput < BufferedOutput
+module Fluent::Plugin
+ class GcloudPubSubOutput < Output
Fluent::Plugin.register_output('gcloud_pubsub', self)
- class << self
- unless method_defined?(:desc)
- def desc(description)
- end
- end
- end
+ helpers :compat_parameters, :formatter
+ DEFAULT_BUFFER_TYPE = "memory"
+ DEFAULT_FORMATTER_TYPE = "json"
+
desc 'Set your GCP project.'
config_param :project, :string, :default => nil
desc 'Set your credential file path.'
config_param :key, :string, :default => nil
desc 'Set topic name to publish.'
@@ -28,31 +26,39 @@
desc 'Limit bytesize per message.'
config_param :max_message_size, :integer, :default => 4000000 # 4MB
desc 'Set output format.'
config_param :format, :string, :default => 'json'
- unless method_defined?(:log)
- define_method("log") { $log }
+ config_section :buffer do
+ config_set_default :@type, DEFAULT_BUFFER_TYPE
end
- unless method_defined?(:router)
- define_method("router") { Fluent::Engine }
+ config_section :format do
+ config_set_default :@type, DEFAULT_FORMATTER_TYPE
end
def configure(conf)
+ compat_parameters_convert(conf, :buffer, :formatter)
super
- @formatter = Plugin.new_formatter(@format)
- @formatter.configure(conf)
+ @formatter = formatter_create
end
def start
super
@publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @autocreate_topic
log.debug "connected topic:#{@topic} in project #{@project}"
end
def format(tag, time, record)
@formatter.format(tag, time, record).to_msgpack
+ end
+
+ def formatted_to_msgpack_binary?
+ true
+ end
+
+ def multi_workers_ready?
+ true
end
def write(chunk)
messages = []
size = 0