lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.0.3 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.1.0

- old
+ new

@@ -35,17 +35,17 @@ end def configure(conf) compat_parameters_convert(conf, :buffer, :formatter) super + placeholder_validate!(:topic, @topic) @formatter = formatter_create end def start super - @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @autocreate_topic - log.debug "connected topic:#{@topic} in project #{@project}" + @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic end def format(tag, time, record) @formatter.format(tag, time, record).to_msgpack end @@ -57,29 +57,31 @@ def multi_workers_ready? true end def write(chunk) + topic = extract_placeholders(@topic, chunk.metadata) + messages = [] size = 0 chunk.msgpack_each do |msg| if msg.bytesize > @max_message_size log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize next end if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size - publish messages + publish(topic, messages) messages = [] size = 0 end messages << msg size += msg.bytesize end if messages.length > 0 - publish messages + publish(topic, messages) end rescue Fluent::GcloudPubSub::RetryableError => ex log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s raise ex rescue => ex @@ -88,11 +90,11 @@ raise ex end private - def publish(messages) - log.debug "send message topic:#{@topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}" - @publisher.publish messages + def publish(topic, messages) + log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}" + @publisher.publish(topic, messages) end end end