lib/fluent/plugin/out_forward.rb in fluentd-0.14.8 vs lib/fluent/plugin/out_forward.rb in fluentd-0.14.9
- old
+ new
@@ -12,46 +12,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-require 'base64'
-require 'socket'
-require 'fileutils'
-
-require 'cool.io'
-
require 'fluent/output'
require 'fluent/config/error'
+require 'base64'
-module Fluent
- class ForwardOutputError < StandardError
- end
+require 'fluent/compat/socket_util'
- class ForwardOutputResponseError < ForwardOutputError
- end
+module Fluent::Plugin
+ class ForwardOutput < Output
+ class Error < StandardError; end
+ class ResponseError < Error; end
+ class ConnectionClosedError < Error; end
+ class ACKTimeoutError < Error; end
- class ForwardOutputConnectionClosedError < ForwardOutputError
- end
+ Fluent::Plugin.register_output('forward', self)
- class ForwardOutputACKTimeoutError < ForwardOutputResponseError
- end
-
- class ForwardOutput < ObjectBufferedOutput
- Plugin.register_output('forward', self)
-
LISTEN_PORT = 24224
- def initialize
- super
- require 'fluent/plugin/socket_util'
- @nodes = [] #=> [Node]
- @loop = nil
- @thread = nil
- @finished = false
- end
-
desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
desc 'The transport protocol to use for heartbeats.(udp,tcp,none)'
config_param :heartbeat_type, :enum, list: [:tcp, :udp, :none], default: :tcp
desc 'The interval of the heartbeat packer.'
@@ -112,21 +93,38 @@
attr_reader :nodes
config_param :port, :integer, default: LISTEN_PORT, obsoleted: "User <server> section instead."
config_param :host, :string, default: nil, obsoleted: "Use <server> section instead."
+ config_section :buffer do
+ config_set_default :chunk_keys, ["tag"]
+ end
+
attr_reader :read_interval, :recover_sample_size
+ def initialize
+ super
+
+ @nodes = [] #=> [Node]
+ @loop = nil
+ @thread = nil
+ @finished = false
+ end
+
def configure(conf)
super
+ unless @chunk_key_tag
+ raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
+ end
+
@read_interval = @read_interval_msec / 1000.0
@recover_sample_size = @recover_wait / @heartbeat_interval
if @dns_round_robin
if @heartbeat_type == :udp
- raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
+ raise Fluent::ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
end
end
@servers.each do |server|
failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
@@ -138,18 +136,20 @@
else
@nodes << Node.new(self, server, failure: failure)
end
end
- if @compress == :gzip && @buffer.compress == :text
- @buffer.compress = :gzip
- elsif @compress == :text && @buffer.compress == :gzip
- log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
+ unless @as_secondary
+ if @compress == :gzip && @buffer.compress == :text
+ @buffer.compress = :gzip
+ elsif @compress == :text && @buffer.compress == :gzip
+ log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
+ end
end
if @nodes.empty?
- raise ConfigError, "forward output plugin requires at least one <server> is required"
+ raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
end
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end
@@ -164,11 +164,11 @@
unless @heartbeat_type == :none
@loop = Coolio::Loop.new
if @heartbeat_type == :udp
# assuming all hosts use udp
- @usock = SocketUtil.create_udp_socket(@nodes.first.host)
+ @usock = Fluent::Compat::SocketUtil.create_udp_socket(@nodes.first.host)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
@loop.attach(@hb)
end
@@ -181,11 +181,12 @@
def shutdown
@finished = true
if @loop
@loop.watchers.each {|w| w.detach }
- @loop.stop
+ # @loop.stop
+ @loop.stop rescue nil
end
@thread.join if @thread
@usock.close if @usock
super
@@ -196,13 +197,14 @@
rescue
log.error "unexpected error", error: $!.to_s
log.error_backtrace
end
- def write_objects(tag, chunk)
+ def write(chunk)
return if chunk.empty?
+ tag = chunk.metadata.tag
error = nil
wlen = @weight_array.length
wlen.times do
@rr = (@rr + 1) % wlen
@@ -437,14 +439,14 @@
if @state != :established
establish_connection(sock)
end
unless available?
- raise ForwardOutputConnectionClosedError, "failed to establish connection with node #{@name}"
+ raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
- option = { 'size' => chunk.size_of_events, 'compressed' => @compress }
+ option = { 'size' => chunk.size, 'compressed' => @compress }
option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response
# out_forward always uses Raw32 type for content.
# Raw16 can store only 64kbytes, and it should be much smaller than buffer chunk size.
@@ -470,28 +472,28 @@
# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
if raw_data.empty?
@log.warn "node closed the connection. regard it as unavailable.", host: @host, port: @port
disable!
- raise ForwardOutputConnectionClosedError, "node #{@host}:#{@port} closed connection"
+ raise ConnectionClosedError, "node #{@host}:#{@port} closed connection"
else
@unpacker.feed(raw_data)
res = @unpacker.read
if res['ack'] != option['chunk']
# Some errors may have occured when ack and chunk id is different, so send the chunk again.
- raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different"
+ raise ResponseError, "ack in response and chunk id in sent data are different"
end
end
else
# IO.select returns nil on timeout.
# There are 2 types of cases when no response has been received:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
@log.warn "no response from node. regard it as unavailable.", host: @host, port: @port
disable!
- raise ForwardOutputACKTimeoutError, "node #{host}:#{port} does not return ACK"
+ raise ACKTimeoutError, "node #{host}:#{port} does not return ACK"
end
end
heartbeat(false)
res # for test
@@ -597,14 +599,9 @@
@log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port
true
else
nil
end
- end
-
- # TODO: #to_msgpack(string) is deprecated
- def to_msgpack(out = '')
- [@host, @port, @weight, @available].to_msgpack(out)
end
def generate_salt
SecureRandom.hex(16)
end