lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.0 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.1
- old
+ new
@@ -33,19 +33,18 @@
@publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @autocreate_topic
log.debug "connected topic:#{@topic} in project #{@project}"
end
def format(tag, time, record)
- [tag, time, record].to_msgpack
+ @formatter.format(tag, time, record).to_msgpack
end
def write(chunk)
messages = []
size = 0
- chunk.msgpack_each do |tag, time, record|
- msg = @formatter.format(tag, time, record)
+ chunk.msgpack_each do |msg|
if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
publish messages
messages = []
size = 0
end
@@ -54,13 +53,16 @@
end
if messages.length > 0
publish messages
end
- rescue => e
- log.error "unexpected error", :error=>$!.to_s
+ rescue Fluent::GcloudPubSub::RetryableError => ex
+ log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s
+ raise ex
+ rescue => ex
+ log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
log.error_backtrace
- raise e
+ raise ex
end
private
def publish(messages)