# # Fluentd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # module Fluent class DetachProcessManager require 'singleton' include Singleton class Broker def initialize end def engine Engine end end def initialize require 'drb' DRb.start_service(create_drb_uri, Broker.new) @parent_uri = DRb.uri end def fork(delegate_object) ipr, ipw = IO.pipe # child Engine.emit_stream -> parent Engine.emit_stream opr, opw = IO.pipe # parent target.emit -> child target.emit pid = Process.fork if pid # parent process ipw.close opr.close forward_thread = process_parent(ipr, opw, pid, delegate_object) return pid, forward_thread end # child process ipr.close opw.close forward_thread = process_child(ipw, opr, delegate_object) return nil, forward_thread end private def read_header(ipr) sz = ipr.read(4).unpack('N')[0] ipr.read(sz) end def send_header(ipw, data) ipw.write [data.bytesize].pack('N') ipw.write data ipw.flush end def create_drb_uri "drbunix:" # TODO end private def process_child(ipw, opr, delegate_object) DRb.start_service(create_drb_uri, delegate_object) child_uri = DRb.uri send_header(ipw, child_uri) # override target.emit_stream to write event stream to the pipe fwd = new_forwarder(ipw, 0.5) # TODO interval Engine.define_singleton_method(:emit_stream) do |tag,es| fwd.emit(tag, es) end # read event stream from the pipe and forward to target.emit forward_thread = Thread.new(opr, delegate_object, &method(:output_forward_main)) # override global methods to call parent process override_shared_methods(@parent_uri) return forward_thread end def override_shared_methods(parent_uri) broker = DRbObject.new_with_uri(parent_uri) shared_methods.each {|(broker_accessor,target,name)| remote = broker.send(broker_accessor) target.define_singleton_method(name) do |*args,&block| remote.send(name, *args, &block) end } end def shared_methods [ #[:engine, Engine, :flush!], #[:engine, Engine, :stop], ] end def process_parent(ipr, opw, pid, delegate_object) child_uri = read_header(ipr) # read event stream from the pipe and forward to Engine.emit_stream forward_thread = Thread.new(ipr, pid, &method(:input_forward_main)) # note: don't override methods in parent process # because another process may fork after overriding #override_delegate_methods(delegate_object, child_uri) # return forwarder for DetachProcessMixin to # override target.emit and write event stream to the pipe fwd = new_forwarder(opw, 0.5) # TODO interval # note: override emit method on DetachProcessMixin forward_thread.define_singleton_method(:forwarder) do fwd end return forward_thread end #def override_delegate_methods(target, child_uri) # remote = DRbObject.new_with_uri(child_uri) # delegate_methods(target).each {|name| # target.define_singleton_method(name) do |*args,&block| # remote.send(name, *args, &block) # end # } #end # #def delegate_methods(target) # target.methods - Object.public_instance_methods #end def output_forward_main(opr, target) read_event_stream(opr) {|tag,es| # FIXME error handling begin target.emit(tag, es, NullOutputChain.instance) rescue $log.warn "failed to emit", :error=>$!.to_s, :pid=>Process.pid $log.warn_backtrace end } rescue $log.error "error on output process forwarding thread", :error=>$!.to_s, :pid=>Process.pid $log.error_backtrace raise end def input_forward_main(ipr, pid) read_event_stream(ipr) {|tag,es| # FIXME error handling begin Engine.emit_stream(tag, es) rescue $log.warn "failed to emit", :error=>$!.to_s, :pid=>Process.pid $log.warn_backtrace end } rescue $log.error "error on input process forwarding thread", :error=>$!.to_s, :pid=>Process.pid $log.error_backtrace raise end def read_event_stream(r, &block) u = MessagePack::Unpacker.new(r) begin #buf = '' #map = {} #while true # r.readpartial(64*1024, buf) # u.feed_each(buf) {|tag,ms| # if msbuf = map[tag] # msbuf << ms # else # map[tag] = ms # end # } # unless map.empty? # map.each_pair {|tag,ms| # es = MessagePackEventStream.new(ms) # block.call(tag, es) # } # map.clear # end #end u.each {|tag,ms| es = MessagePackEventStream.new(ms) block.call(tag, es) } rescue EOFError ensure r.close end end def new_forwarder(w, interval) if interval < 0.2 # TODO interval Forwarder.new(w) else DelayedForwarder.new(w, interval) end end class Forwarder def initialize(w) @w = w end def emit(tag, es) ms = es.to_msgpack_stream #[tag, ms].to_msgpack(@w) # not thread safe @w.write [tag, ms].to_msgpack end end class DelayedForwarder def initialize(w, interval) @w = w @interval = interval @buffer = {} Thread.new(&method(:run)) end def emit(tag, es) if ms = @buffer[tag] ms << es.to_msgpack_stream else @buffer[tag] = es.to_msgpack_stream end end def run while true sleep @interval @buffer.keys.each {|tag| if ms = @buffer.delete(tag) [tag, ms].to_msgpack(@w) #@w.write [tag, ms].to_msgpack end } end rescue $log.error "error on forwerder thread", :error=>$!.to_s $log.error_backtrace raise end end class MultiForwarder def initialize(forwarders) @forwarders = forwarders @rr = 1 end def emit(tag, es) forwarder = @forwarders[@rr] @rr = (@rr + 1) % @forwarders.length forwarder.emit(tag, es) end end end module DetachProcessImpl def on_detach_process(i) end protected def detach_process_impl(num, &block) children = [] num.times do |i| pid, forward_thread = DetachProcessManager.instance.fork(self) if pid # parent process $log.info "detached process", :class=>self.class, :pid=>pid children << [pid, forward_thread] next end # child process begin on_detach_process(i) block.call # disable Engine.stop called by signal handler Engine.define_singleton_method(:stop) do # do nothing end # override signal handlers called by parent process fin = FinishWait.new trap :INT do fin.stop end trap :TERM do fin.stop end #forward_thread.join # TODO this thread won't stop because parent doesn't close pipe fin.wait exit! 0 ensure $log.error "unknown error while shutting down this child process", :error=>$!.to_s, :pid=>Process.pid $log.error_backtrace end exit! 1 end # parent process # override shutdown method to kill child processes define_singleton_method(:shutdown) do children.each {|pair| begin pid = pair[0] forward_thread = pair[1] if pid Process.kill(:TERM, pid) forward_thread.join # wait until child closes pipe Process.waitpid(pid) pair[0] = nil end rescue $log.error "unknown error while shutting down remote child process", :error=>$!.to_s $log.error_backtrace end } end # override target.emit and write event stream to the pipe forwarders = children.map {|pair| pair[1].forwarder } if forwarders.length > 1 # use roundrobin fwd = DetachProcessManager::MultiForwarder.new(forwarders) else fwd = forwarders[0] end define_singleton_method(:emit) do |tag,es,chain| chain.next fwd.emit(tag, es) end end class FinishWait def initialize @finished = false @mutex = Mutex.new @cond = ConditionVariable.new end def wait @mutex.synchronize do until @finished @cond.wait(@mutex, 1.0) end end end def stop return if @finished @finished = true # Creating new thread due to mutex can't lock in main thread during trap context Thread.new { @mutex.synchronize do @cond.broadcast end }.run end def finished? @finished end end end module DetachProcessMixin include DetachProcessImpl def configure(conf) super if detach_process = conf['detach_process'] b3v = Config.bool_value(detach_process) case b3v when nil num = detach_process.to_i if num > 1 $log.warn "'detach_process' parameter supports only 1 process on this plugin: #{conf}" elsif num > 0 @detach_process = true elsif detach_process =~ /0+/ @detach_process = false else @detach_process = true end when true @detach_process = true when false @detach_process = false end end end def detach_process(&block) if @detach_process detach_process_impl(1, &block) else block.call end end end module DetachMultiProcessMixin include DetachProcessImpl def initialize @detach_process_num = 2 super end def configure(conf) super if detach_process = conf['detach_process'] b3v = Config.bool_value(detach_process) case b3v when nil num = detach_process.to_i if num > 0 @detach_process = true @detach_process_num = num elsif detach_process =~ /0+/ @detach_process = false else @detach_process = true end when true @detach_process = true when false @detach_process = false end end end protected def detach_multi_process(&block) if @detach_process detach_process_impl(@detach_process_num, &block) else block.call end end end end