#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'base64'

require 'fluent/plugin/output'
require 'fluent/plugin_helper/socket'
require 'fluent/engine'
require 'fluent/clock'

module Fluent::Plugin
  class ForwardOutput < Output
    # Keeps track of sockets on which an ack response is still awaited and
    # turns whatever arrives on them (or the lack of it) into a Result code.
    class AckHandler
      # Outcome codes handed to the block given to #collect_response.
      module Result
        # The 'ack' value in the response equals the Base64 form of the chunk id.
        SUCCESS = :success
        # No usable ack: the wait expired, the peer closed the connection,
        # the socket was closed, or reading/decoding raised.
        FAILED = :failed
        # A response was decoded but its 'ack' value differs from the chunk id.
        CHUNKID_UNMATCHED = :chunkid_unmatched
      end

      # @param timeout seconds added to Fluent::Clock.now to compute when a
      #   waiting entry expires
      # @param log logger receiving trace/debug/info/warn/error messages
      # @param read_length maximum number of bytes requested per socket read
      def initialize(timeout:, log:, read_length:)
        @mutex = Mutex.new
        @ack_waitings = []
        @timeout = timeout
        @log = log
        @read_length = read_length
        @unpacker = Fluent::MessagePackFactory.msgpack_unpacker
      end

      # Expires overdue entries, waits up to +select_interval+ for the remaining
      # sockets to become readable, reads one response from each readable socket
      # and yields every outcome.
      #
      # Any StandardError raised while doing so (including one raised by the
      # given block) is logged and swallowed.
      #
      # @param select_interval timeout passed to IO.select
      # @yield [chunk_id, node, sock, result] once per outcome; the first three
      #   values are nil when the waiting entry could not be determined
      def collect_response(select_interval)
        now = Fluent::Clock.now

        begin
          results, sockets = sweep_expired(now)

          select_readable(sockets, select_interval).each do |sock|
            results << read_ack_from_sock(sock)
          end

          results.each do |info, ret|
            if info.nil?
              yield nil, nil, nil, ret
            else
              yield info.chunk_id, info.node, info.sock, ret
            end
          end
        rescue => e
          @log.error 'unexpected error while receiving ack', error: e
          @log.error_backtrace
        end
      end

      # One entry of the waiting list: the socket the response is expected on,
      # the chunk id (raw and Base64-encoded), the destination node and the
      # clock value after which the entry is considered expired.
      ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :expired_time) do
        def expired?(now)
          expired_time < now
        end
      end

      # Binds a chunk id and node to this handler so that the socket can be
      # supplied later through #enqueue.
      Ack = Struct.new(:chunk_id, :node, :handler) do
        def enqueue(sock)
          handler.enqueue(node, sock, chunk_id)
        end
      end

      # @return [Ack] an Ack bound to this handler
      def create_ack(chunk_id, node)
        Ack.new(chunk_id, node, self)
      end

      # Registers +sock+ as awaiting an ack for chunk +cid+ sent to +node+.
      # The entry expires @timeout seconds after the current clock value.
      def enqueue(node, sock, cid)
        info = ACKWaitingSockInfo.new(sock, cid, Base64.encode64(cid), node, Fluent::Clock.now + @timeout)
        @mutex.synchronize do
          @ack_waitings << info
        end
      end

      private

      # Removes expired entries from the waiting list while holding the mutex.
      #
      # @return [Array(Array, Array)] the [info, Result::FAILED] pairs for the
      #   expired entries, and the sockets of the entries still waiting
      def sweep_expired(now)
        @mutex.synchronize do
          expired, pending = @ack_waitings.partition { |info| info.expired?(now) }

          expired.each do |info|
            # There are 2 types of cases when no response has been received from socket:
            # (1) the node does not support sending responses
            # (2) the node does support sending response but responses have not arrived for some reasons.
            @log.warn 'no response from node. regard it as unavailable.', host: info.node.host, port: info.node.port
          end

          @ack_waitings = pending
          [expired.map { |info| [info, Result::FAILED] }, pending.map(&:sock)]
        end
      end

      # Waits up to +select_interval+ for any of +sockets+ to become readable.
      #
      # @return [Array] the readable sockets; empty when the wait timed out or
      #   IO.select raised IOError
      def select_readable(sockets, select_interval)
        readable, = IO.select(sockets, nil, nil, select_interval)
        readable || []
      rescue IOError
        @log.info "connection closed while waiting for readable sockets"
        []
      end

      # Reads one response from +sock+ and compares it with the waiting entry
      # registered for that socket. The entry, when found, is always removed
      # from the waiting list before returning.
      #
      # @return [Array] a pair of the waiting entry (or nil) and a Result code
      def read_ack_from_sock(sock)
        begin
          raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
        rescue Errno::ECONNRESET, EOFError # ECONNRESET for #recv, #EOFError for #readpartial
          raw_data = ''
        rescue IOError
          @log.info "socket closed while receiving ack response"
          return nil, Result::FAILED
        end

        info = find(sock)

        if info.nil?
          # The info can be deleted by another thread during `sock.recv()` and `find()`.
          # This is OK since another thread has completed to process the ack, so we can skip this.
          # Note: exclusion mechanism about `collect_response()` may need to be considered.
          @log.debug "could not find the ack info. this ack may be processed by another thread."
          return nil, Result::FAILED
        elsif raw_data.empty?
          # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
          # If this happens we assume the data wasn't delivered and retry it.
          @log.warn 'destination node closed the connection. regard it as unavailable.', host: info.node.host, port: info.node.port
          # info.node.disable!
          return info, Result::FAILED
        else
          # The unpacker is shared by every socket. Drop whatever an earlier
          # truncated or over-long response left buffered, so those bytes are
          # not decoded as part of this socket's response.
          @unpacker.reset
          @unpacker.feed(raw_data)
          res = @unpacker.read
          @log.trace 'getting response from destination', host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
          if res['ack'] != info.chunk_id_base64
            # Some errors may have occurred when ack and chunk id is different, so send the chunk again.
            @log.warn 'ack in response and chunk id in sent data are different', chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
            return info, Result::CHUNKID_UNMATCHED
          else
            @log.trace 'got a correct ack response', chunk_id: dump_unique_id_hex(info.chunk_id)
          end

          return info, Result::SUCCESS
        end
      rescue => e
        @log.error 'unexpected error while receiving ack message', error: e
        @log.error_backtrace
        [nil, Result::FAILED]
      ensure
        # `info` is nil when the read failed before the lookup or when no entry
        # matched the socket; there is nothing to remove in that case.
        delete(info) if info
      end

      # Hex representation of a chunk id, used in log messages.
      def dump_unique_id_hex(unique_id)
        Fluent::UniqueId.hex(unique_id)
      end

      # @return the first waiting entry registered for +sock+, or nil
      def find(sock)
        @mutex.synchronize do
          @ack_waitings.find { |info| info.sock == sock }
        end
      end

      # Removes +info+ from the waiting list.
      def delete(info)
        @mutex.synchronize do
          @ack_waitings.delete(info)
        end
      end
    end
  end
end