lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.1 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.2
- old
+ new
@@ -8,24 +8,47 @@
module Fluent
class GcloudPubSubInput < Input
Fluent::Plugin.register_input('gcloud_pubsub', self)
+ class << self
+ unless method_defined?(:desc)
+ def desc(description)
+ end
+ end
+ end
+
+ class FailedParseError < StandardError
+ end
+
+ desc 'Set tag of messages.'
config_param :tag, :string
+ desc 'Set your GCP project.'
config_param :project, :string, default: nil
+ desc 'Set your credential file path.'
config_param :key, :string, default: nil
+ desc 'Set topic name to pull.'
config_param :topic, :string
+ desc 'Set subscription name to pull.'
config_param :subscription, :string
+ desc 'Pulling messages by intervals of specified seconds.'
config_param :pull_interval, :float, default: 5.0
+ desc 'Max messages pulling at once.'
config_param :max_messages, :integer, default: 100
+ desc 'Setting `true`, keepalive connection to wait for new messages.'
config_param :return_immediately, :bool, default: true
+ desc 'Set number of threads to pull messages.'
config_param :pull_threads, :integer, default: 1
+ desc 'Set input format.'
config_param :format, :string, default: 'json'
# for HTTP RPC
+ desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.'
config_param :enable_rpc, :bool, default: false
- config_param :rpc_bind, :string, default: '0.0.0.0'
- config_param :rpc_port, :integer, default: 24680
+ desc 'Bind IP address for HTTP RPC.'
+ config_param :rpc_bind, :string, default: '0.0.0.0'
+ desc 'Port for HTTP RPC.'
+ config_param :rpc_port, :integer, default: 24680
unless method_defined?(:log)
define_method("log") { $log }
end
@@ -161,46 +184,29 @@
if messages.length == 0
log.debug "no messages are pulled"
return
end
- es = parse_messages(messages)
- if es.empty?
- log.warn "#{messages.length} message(s) are pulled, but no messages are parsed"
- return
- end
-
- begin
- router.emit_stream(@tag, es)
- rescue
- # ignore errors. Engine shows logs and backtraces.
- end
+ process messages
@subscriber.acknowledge messages
+
log.debug "#{messages.length} message(s) processed"
rescue => ex
log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
log.error_backtrace ex.backtrace
end
- def parse_messages(messages)
+ def process(messages)
es = MultiEventStream.new
messages.each do |m|
- convert_line_to_event(m.message.data, es)
- end
- es
- end
-
- def convert_line_to_event(line, es)
- line = line.chomp # remove \n
- @parser.parse(line) { |time, record|
- if time && record
- es.add(time, record)
- else
- log.warn "pattern not match: #{line.inspect}"
+ @parser.parse(m.message.data.chomp) do |time, record|
+ if time && record
+ es.add(time, record)
+ else
+ raise FailedParseError.new "pattern not match: #{line.inspect}"
+ end
end
- }
- rescue => ex
- log.warn line.dump, error_message: ex.to_s, error_class: ex.class.to_s
- log.warn_backtrace ex.backtrace
+ end
+ router.emit_stream(@tag, es)
end
end
end