# coding: utf-8
require "json"
require "optparse"
require "net/http"

module Magellan
  # Runs a child command and forwards every line it writes to stdout or
  # stderr to a fluentd HTTP endpoint as a JSON record.
  #
  # The command line is split in two: leading options (see #parse_options)
  # configure the funnel, and everything from the first non-option argument
  # onwards is the command that gets spawned.
  class LogFunnel
    class Error < StandardError
    end

    # Convenience entry point: builds a funnel for +argv+ and runs it.
    #
    # @param argv [Array<String>] funnel options followed by the child command
    # @return [Integer] see #run
    def self.start(argv)
      new(argv).run
    end

    # @param argv [Array<String>] funnel options followed by the child command.
    #   The array is kept by reference and is mutated by #run (the parsed
    #   options are removed from it).
    def initialize(argv)
      @argv = argv
    end

    # Parses the leading funnel options out of +argv+ into @conf.
    #
    # Parsing uses OptionParser#order!, so it stops at the first non-option
    # argument and removes the consumed options from +argv+ in place; what
    # remains in +argv+ is the child command line.
    #
    # @param argv [Array<String>] arguments to parse (mutated)
    def parse_options(argv)
      @conf = {}
      OptionParser.new do |o|
        o.on("--fluentd URI") { |uri| @conf[:fluentd] = URI(uri) }
        o.on("--project PROJECT") { |proj| @conf[:project] = proj }
        o.on("--stage STAGE") { |stage| @conf[:stage] = stage }
        o.on("--worker-version VERSION") { |ver| @conf[:version] = ver }
        o.on("--container NAME") { |cname| @conf[:container] = cname }
        o.on("-d", "--[no-]daemon") { |daemon| @conf[:daemon] = daemon }
        o.order!(argv)
      end
    end

    # Current time; kept as a separate method so the timestamp source can be
    # overridden.
    #
    # @return [Time]
    def now
      Time.now
    end

    # Sends one log message to the configured fluentd URI as a form POST
    # whose single +json+ field holds the encoded record.
    #
    # @param msg [String] one line of child output, without its newline
    # @return [Net::HTTPResponse]
    def post_msg(msg)
      Net::HTTP.post_form(@conf[:fluentd], json: payload_for(msg))
    end

    # Parses options, spawns the child command with its stdout and stderr
    # redirected into pipes, forwards every line from both pipes through
    # #post_msg, and waits for the child to finish.
    #
    # If this method is left before the child's status has been collected
    # (an exception, a signal, ...), the child is sent SIGTERM.
    #
    # @return [Integer] the child's exit status, or 127 when the child has no
    #   exit status (for example when it was terminated by a signal)
    def run
      parse_options(@argv)
      @conf[:fluentd] ||= URI("http://localhost:8888")
      @conf[:fluentd].path = "/worker_logs.#{@conf[:project]}.#{@conf[:stage]}"
      Process.daemon if @conf[:daemon]

      IO.pipe do |out_r, out_w|
        IO.pipe do |err_r, err_w|
          @pid = Process.spawn(*@argv, out: out_w, err: err_w)
          # Close our copies of the write ends; otherwise the readers would
          # never see EOF after the child exits.
          out_w.close
          err_w.close

          readers = [out_r, err_r].map { |io| forward_lines(io) }
          readers.each(&:join)

          Process.waitpid(@pid)
          @child_status = $?
          return @child_status.exitstatus || 127
        end
      end
    ensure
      # No status recorded means we are leaving early: stop the child.
      stop_child unless @child_status
    end

    private

    # Builds the JSON record posted for +msg+.
    #
    # String messages are scrubbed first: JSON.generate raises on invalid
    # byte sequences, and a single malformed log line must not abort the
    # forwarding thread (which would in turn terminate the child).
    def payload_for(msg)
      text = msg.is_a?(String) ? msg.scrub : msg
      JSON.generate({
        time: now.to_i,
        project: @conf[:project],
        stage: @conf[:stage],
        version: @conf[:version],
        container: @conf[:container],
        message: text
      })
    end

    # Starts a thread that posts each line read from +io+ until EOF.
    def forward_lines(io)
      Thread.start do
        begin
          while (line = io.gets)
            post_msg(line.chomp)
          end
        rescue IOError
          # The pipe was closed while reading; nothing more to forward.
        end
      end
    end

    # Sends SIGTERM to the spawned child, if any, and arranges for it to be
    # reaped without blocking the caller.
    def stop_child
      return unless @pid

      Process.kill(:TERM, @pid)
      Process.detach(@pid)
    rescue Errno::ESRCH
      # The child is already gone. Swallow this so it does not replace the
      # exception that brought us here.
      nil
    end
  end
end