lib/fluent/plugin/out_forward.rb in fluentd-0.14.8 vs lib/fluent/plugin/out_forward.rb in fluentd-0.14.9

- old
+ new

@@ -12,46 +12,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # -require 'base64' -require 'socket' -require 'fileutils' - -require 'cool.io' - require 'fluent/output' require 'fluent/config/error' +require 'base64' -module Fluent - class ForwardOutputError < StandardError - end +require 'fluent/compat/socket_util' - class ForwardOutputResponseError < ForwardOutputError - end +module Fluent::Plugin + class ForwardOutput < Output + class Error < StandardError; end + class ResponseError < Error; end + class ConnectionClosedError < Error; end + class ACKTimeoutError < Error; end - class ForwardOutputConnectionClosedError < ForwardOutputError - end + Fluent::Plugin.register_output('forward', self) - class ForwardOutputACKTimeoutError < ForwardOutputResponseError - end - - class ForwardOutput < ObjectBufferedOutput - Plugin.register_output('forward', self) - LISTEN_PORT = 24224 - def initialize - super - require 'fluent/plugin/socket_util' - @nodes = [] #=> [Node] - @loop = nil - @thread = nil - @finished = false - end - desc 'The timeout time when sending event logs.' config_param :send_timeout, :time, default: 60 desc 'The transport protocol to use for heartbeats.(udp,tcp,none)' config_param :heartbeat_type, :enum, list: [:tcp, :udp, :none], default: :tcp desc 'The interval of the heartbeat packer.' @@ -112,21 +93,38 @@ attr_reader :nodes config_param :port, :integer, default: LISTEN_PORT, obsoleted: "User <server> section instead." config_param :host, :string, default: nil, obsoleted: "Use <server> section instead." + config_section :buffer do + config_set_default :chunk_keys, ["tag"] + end + attr_reader :read_interval, :recover_sample_size + def initialize + super + + @nodes = [] #=> [Node] + @loop = nil + @thread = nil + @finished = false + end + def configure(conf) super + unless @chunk_key_tag + raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output" + end + @read_interval = @read_interval_msec / 1000.0 @recover_sample_size = @recover_wait / @heartbeat_interval if @dns_round_robin if @heartbeat_type == :udp - raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option" + raise Fluent::ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option" end end @servers.each do |server| failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f) @@ -138,18 +136,20 @@ else @nodes << Node.new(self, server, failure: failure) end end - if @compress == :gzip && @buffer.compress == :text - @buffer.compress = :gzip - elsif @compress == :text && @buffer.compress == :gzip - log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>" + unless @as_secondary + if @compress == :gzip && @buffer.compress == :text + @buffer.compress = :gzip + elsif @compress == :text && @buffer.compress == :gzip + log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>" + end end if @nodes.empty? - raise ConfigError, "forward output plugin requires at least one <server> is required" + raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required" end raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1 end @@ -164,11 +164,11 @@ unless @heartbeat_type == :none @loop = Coolio::Loop.new if @heartbeat_type == :udp # assuming all hosts use udp - @usock = SocketUtil.create_udp_socket(@nodes.first.host) + @usock = Fluent::Compat::SocketUtil.create_udp_socket(@nodes.first.host) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) @loop.attach(@hb) end @@ -181,11 +181,12 @@ def shutdown @finished = true if @loop @loop.watchers.each {|w| w.detach } - @loop.stop + # @loop.stop + @loop.stop rescue nil end @thread.join if @thread @usock.close if @usock super @@ -196,13 +197,14 @@ rescue log.error "unexpected error", error: $!.to_s log.error_backtrace end - def write_objects(tag, chunk) + def write(chunk) return if chunk.empty? + tag = chunk.metadata.tag error = nil wlen = @weight_array.length wlen.times do @rr = (@rr + 1) % wlen @@ -437,14 +439,14 @@ if @state != :established establish_connection(sock) end unless available? - raise ForwardOutputConnectionClosedError, "failed to establish connection with node #{@name}" + raise ConnectionClosedError, "failed to establish connection with node #{@name}" end - option = { 'size' => chunk.size_of_events, 'compressed' => @compress } + option = { 'size' => chunk.size, 'compressed' => @compress } option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response # out_forward always uses Raw32 type for content. # Raw16 can store only 64kbytes, and it should be much smaller than buffer chunk size. @@ -470,28 +472,28 @@ # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF. # If this happens we assume the data wasn't delivered and retry it. if raw_data.empty? @log.warn "node closed the connection. regard it as unavailable.", host: @host, port: @port disable! - raise ForwardOutputConnectionClosedError, "node #{@host}:#{@port} closed connection" + raise ConnectionClosedError, "node #{@host}:#{@port} closed connection" else @unpacker.feed(raw_data) res = @unpacker.read if res['ack'] != option['chunk'] # Some errors may have occured when ack and chunk id is different, so send the chunk again. - raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different" + raise ResponseError, "ack in response and chunk id in sent data are different" end end else # IO.select returns nil on timeout. # There are 2 types of cases when no response has been received: # (1) the node does not support sending responses # (2) the node does support sending response but responses have not arrived for some reasons. @log.warn "no response from node. regard it as unavailable.", host: @host, port: @port disable! - raise ForwardOutputACKTimeoutError, "node #{host}:#{port} does not return ACK" + raise ACKTimeoutError, "node #{host}:#{port} does not return ACK" end end heartbeat(false) res # for test @@ -597,14 +599,9 @@ @log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port true else nil end - end - - # TODO: #to_msgpack(string) is deprecated - def to_msgpack(out = '') - [@host, @port, @weight, @available].to_msgpack(out) end def generate_salt SecureRandom.hex(16) end