lib/fluent/plugin/out_forward.rb in fluentd-0.10.35 vs lib/fluent/plugin/out_forward.rb in fluentd-0.10.36

- old
+ new

@@ -14,516 +14,512 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # module Fluent + class ForwardOutput < ObjectBufferedOutput + Plugin.register_output('forward', self) + def initialize + super + require 'socket' + require 'fileutils' + require 'fluent/plugin/socket_util' + @nodes = [] #=> [Node] + end -class ForwardOutput < ObjectBufferedOutput - Plugin.register_output('forward', self) + config_param :send_timeout, :time, :default => 60 + config_param :heartbeat_type, :default => :udp do |val| + case val.downcase + when 'tcp' + :tcp + when 'udp' + :udp + else + raise ConfigError, "forward output heartbeat type should be 'tcp' or 'udp'" + end + end + config_param :heartbeat_interval, :time, :default => 1 + config_param :recover_wait, :time, :default => 10 + config_param :hard_timeout, :time, :default => 60 + config_param :expire_dns_cache, :time, :default => nil # 0 means disable cache + config_param :phi_threshold, :integer, :default => 16 + attr_reader :nodes - def initialize - super - require 'socket' - require 'fileutils' - require 'fluent/plugin/socket_util' - @nodes = [] #=> [Node] - end + # backward compatibility + config_param :port, :integer, :default => DEFAULT_LISTEN_PORT + config_param :host, :string, :default => nil - config_param :send_timeout, :time, :default => 60 - config_param :heartbeat_type, :default => :udp do |val| - case val.downcase - when 'tcp' - :tcp - when 'udp' - :udp - else - raise ConfigError, "forward output heartbeat type should be 'tcp' or 'udp'" - end - end - config_param :heartbeat_interval, :time, :default => 1 - config_param :recover_wait, :time, :default => 10 - config_param :hard_timeout, :time, :default => 60 - config_param :expire_dns_cache, :time, :default => nil # 0 means disable cache - config_param :phi_threshold, :integer, :default => 16 - attr_reader :nodes + def configure(conf) + super - # backward compatibility - config_param :port, :integer, :default => DEFAULT_LISTEN_PORT - config_param :host, :string, :default => nil + # backward compatibility + if host = conf['host'] + $log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead." + port = conf['port'] + port = port ? port.to_i : DEFAULT_LISTEN_PORT + e = conf.add_element('server') + e['host'] = host + e['port'] = port.to_s + end - def configure(conf) - super + recover_sample_size = @recover_wait / @heartbeat_interval - # backward compatibility - if host = conf['host'] - $log.warn "'host' option in forward output is obsoleted. Use '<server> host xxx </server>' instead." - port = conf['port'] - port = port ? port.to_i : DEFAULT_LISTEN_PORT - e = conf.add_element('server') - e['host'] = host - e['port'] = port.to_s - end + conf.elements.each {|e| + next if e.name != "server" - recover_sample_size = @recover_wait / @heartbeat_interval + host = e['host'] + port = e['port'] + port = port ? port.to_i : DEFAULT_LISTEN_PORT - conf.elements.each {|e| - next if e.name != "server" + weight = e['weight'] + weight = weight ? weight.to_i : 60 - host = e['host'] - port = e['port'] - port = port ? port.to_i : DEFAULT_LISTEN_PORT + standby = !!e['standby'] - weight = e['weight'] - weight = weight ? weight.to_i : 60 + name = e['name'] + unless name + name = "#{host}:#{port}" + end - standby = !!e['standby'] + failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f) + @nodes << Node.new(name, host, port, weight, standby, failure, + @phi_threshold, recover_sample_size, @expire_dns_cache) + $log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight + } + end - name = e['name'] - unless name - name = "#{host}:#{port}" - end + def start + super - failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f) - @nodes << Node.new(name, host, port, weight, standby, failure, - @phi_threshold, recover_sample_size, @expire_dns_cache) - $log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight - } - end + @rand_seed = Random.new.seed + rebuild_weight_array + @rr = 0 - def start - super + @loop = Coolio::Loop.new - @rand_seed = Random.new.seed - rebuild_weight_array - @rr = 0 + if @heartbeat_type == :udp + # assuming all hosts use udp + @usock = SocketUtil.create_udp_socket(@nodes.first.host) + @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) + @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) + @loop.attach(@hb) + end - @loop = Coolio::Loop.new + @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer)) + @loop.attach(@timer) - if @heartbeat_type == :udp - # assuming all hosts use udp - @usock = SocketUtil.create_udp_socket(@nodes.first.host) - @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) - @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) - @loop.attach(@hb) + @thread = Thread.new(&method(:run)) end - @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer)) - @loop.attach(@timer) + def shutdown + @finished = true + @loop.watchers.each {|w| w.detach } + @loop.stop + @thread.join + @usock.close if @usock + end - @thread = Thread.new(&method(:run)) - end + def run + @loop.run + rescue + $log.error "unexpected error", :error=>$!.to_s + $log.error_backtrace + end - def shutdown - @finished = true - @loop.watchers.each {|w| w.detach } - @loop.stop - @thread.join - @usock.close if @usock - end + def write_objects(tag, chunk) + return if chunk.empty? - def run - @loop.run - rescue - $log.error "unexpected error", :error=>$!.to_s - $log.error_backtrace - end + error = nil - def write_objects(tag, chunk) - return if chunk.empty? + wlen = @weight_array.length + wlen.times do + @rr = (@rr + 1) % wlen + node = @weight_array[@rr] - error = nil - - wlen = @weight_array.length - wlen.times do - @rr = (@rr + 1) % wlen - node = @weight_array[@rr] - - if node.available? - begin - send_data(node, tag, chunk) - return - rescue - # for load balancing during detecting crashed servers - error = $! # use the latest error + if node.available? + begin + send_data(node, tag, chunk) + return + rescue + # for load balancing during detecting crashed servers + error = $! # use the latest error + end end end - end - if error - raise error - else - raise "no nodes are available" # TODO message + if error + raise error + else + raise "no nodes are available" # TODO message + end end - end - private + private - def rebuild_weight_array - standby_nodes, regular_nodes = @nodes.partition {|n| - n.standby? - } + def rebuild_weight_array + standby_nodes, regular_nodes = @nodes.partition {|n| + n.standby? + } - lost_weight = 0 - regular_nodes.each {|n| - unless n.available? - lost_weight += n.weight - end - } - $log.debug "rebuilding weight array", :lost_weight=>lost_weight - - if lost_weight > 0 - standby_nodes.each {|n| - if n.available? - regular_nodes << n - $log.info "using standby node #{n.host}:#{n.port}", :weight=>n.weight - lost_weight -= n.weight - break if lost_weight <= 0 + lost_weight = 0 + regular_nodes.each {|n| + unless n.available? + lost_weight += n.weight end } - end + $log.debug "rebuilding weight array", :lost_weight=>lost_weight - weight_array = [] - gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) } - regular_nodes.each {|n| - (n.weight / gcd).times { - weight_array << n + if lost_weight > 0 + standby_nodes.each {|n| + if n.available? + regular_nodes << n + $log.warn "using standby node #{n.host}:#{n.port}", :weight=>n.weight + lost_weight -= n.weight + break if lost_weight <= 0 + end + } + end + + weight_array = [] + gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) } + regular_nodes.each {|n| + (n.weight / gcd).times { + weight_array << n + } } - } - # for load balancing during detecting crashed servers - coe = (regular_nodes.size * 6) / weight_array.size - weight_array *= coe if coe > 1 + # for load balancing during detecting crashed servers + coe = (regular_nodes.size * 6) / weight_array.size + weight_array *= coe if coe > 1 - r = Random.new(@rand_seed) - weight_array.sort_by! { r.rand } + r = Random.new(@rand_seed) + weight_array.sort_by! { r.rand } - @weight_array = weight_array - end + @weight_array = weight_array + end - # MessagePack FixArray length = 2 - FORWARD_HEADER = [0x92].pack('C') + # MessagePack FixArray length = 2 + FORWARD_HEADER = [0x92].pack('C') - #FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack - def send_heartbeat_tcp(node) - sock = connect(node) - begin - opt = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval - # don't send any data to not cause a compatibility problem - #sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) - #sock.write FORWARD_TCP_HEARTBEAT_DATA - node.heartbeat(true) - ensure - sock.close + #FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack + def send_heartbeat_tcp(node) + sock = connect(node) + begin + opt = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } + sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) + opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval + # don't send any data to not cause a compatibility problem + #sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) + #sock.write FORWARD_TCP_HEARTBEAT_DATA + node.heartbeat(true) + ensure + sock.close + end end - end - def send_data(node, tag, chunk) - sock = connect(node) - begin - opt = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) + def send_data(node, tag, chunk) + sock = connect(node) + begin + opt = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } + sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) + opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval + sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) - # beginArray(2) - sock.write FORWARD_HEADER + # beginArray(2) + sock.write FORWARD_HEADER - # writeRaw(tag) - sock.write tag.to_msgpack # tag + # writeRaw(tag) + sock.write tag.to_msgpack # tag - # beginRaw(size) - sz = chunk.size - #if sz < 32 - # # FixRaw - # sock.write [0xa0 | sz].pack('C') - #elsif sz < 65536 - # # raw 16 - # sock.write [0xda, sz].pack('Cn') - #else + # beginRaw(size) + sz = chunk.size + #if sz < 32 + # # FixRaw + # sock.write [0xa0 | sz].pack('C') + #elsif sz < 65536 + # # raw 16 + # sock.write [0xda, sz].pack('Cn') + #else # raw 32 sock.write [0xdb, sz].pack('CN') - #end + #end - # writeRawBody(packed_es) - chunk.write_to(sock) + # writeRawBody(packed_es) + chunk.write_to(sock) - node.heartbeat(false) - ensure - sock.close + node.heartbeat(false) + ensure + sock.close + end end - end - def connect(node) - # TODO unix socket? - TCPSocket.new(node.resolved_host, node.port) - end + def connect(node) + # TODO unix socket? + TCPSocket.new(node.resolved_host, node.port) + end - class HeartbeatRequestTimer < Coolio::TimerWatcher - def initialize(interval, callback) - super(interval, true) - @callback = callback + class HeartbeatRequestTimer < Coolio::TimerWatcher + def initialize(interval, callback) + super(interval, true) + @callback = callback + end + + def on_timer + @callback.call + rescue + # TODO log? + end end def on_timer - @callback.call - rescue - # TODO log? + return if @finished + @nodes.each {|n| + if n.tick + rebuild_weight_array + end + begin + #$log.trace "sending heartbeat #{n.host}:#{n.port} on #{@heartbeat_type}" + if @heartbeat_type == :tcp + send_heartbeat_tcp(n) + else + @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host) + end + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + # TODO log + $log.debug "failed to send heartbeat packet to #{n.host}:#{n.port}", :error=>$!.to_s + end + } end - end - def on_timer - return if @finished - @nodes.each {|n| - if n.tick - rebuild_weight_array + class HeartbeatHandler < Coolio::IO + def initialize(io, callback) + super(io) + @io = io + @callback = callback end - begin - #$log.trace "sending heartbeat #{n.host}:#{n.port} on #{@heartbeat_type}" - if @heartbeat_type == :tcp - send_heartbeat_tcp(n) - else - @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host) + + def on_readable + begin + msg, addr = @io.recvfrom(1024) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + return end - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - # TODO log - $log.debug "failed to send heartbeat packet to #{n.host}:#{n.port}", :error=>$!.to_s + host = addr[3] + port = addr[1] + sockaddr = Socket.pack_sockaddr_in(port, host) + @callback.call(sockaddr, msg) + rescue + # TODO log? end - } - end - - class HeartbeatHandler < Coolio::IO - def initialize(io, callback) - super(io) - @io = io - @callback = callback end - def on_readable - begin - msg, addr = @io.recvfrom(1024) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - return + def on_heartbeat(sockaddr, msg) + port, host = Socket.unpack_sockaddr_in(sockaddr) + if node = @nodes.find {|n| n.sockaddr == sockaddr } + #$log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port + if node.heartbeat + rebuild_weight_array + end end - host = addr[3] - port = addr[1] - sockaddr = Socket.pack_sockaddr_in(port, host) - @callback.call(sockaddr, msg) - rescue - # TODO log? end - end - def on_heartbeat(sockaddr, msg) - port, host = Socket.unpack_sockaddr_in(sockaddr) - if node = @nodes.find {|n| n.sockaddr == sockaddr } - #$log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port - if node.heartbeat - rebuild_weight_array + class Node + def initialize(name, host, port, weight, standby, failure, + phi_threshold, recover_sample_size, expire_dns_cache) + @name = name + @host = host + @port = port + @weight = weight + @standby = standby + @failure = failure + @phi_threshold = phi_threshold + @recover_sample_size = recover_sample_size + @expire_dns_cache = expire_dns_cache + @available = true + + @resolved_host = nil + @resolved_time = 0 + resolved_host # check dns end - end - end - class Node - def initialize(name, host, port, weight, standby, failure, - phi_threshold, recover_sample_size, expire_dns_cache) - @name = name - @host = host - @port = port - @weight = weight - @standby = standby - @failure = failure - @phi_threshold = phi_threshold - @recover_sample_size = recover_sample_size - @expire_dns_cache = expire_dns_cache - @available = true + attr_reader :name, :host, :port, :weight + attr_writer :weight, :standby, :available + attr_reader :sockaddr # used by on_heartbeat - @resolved_host = nil - @resolved_time = 0 - resolved_host # check dns - end + def available? + @available + end - attr_reader :name, :host, :port, :weight - attr_writer :weight, :standby, :available - attr_reader :sockaddr # used by on_heartbeat + def standby? + @standby + end - def available? - @available - end + def resolved_host + case @expire_dns_cache + when 0 + # cache is disabled + return resolve_dns! - def standby? - @standby - end + when nil + # persistent cache + return @resolved_host ||= resolve_dns! - def resolved_host - case @expire_dns_cache - when 0 - # cache is disabled - return resolve_dns! - - when nil - # persistent cache - return @resolved_host ||= resolve_dns! - - else - now = Engine.now - rh = @resolved_host - if !rh || now - @resolved_time >= @expire_dns_cache - rh = @resolved_host = resolve_dns! - @resolved_time = now + else + now = Engine.now + rh = @resolved_host + if !rh || now - @resolved_time >= @expire_dns_cache + rh = @resolved_host = resolve_dns! + @resolved_time = now + end + return rh end - return rh end - end - def resolve_dns! - @sockaddr = Socket.pack_sockaddr_in(@port, @host) - port, resolved_host = Socket.unpack_sockaddr_in(@sockaddr) - return resolved_host - end - private :resolve_dns! + def resolve_dns! + @sockaddr = Socket.pack_sockaddr_in(@port, @host) + port, resolved_host = Socket.unpack_sockaddr_in(@sockaddr) + return resolved_host + end + private :resolve_dns! - def tick - now = Time.now.to_f - if !@available + def tick + now = Time.now.to_f + if !@available + if @failure.hard_timeout?(now) + @failure.clear + end + return nil + end + if @failure.hard_timeout?(now) + $log.warn "detached forwarding server '#{@name}'", :host=>@host, :port=>@port, :hard_timeout=>true + @available = false + @resolved_host = nil # expire cached host @failure.clear + return true end - return nil - end - if @failure.hard_timeout?(now) - $log.info "detached forwarding server '#{@name}'", :host=>@host, :port=>@port, :hard_timeout=>true - @available = false - @resolved_host = nil # expire cached host - @failure.clear - return true + phi = @failure.phi(now) + #$log.trace "phi '#{@name}'", :host=>@host, :port=>@port, :phi=>phi + if phi > @phi_threshold + $log.warn "detached forwarding server '#{@name}'", :host=>@host, :port=>@port, :phi=>phi + @available = false + @resolved_host = nil # expire cached host + @failure.clear + return true + else + return false + end end - phi = @failure.phi(now) - #$log.trace "phi '#{@name}'", :host=>@host, :port=>@port, :phi=>phi - if phi > @phi_threshold - $log.info "detached forwarding server '#{@name}'", :host=>@host, :port=>@port, :phi=>phi - @available = false - @resolved_host = nil # expire cached host - @failure.clear - return true - else - return false + def heartbeat(detect=true) + now = Time.now.to_f + @failure.add(now) + #$log.trace "heartbeat from '#{@name}'", :host=>@host, :port=>@port, :available=>@available, :sample_size=>@failure.sample_size + if detect && !@available && @failure.sample_size > @recover_sample_size + @available = true + $log.warn "recovered forwarding server '#{@name}'", :host=>@host, :port=>@port + return true + else + return nil + end end - end - def heartbeat(detect=true) - now = Time.now.to_f - @failure.add(now) - #$log.trace "heartbeat from '#{@name}'", :host=>@host, :port=>@port, :available=>@available, :sample_size=>@failure.sample_size - if detect && !@available && @failure.sample_size > @recover_sample_size - @available = true - $log.info "recovered forwarding server '#{@name}'", :host=>@host, :port=>@port - return true - else - return nil + def to_msgpack(out = '') + [@host, @port, @weight, @available].to_msgpack(out) end end - def to_msgpack(out = '') - [@host, @port, @weight, @available].to_msgpack(out) - end - end + class FailureDetector + PHI_FACTOR = 1.0 / Math.log(10.0) + SAMPLE_SIZE = 1000 - class FailureDetector - PHI_FACTOR = 1.0 / Math.log(10.0) - SAMPLE_SIZE = 1000 + def initialize(heartbeat_interval, hard_timeout, init_last) + @heartbeat_interval = heartbeat_interval + @last = init_last + @hard_timeout = hard_timeout - def initialize(heartbeat_interval, hard_timeout, init_last) - @heartbeat_interval = heartbeat_interval - @last = init_last - @hard_timeout = hard_timeout + # microsec + @init_gap = (heartbeat_interval * 1e6).to_i + @window = [@init_gap] + end - # microsec - @init_gap = (heartbeat_interval * 1e6).to_i - @window = [@init_gap] - end + def hard_timeout?(now) + now - @last > @hard_timeout + end - def hard_timeout?(now) - now - @last > @hard_timeout - end - - def add(now) - if @window.empty? - @window << @init_gap - @last = now - else - gap = now - @last - @window << (gap * 1e6).to_i - @window.shift if @window.length > SAMPLE_SIZE - @last = now + def add(now) + if @window.empty? + @window << @init_gap + @last = now + else + gap = now - @last + @window << (gap * 1e6).to_i + @window.shift if @window.length > SAMPLE_SIZE + @last = now + end end - end - def phi(now) - size = @window.size - return 0.0 if size == 0 + def phi(now) + size = @window.size + return 0.0 if size == 0 - # Calculate weighted moving average - mean_usec = 0 - fact = 0 - @window.each_with_index {|gap,i| - mean_usec += gap * (1+i) - fact += (1+i) - } - mean_usec = mean_usec / fact + # Calculate weighted moving average + mean_usec = 0 + fact = 0 + @window.each_with_index {|gap,i| + mean_usec += gap * (1+i) + fact += (1+i) + } + mean_usec = mean_usec / fact - # Normalize arrive intervals into 1sec - mean = (mean_usec.to_f / 1e6) - @heartbeat_interval + 1 + # Normalize arrive intervals into 1sec + mean = (mean_usec.to_f / 1e6) - @heartbeat_interval + 1 - # Calculate phi of the phi accrual failure detector - t = now - @last - @heartbeat_interval + 1 - phi = PHI_FACTOR * t / mean + # Calculate phi of the phi accrual failure detector + t = now - @last - @heartbeat_interval + 1 + phi = PHI_FACTOR * t / mean - return phi - end + return phi + end - def sample_size - @window.size - end + def sample_size + @window.size + end - def clear - @window.clear - @last = 0 + def clear + @window.clear + @last = 0 + end end - end - ## TODO - #class RPC - # def initialize(this) - # @this = this - # end - # - # def list_nodes - # @this.nodes - # end - # - # def list_fault_nodes - # list_nodes.select {|n| !n.available? } - # end - # - # def list_available_nodes - # list_nodes.select {|n| n.available? } - # end - # - # def add_node(name, host, port, weight) - # end - # - # def recover_node(host, port) - # end - # - # def remove_node(host, port) - # end - #end -end - - + ## TODO + #class RPC + # def initialize(this) + # @this = this + # end + # + # def list_nodes + # @this.nodes + # end + # + # def list_fault_nodes + # list_nodes.select {|n| !n.available? } + # end + # + # def list_available_nodes + # list_nodes.select {|n| n.available? } + # end + # + # def add_node(name, host, port, weight) + # end + # + # def recover_node(host, port) + # end + # + # def remove_node(host, port) + # end + #end + end end