lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.1.2 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.1.3
- old
+ new
@@ -47,34 +47,46 @@
private
def subscribe
until @stop_subscribing
- messages = @subscriber.pull @return_immediately, @max_messages
+ _subscribe
- if messages.length > 0
- es = parse_messages(messages)
- unless es.empty?
- begin
- router.emit_stream(@tag, es)
- rescue
- # ignore errors. Engine shows logs and backtraces.
- end
- @subscriber.acknowledge messages
- log.debug "#{messages.length} message(s) processed"
- end
- end
-
if @return_immediately
sleep @pull_interval
end
end
- rescue
- log.error "unexpected error", :error=>$!.to_s
- log.error_backtrace
+ rescue => ex
+ log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
+ log.error_backtrace ex.backtrace
end
+ def _subscribe
+ messages = @subscriber.pull @return_immediately, @max_messages
+ if messages.length == 0
+ log.debug "no messages are pulled"
+ return
+ end
+
+ es = parse_messages(messages)
+ if es.empty?
+ log.warn "#{messages.length} message(s) are pulled, but no messages are parsed"
+ return
+ end
+
+ begin
+ router.emit_stream(@tag, es)
+ rescue
+ # ignore errors. Engine shows logs and backtraces.
+ end
+ @subscriber.acknowledge messages
+ log.debug "#{messages.length} message(s) processed"
+ rescue => ex
+ log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
+ log.error_backtrace ex.backtrace
+ end
+
def parse_messages(messages)
es = MultiEventStream.new
messages.each do |m|
convert_line_to_event(m.message.data, es)
end
@@ -88,11 +100,11 @@
es.add(time, record)
else
log.warn "pattern not match: #{line.inspect}"
end
}
- rescue => e
- log.warn line.dump, :error => e.to_s
- log.debug_backtrace(e.backtrace)
+ rescue => ex
+ log.warn line.dump, error_message: ex.to_s, error_class: ex.class.to_s
+ log.warn_backtrace ex.backtrace
end
end
end