# Fluentd output plugin to ingest data to Wendelin system
# Copyright (C) 2015  Nexedi SA and Contributors.
#                     Kirill Smelkov
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.

# This plugin hooks into Fluentd in a way similar to out_forward and
# out_secure_forward and outputs event stream to a Wendelin HTTP endpoint.

require 'fluent/plugin/output'
require_relative 'wendelin_client'

module Fluent::Plugin
  # Buffered Fluentd output that forwards every buffer chunk, unmodified, to a
  # Wendelin ingestion endpoint through WendelinClient.
  #
  # Chunks are keyed by tag (enforced in #configure); the tag of a chunk is
  # used as the ingestion reference when the chunk is written out.
  class WendelinOutput < Output
    Fluent::Plugin.register_output('wendelin', self)

    # where Wendelin's Input Stream Tool is located,
    # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
    config_param :streamtool_uri, :string

    # credentials to authenticate this fluentd to wendelin
    # by default credentials are not used
    # TODO user/password -> certificate
    config_param :user, :string, default: nil
    # `secret: true` marks the value as sensitive so Fluentd masks it instead
    # of printing it in clear text when it dumps the configuration.
    config_param :password, :string, default: nil, secret: true

    # selects which WendelinClient ingestion method #write uses
    config_param :use_keep_alive, :bool, default: false

    # timeouts handed over to WendelinClient as-is
    # (units presumably seconds - confirm against wendelin_client)
    config_param :ssl_timeout, :integer, default: 60
    config_param :open_timeout, :integer, default: 60
    config_param :read_timeout, :integer, default: 60
    config_param :keep_alive_timeout, :integer, default: 300

    config_section :buffer do
      config_set_default :chunk_keys, ["tag"]
    end

    # Validate the configuration and build the Wendelin client.
    #
    # @param conf the plugin configuration as passed in by Fluentd
    # @raise [Fluent::ConfigError] when 'tag' is not among the buffer chunk keys
    def configure(conf)
      super

      # #write derives the ingestion reference from chunk.metadata.tag, so
      # every chunk has to be keyed by tag.
      unless @chunk_key_tag
        raise Fluent::ConfigError, "buffer chunk key must include 'tag' for wendelin output"
      end

      # credentials stay empty unless a user is configured
      credentials = {}
      if @user
        credentials['user'] = @user
        credentials['password'] = @password
      end

      @wendelin = WendelinClient.new(@streamtool_uri, credentials, @log,
                                     @ssl_timeout, @open_timeout,
                                     @read_timeout, @keep_alive_timeout)
    end

    # Nothing plugin-specific to start yet; defers to the base class.
    def start
      super
      # TODO
    end

    # Nothing plugin-specific to release yet; defers to the base class.
    def shutdown
      super
      # TODO
    end

    # Use normal "Synchronous Buffer" - write out records from a buffer chunk
    # for a tag.
    #
    # NOTE if this fails and raises, the error unrolls to the Output flush
    # machinery, which (per the original author's note) detects errors and
    # retries outputting logs up to the retry max count, aborting the current
    # logs if all tries fail. This way, we don't need to code rescue here.
    #
    # @param chunk buffer chunk for one tag; it stores an event stream,
    #   usually [] of (timestamp, record) in msgpack, but in general it could
    #   be arbitrary data - we send it as-is.
    # @return whatever the selected WendelinClient ingestion call returns,
    #   or nil for an empty chunk
    def write(chunk)
      return if chunk.empty?

      data_chunk = chunk.read

      # for input_stream_ref use tag as-is - it will be processed/translated
      # further on server by Wendelin
      reference = chunk.metadata.tag

      if @use_keep_alive
        @wendelin.ingest_with_keep_alive(reference, data_chunk)
      else
        @wendelin.ingest(reference, data_chunk)
      end
    end
  end
end