lib/pmux/reducer.rb in pmux-0.1.2 vs lib/pmux/reducer.rb in pmux-0.1.3

- old
+ new

@@ -34,12 +34,14 @@ def do_reduce_task reducer_cmd = @task['reducer'] || 'cat' @output_path = "#{@tmp_dir}/r#{@task['pindex']}" err_path = "#{@tmp_dir}/.err.#{$$}" err_msg = nil + #cmd_line = fix_cmd_line reducer_cmd, + # @paths.join(' '), @output_path, err_path, tmp_dir cmd_line = fix_cmd_line reducer_cmd, - @paths.join(' '), @output_path, err_path, tmp_dir + "#{tmp_dir}/t*-#{@task['pindex']}", @output_path, err_path, tmp_dir Log.debug "system: #{cmd_line}" system cmd_line @exitstatus = $?.exitstatus if File.size? err_path err_msg = File.read(err_path).chomp! @@ -54,11 +56,13 @@ def do_streaming_reduce_task reducer_cmd = @task['reducer'] || 'cat' @output_path = "#{@tmp_dir}/r#{@task['pindex']}" err_path = "#{@tmp_dir}/.rerr.#{$$}" err_msg = nil + #cmd_line = fix_cmd_line reducer_cmd, + # @paths.join(' '), nil, err_path, tmp_dir cmd_line = fix_cmd_line reducer_cmd, - @paths.join(' '), nil, err_path, tmp_dir + "#{tmp_dir}/t*-#{@task['pindex']}", nil, err_path, tmp_dir Log.debug "popen: #{cmd_line}" pipeio = nil Dir.chdir(@tmp_dir) {pipeio = PipeIO.new cmd_line} if @on_receive pipeio.on_receive &@on_receive