lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.1.2 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.1.3

- old
+ new

@@ -47,34 +47,46 @@ private def subscribe until @stop_subscribing - messages = @subscriber.pull @return_immediately, @max_messages + _subscribe - if messages.length > 0 - es = parse_messages(messages) - unless es.empty? - begin - router.emit_stream(@tag, es) - rescue - # ignore errors. Engine shows logs and backtraces. - end - @subscriber.acknowledge messages - log.debug "#{messages.length} message(s) processed" - end - end - if @return_immediately sleep @pull_interval end end - rescue - log.error "unexpected error", :error=>$!.to_s - log.error_backtrace + rescue => ex + log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s + log.error_backtrace ex.backtrace end + def _subscribe + messages = @subscriber.pull @return_immediately, @max_messages + if messages.length == 0 + log.debug "no messages are pulled" + return + end + + es = parse_messages(messages) + if es.empty? + log.warn "#{messages.length} message(s) are pulled, but no messages are parsed" + return + end + + begin + router.emit_stream(@tag, es) + rescue + # ignore errors. Engine shows logs and backtraces. + end + @subscriber.acknowledge messages + log.debug "#{messages.length} message(s) processed" + rescue => ex + log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s + log.error_backtrace ex.backtrace + end + def parse_messages(messages) es = MultiEventStream.new messages.each do |m| convert_line_to_event(m.message.data, es) end @@ -88,11 +100,11 @@ es.add(time, record) else log.warn "pattern not match: #{line.inspect}" end } - rescue => e - log.warn line.dump, :error => e.to_s - log.debug_backtrace(e.backtrace) + rescue => ex + log.warn line.dump, error_message: ex.to_s, error_class: ex.class.to_s + log.warn_backtrace ex.backtrace end end end