lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.0.3 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.1.0
- old
+ new
@@ -35,17 +35,17 @@
end
def configure(conf)
compat_parameters_convert(conf, :buffer, :formatter)
super
+ placeholder_validate!(:topic, @topic)
@formatter = formatter_create
end
def start
super
- @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @autocreate_topic
- log.debug "connected topic:#{@topic} in project #{@project}"
+ @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic
end
def format(tag, time, record)
@formatter.format(tag, time, record).to_msgpack
end
@@ -57,29 +57,31 @@
def multi_workers_ready?
true
end
def write(chunk)
+ topic = extract_placeholders(@topic, chunk.metadata)
+
messages = []
size = 0
chunk.msgpack_each do |msg|
if msg.bytesize > @max_message_size
log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize
next
end
if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
- publish messages
+ publish(topic, messages)
messages = []
size = 0
end
messages << msg
size += msg.bytesize
end
if messages.length > 0
- publish messages
+ publish(topic, messages)
end
rescue Fluent::GcloudPubSub::RetryableError => ex
log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s
raise ex
rescue => ex
@@ -88,11 +90,11 @@
raise ex
end
private
- def publish(messages)
- log.debug "send message topic:#{@topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}"
- @publisher.publish messages
+ def publish(topic, messages)
+ log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}"
+ @publisher.publish(topic, messages)
end
end
end