lib/baleen/runner.rb in baleen-0.1.1 vs lib/baleen/runner.rb in baleen-0.1.2

- old
+ new

@@ -1,134 +1,83 @@ require "baleen/error" +require 'forwardable' module Baleen - class DockerParam - def initialize(params) - @params = params - end - - def method_missing(name, *args) - DockerParam.class_eval{ - define_method "#{name}" do - @params[name.to_sym] - end - - define_method "#{name}=" do |*args| - @params[name.to_sym] = args.first - end - } - send(name) - end - - def commands - %{ - #{@params[:before_command]} - cd #{@params[:work_dir]} - #{command} - } - end - - def command - @params[:command] ||= %{#{@params[:exe]} #{@params[:files]}} - end - - def command=(c) - @params[:command] = c - end - - def dup - copy_params = @params.dup - Object.const_get(self.class.to_s).new(copy_params) - end - end - class RunnerManager - include Celluloid::IO - - CONCURRENCY=2 - - def initialize(socket, msg) - @socket = socket - @queue = [] - @results = [] - @params = DockerParam.new(msg.params) + def initialize(connection, task) + @task = task + @connection = connection end def run + results = [] + prepare_task create_runners.each do |runners| - @queue = runners - @queue.each do |runner| - runner.async.run + runners.map{|runner| runner.future.run}.each do |actor| + results << actor.value end - loop {break if monitor_runners} - @results += @queue.map {|runner| runner.status.params} end - msg = Message::Response::JobComplete.new(status: "done", results: @results.to_json) - @socket.puts(msg.to_json) + @task.results = results + yield @task end private - def create_runners - target_files.map {|file| - params = @params.dup - params.files = file - Runner.new(params) - }.each_slice(@params.concurrency).map {|r| r} + def prepare_task + @task.prepare end - def target_files - params = @params.dup - params.command = %{find #{params.files} | grep "\\.feature"} - runner = Runner.new(params) - runner.run - runner.status.log.split("\n") + def create_runners + @task.target_files.map {|file| + task = @task.dup + task.files = file + Runner.new(task, @connection) + }.each_slice(@task.concurrency).map {|r| r} end - def monitor_runners - @queue.all?{ |r| r.status } - end end class Runner - include Celluloid::IO + include Celluloid + extend Forwardable - attr_reader :status + def_delegator :@connection, :notify_info - def initialize(params) - @docker_client = Container::DockerClient.new - @status = nil - @params = params + def initialize(task, connection=nil) + @container = Docker::Container.create('Cmd' => [task.shell, task.opt, task.commands], 'Image' => task.image) + @connection = connection ? connection : Connection.new + @task = task end def run - start_runner do |result| - @status = Message::Response::RunnerFinish.new( - status_code: result.status_code, - container_id: result.container_id, - log: result.log, - file: @params.files, - ) - end - sleep 0.1 # Stop a moment until RunnerManager checks the status - end - - def start_runner max_retry = 3; count = 0 begin - @docker_client.create_container(@params) - @docker_client.start_container + notify_info("Start container #{@container.id}") + @container.start + @container.wait(600) #TODO move to configuration + notify_info("Finish container #{@container.id}") + + if @task.commit + notify_info("Committing the change of container #{@container.id}") + @container.commit({repo: @task.image}) if @task.commit + end rescue Excon::Errors::NotFound => e count += 1 if count > max_retry raise Baleen::Error::StartContainerFail else retry end end - yield( @docker_client.result ) + + return { + status_code: @container.json["State"]["ExitCode"], + container_id: @container.id, + log: @container.attach(:stream => false, :stdout => true, :stderr => true, :logs => true), + file: @task.files, + } end end end \ No newline at end of file