# Fluentd output plugin to ingest data to Wendelin system
# Copyright (C) 2015  Nexedi SA and Contributors.
#                     Kirill Smelkov
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This plugin hooks into Fluentd in a way similar to out_forward and
# out_secure_forward and outputs event stream to a Wendelin HTTP endpoint.

require 'fluent/output'
require_relative 'wendelin_client'

module Fluent
  # Buffered output plugin, registered under the name 'wendelin', that hands
  # every flushed buffer chunk to a WendelinClient for ingestion.
  class WendelinOutput < ObjectBufferedOutput # XXX verify base class
    Plugin.register_output('wendelin', self)

    # where Wendelin's Input Stream Tool is located,
    # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
    config_param :streamtool_uri, :string

    # credentials to authenticate this fluentd to wendelin
    # by default credentials are not used
    # TODO user/password -> certificate
    config_param :user, :string, :default => nil
    # The password is flagged as secret so that Fluentd can mask it instead of
    # printing it in clear text when the configuration is dumped.
    config_param :password, :string, :default => nil, :secret => true

    # Reads the plugin configuration and creates the client used for ingestion.
    #
    # Credentials are handed to the client only when `user` is configured;
    # otherwise an empty credentials hash is passed.
    #
    # @param conf the configuration element, forwarded to the base class
    def configure(conf)
      super

      credentials = {}
      if @user
        credentials['user'] = @user
        credentials['password'] = @password
      end

      @wendelin = WendelinClient.new(@streamtool_uri, credentials)
    end

    # Lifecycle hook; currently only delegates to the base class.
    def start
      super
      # TODO
    end

    # Lifecycle hook; currently only delegates to the base class.
    def shutdown
      super
      # TODO
    end

    # hooked to ObjectBufferedOutput - write out records from a buffer chunk for a tag.
    #
    # NOTE this is called from a separate thread (see OutputThread and BufferedOutput)
    #
    # @param tag   tag the chunk was buffered under; used as the ingestion reference
    # @param chunk buffer chunk whose raw content is sent to Wendelin
    def write_objects(tag, chunk)
      # NOTE if this fails and raises -> it will unroll to BufferedOutput#try_flush
      #      which detects errors and retries outputting logs up to retry maxcount
      #      times and aborts outputting current logs if all tries fail.
      #
      #      This way, we don't need to code rescue here.

      # NOTE there is one tag per chunk, and chunk stores an event stream,
      #      usually [] of (timestamp, record) in msgpack, but in general it
      #      could be arbitrary data - we send it as-is.
      data_chunk = chunk.read

      # for input_stream_ref use tag as-is - it will be processed/translated
      # further on server by Wendelin
      reference = tag

      @wendelin.ingest(reference, data_chunk)
    end
  end
end