lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.4 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.4.0

- old
+ new

@@ -20,10 +20,12 @@ class FailedParseError < StandardError end desc 'Set tag of messages.' config_param :tag, :string + desc 'Set key to be used as tag.' + config_param :tag_key, :string, default: nil desc 'Set your GCP project.' config_param :project, :string, default: nil desc 'Set your credential file path.' config_param :key, :string, default: nil desc 'Set topic name to pull.' @@ -106,10 +108,16 @@ super @rpc_srv = nil @rpc_thread = nil @stop_pull = false + @extract_tag = if @tag_key.nil? + method(:static_tag) + else + method(:dynamic_tag) + end + @parser = Plugin.new_parser(@format) @parser.configure(conf) end def start @@ -151,10 +159,18 @@ log.info "start pull from subscription:#{@subscription}" end private + def static_tag(record) + @tag + end + + def dynamic_tag(record) + record.delete(@tag_key) || @tag + end + def start_rpc log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/" @rpc_srv = WEBrick::HTTPServer.new( { BindAddress: @rpc_bind, @@ -199,16 +215,19 @@ log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s log.error_backtrace ex.backtrace end def process(messages) - es = MultiEventStream.new + event_streams = Hash.new do |hsh, key| + hsh[key] = MultiEventStream.new + end + messages.each do |m| line = m.message.data.chomp @parser.parse(line) do |time, record| if time && record - es.add(time, record) + event_streams[@extract_tag.call(record)].add(time, record) else case @parse_error_action when :exception raise FailedParseError.new "pattern not match: #{line.inspect}" else @@ -216,13 +235,15 @@ end end end end - # There are some output plugins not to supposed to be called with multi-threading. - # Maybe remove in the future. - @emit_guard.synchronize do - router.emit_stream(@tag, es) + event_streams.each do |tag, es| + # There are some output plugins not to supposed to be called with multi-threading. + # Maybe remove in the future. + @emit_guard.synchronize do + router.emit_stream(tag, es) + end end end end end