lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.4 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.4.0
- old
+ new
@@ -20,10 +20,12 @@
class FailedParseError < StandardError
end
desc 'Set tag of messages.'
config_param :tag, :string
+ desc 'Set key to be used as tag.'
+ config_param :tag_key, :string, default: nil
desc 'Set your GCP project.'
config_param :project, :string, default: nil
desc 'Set your credential file path.'
config_param :key, :string, default: nil
desc 'Set topic name to pull.'
@@ -106,10 +108,16 @@
super
@rpc_srv = nil
@rpc_thread = nil
@stop_pull = false
+ @extract_tag = if @tag_key.nil?
+ method(:static_tag)
+ else
+ method(:dynamic_tag)
+ end
+
@parser = Plugin.new_parser(@format)
@parser.configure(conf)
end
def start
@@ -151,10 +159,18 @@
log.info "start pull from subscription:#{@subscription}"
end
private
+ def static_tag(record)
+ @tag
+ end
+
+ def dynamic_tag(record)
+ record.delete(@tag_key) || @tag
+ end
+
def start_rpc
log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/"
@rpc_srv = WEBrick::HTTPServer.new(
{
BindAddress: @rpc_bind,
@@ -199,16 +215,19 @@
log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
log.error_backtrace ex.backtrace
end
def process(messages)
- es = MultiEventStream.new
+ event_streams = Hash.new do |hsh, key|
+ hsh[key] = MultiEventStream.new
+ end
+
messages.each do |m|
line = m.message.data.chomp
@parser.parse(line) do |time, record|
if time && record
- es.add(time, record)
+ event_streams[@extract_tag.call(record)].add(time, record)
else
case @parse_error_action
when :exception
raise FailedParseError.new "pattern not match: #{line.inspect}"
else
@@ -216,13 +235,15 @@
end
end
end
end
- # There are some output plugins not to supposed to be called with multi-threading.
- # Maybe remove in the future.
- @emit_guard.synchronize do
- router.emit_stream(@tag, es)
+ event_streams.each do |tag, es|
+ # There are some output plugins not to supposed to be called with multi-threading.
+ # Maybe remove in the future.
+ @emit_guard.synchronize do
+ router.emit_stream(tag, es)
+ end
end
end
end
end