lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.1 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.2

- old
+ new

@@ -8,24 +8,47 @@ module Fluent class GcloudPubSubInput < Input Fluent::Plugin.register_input('gcloud_pubsub', self) + class << self + unless method_defined?(:desc) + def desc(description) + end + end + end + + class FailedParseError < StandardError + end + + desc 'Set tag of messages.' config_param :tag, :string + desc 'Set your GCP project.' config_param :project, :string, default: nil + desc 'Set your credential file path.' config_param :key, :string, default: nil + desc 'Set topic name to pull.' config_param :topic, :string + desc 'Set subscription name to pull.' config_param :subscription, :string + desc 'Pulling messages by intervals of specified seconds.' config_param :pull_interval, :float, default: 5.0 + desc 'Max messages pulling at once.' config_param :max_messages, :integer, default: 100 + desc 'Setting `true`, keepalive connection to wait for new messages.' config_param :return_immediately, :bool, default: true + desc 'Set number of threads to pull messages.' config_param :pull_threads, :integer, default: 1 + desc 'Set input format.' config_param :format, :string, default: 'json' # for HTTP RPC + desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.' config_param :enable_rpc, :bool, default: false - config_param :rpc_bind, :string, default: '0.0.0.0' - config_param :rpc_port, :integer, default: 24680 + desc 'Bind IP address for HTTP RPC.' + config_param :rpc_bind, :string, default: '0.0.0.0' + desc 'Port for HTTP RPC.' + config_param :rpc_port, :integer, default: 24680 unless method_defined?(:log) define_method("log") { $log } end @@ -161,46 +184,29 @@ if messages.length == 0 log.debug "no messages are pulled" return end - es = parse_messages(messages) - if es.empty? - log.warn "#{messages.length} message(s) are pulled, but no messages are parsed" - return - end - - begin - router.emit_stream(@tag, es) - rescue - # ignore errors. Engine shows logs and backtraces. - end + process messages @subscriber.acknowledge messages + log.debug "#{messages.length} message(s) processed" rescue => ex log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s log.error_backtrace ex.backtrace end - def parse_messages(messages) + def process(messages) es = MultiEventStream.new messages.each do |m| - convert_line_to_event(m.message.data, es) - end - es - end - - def convert_line_to_event(line, es) - line = line.chomp # remove \n - @parser.parse(line) { |time, record| - if time && record - es.add(time, record) - else - log.warn "pattern not match: #{line.inspect}" + @parser.parse(m.message.data.chomp) do |time, record| + if time && record + es.add(time, record) + else + raise FailedParseError.new "pattern not match: #{line.inspect}" + end end - } - rescue => ex - log.warn line.dump, error_message: ex.to_s, error_class: ex.class.to_s - log.warn_backtrace ex.backtrace + end + router.emit_stream(@tag, es) end end end