lib/baleen/runner.rb in baleen-0.1.1 vs lib/baleen/runner.rb in baleen-0.1.2
- old
+ new
@@ -1,134 +1,83 @@
require "baleen/error"
+require 'forwardable'
module Baleen
- class DockerParam
- def initialize(params)
- @params = params
- end
-
- def method_missing(name, *args)
- DockerParam.class_eval{
- define_method "#{name}" do
- @params[name.to_sym]
- end
-
- define_method "#{name}=" do |*args|
- @params[name.to_sym] = args.first
- end
- }
- send(name)
- end
-
- def commands
- %{
- #{@params[:before_command]}
- cd #{@params[:work_dir]}
- #{command}
- }
- end
-
- def command
- @params[:command] ||= %{#{@params[:exe]} #{@params[:files]}}
- end
-
- def command=(c)
- @params[:command] = c
- end
-
- def dup
- copy_params = @params.dup
- Object.const_get(self.class.to_s).new(copy_params)
- end
- end
-
class RunnerManager
- include Celluloid::IO
-
- CONCURRENCY=2
-
- def initialize(socket, msg)
- @socket = socket
- @queue = []
- @results = []
- @params = DockerParam.new(msg.params)
+ def initialize(connection, task)
+ @task = task
+ @connection = connection
end
def run
+ results = []
+ prepare_task
create_runners.each do |runners|
- @queue = runners
- @queue.each do |runner|
- runner.async.run
+ runners.map{|runner| runner.future.run}.each do |actor|
+ results << actor.value
end
- loop {break if monitor_runners}
- @results += @queue.map {|runner| runner.status.params}
end
- msg = Message::Response::JobComplete.new(status: "done", results: @results.to_json)
- @socket.puts(msg.to_json)
+ @task.results = results
+ yield @task
end
private
- def create_runners
- target_files.map {|file|
- params = @params.dup
- params.files = file
- Runner.new(params)
- }.each_slice(@params.concurrency).map {|r| r}
+ def prepare_task
+ @task.prepare
end
- def target_files
- params = @params.dup
- params.command = %{find #{params.files} | grep "\\.feature"}
- runner = Runner.new(params)
- runner.run
- runner.status.log.split("\n")
+ def create_runners
+ @task.target_files.map {|file|
+ task = @task.dup
+ task.files = file
+ Runner.new(task, @connection)
+ }.each_slice(@task.concurrency).map {|r| r}
end
- def monitor_runners
- @queue.all?{ |r| r.status }
- end
end
class Runner
- include Celluloid::IO
+ include Celluloid
+ extend Forwardable
- attr_reader :status
+ def_delegator :@connection, :notify_info
- def initialize(params)
- @docker_client = Container::DockerClient.new
- @status = nil
- @params = params
+ def initialize(task, connection=nil)
+ @container = Docker::Container.create('Cmd' => [task.shell, task.opt, task.commands], 'Image' => task.image)
+ @connection = connection ? connection : Connection.new
+ @task = task
end
def run
- start_runner do |result|
- @status = Message::Response::RunnerFinish.new(
- status_code: result.status_code,
- container_id: result.container_id,
- log: result.log,
- file: @params.files,
- )
- end
- sleep 0.1 # Stop a moment until RunnerManager checks the status
- end
-
- def start_runner
max_retry = 3; count = 0
begin
- @docker_client.create_container(@params)
- @docker_client.start_container
+ notify_info("Start container #{@container.id}")
+ @container.start
+ @container.wait(600) #TODO move to configuration
+ notify_info("Finish container #{@container.id}")
+
+ if @task.commit
+ notify_info("Committing the change of container #{@container.id}")
+ @container.commit({repo: @task.image}) if @task.commit
+ end
rescue Excon::Errors::NotFound => e
count += 1
if count > max_retry
raise Baleen::Error::StartContainerFail
else
retry
end
end
- yield( @docker_client.result )
+
+ return {
+ status_code: @container.json["State"]["ExitCode"],
+ container_id: @container.id,
+ log: @container.attach(:stream => false, :stdout => true, :stderr => true, :logs => true),
+ file: @task.files,
+ }
end
end
end
\ No newline at end of file