lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-subscriber-1.3.2 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-subscriber-1.3.3Z

- old
+ new

@@ -1,13 +1,11 @@ require 'fluent/plugin/output' + require 'fluent/plugin/gcloud_pubsub/client' -require 'fluent/plugin_helper/inject' module Fluent::Plugin class GcloudPubSubOutput < Output - include Fluent::PluginHelper::Inject - Fluent::Plugin.register_output('gcloud_pubsub', self) helpers :compat_parameters, :formatter DEFAULT_BUFFER_TYPE = "memory" @@ -19,18 +17,20 @@ config_param :key, :string, :default => nil desc 'Set topic name to publish.' config_param :topic, :string desc "If set to `true`, specified topic will be created when it doesn't exist." config_param :autocreate_topic, :bool, :default => false + desc "Custom param - skips topic retrieval; service accounts can just solely have PubSub Publishing roles when set to true" + config_param :skip_lookup, :bool, :default => true desc 'Publishing messages count per request to Cloud Pub/Sub.' config_param :max_messages, :integer, :default => 1000 desc 'Publishing messages bytesize per request to Cloud Pub/Sub.' config_param :max_total_size, :integer, :default => 9800000 # 9.8MB desc 'Limit bytesize per message.' config_param :max_message_size, :integer, :default => 4000000 # 4MB - desc 'Publishing the set field as an attribute' - config_param :attribute_keys, :array, :default => [] + desc 'Set output format.' + config_param :format, :string, :default => 'json' config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE end @@ -39,26 +39,21 @@ end def configure(conf) compat_parameters_convert(conf, :buffer, :formatter) super - placeholder_validate!(:topic, @topic) @formatter = formatter_create end def start super - @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic + @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @skip_lookup + log.debug "connected topic:#{@topic} in project #{@project}" end def format(tag, time, record) - record = inject_values_to_record(tag, time, record) - attributes = {} - @attribute_keys.each do |key| - attributes[key] = record.delete(key) - end - [@formatter.format(tag, time, record), attributes].to_msgpack + @formatter.format(tag, time, record).to_msgpack end def formatted_to_msgpack_binary? true end @@ -66,45 +61,46 @@ def multi_workers_ready? true end def write(chunk) - topic = extract_placeholders(@topic, chunk.metadata) - messages = [] size = 0 - chunk.msgpack_each do |msg, attr| - msg = Fluent::GcloudPubSub::Message.new(msg, attr) + chunk.msgpack_each do |msg| if msg.bytesize > @max_message_size log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize next end if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size - publish(topic, messages) + publish messages messages = [] size = 0 end messages << msg size += msg.bytesize end if messages.length > 0 - publish(topic, messages) + publish messages end rescue Fluent::GcloudPubSub::RetryableError => ex log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s raise ex + rescue Google::Cloud::UnauthenticatedError => ex + log.warn "Encountered UnauthenticatedError, renewing @publisher instance", error_message: ex.to_s, error_class: ex.class.to_s + @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @skip_lookup + raise ex rescue => ex log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s log.error_backtrace raise ex end private - def publish(topic, messages) - log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}" - @publisher.publish(topic, messages) + def publish(messages) + log.debug "send message topic:#{@topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}" + @publisher.publish messages end end end