# coding: utf-8 require "json" require "optparse" require "net/http" require "fluent/logger" module Magellan class LogFunnel class Error < StandardError end def self.start(argv) self.new(argv).run end def initialize(argv) @argv = argv end def parse_options(argv) @conf = {} OptionParser.new do |o| o.on("--fluentd URI") do |uri| @conf[:fluentd] = URI(uri) end o.on("--project PROJECT") do |proj| @conf[:project] = proj end o.on("--stage STAGE") do |stage| @conf[:stage] = stage end o.on("--worker-version VERSION") do |ver| @conf[:version] = ver end o.on("--container NAME") do |cname| @conf[:container] = cname end o.on("-d", "--[no-]daemon") do |daemon| @conf[:daemon] = daemon end o.order!(argv) end end def connect_fluentd @logger = Fluent::Logger::FluentLogger.new("worker_logs") end def now Time.now end def stage_tag @stage_tag ||= "#{@conf[:project]}.#{@conf[:stage]}" end def post_msg(msg) @logger.post(stage_tag, { time: self.now.to_f, project: @conf[:project], stage: @conf[:stage], version: @conf[:version], container: @conf[:container], message: msg }) end def run parse_options(@argv) connect_fluentd if @conf[:daemon] Process.daemon end begin IO.pipe do |out| IO.pipe do |err| @pid = Process.spawn(*@argv, out: out[1], err: err[1]) out[1].close err[1].close th_out = Thread.start do begin while l = out[0].gets post_msg(l.chomp) end rescue IOError end end th_err = Thread.start do begin while l = err[0].gets post_msg(l.chomp) end rescue IOError end end th_out.join th_err.join Process.waitpid(@pid) @child_status = $? return (@child_status.exitstatus || 127) end end end ensure unless @child_status # interrupted by signal or so. # stop child process if @pid Process.kill(:TERM, @pid) end end end end end