lib/pmux/reducer.rb in pmux-0.1.2 vs lib/pmux/reducer.rb in pmux-0.1.3
- old
+ new
@@ -34,12 +34,14 @@
def do_reduce_task
reducer_cmd = @task['reducer'] || 'cat'
@output_path = "#{@tmp_dir}/r#{@task['pindex']}"
err_path = "#{@tmp_dir}/.err.#{$$}"
err_msg = nil
+ #cmd_line = fix_cmd_line reducer_cmd,
+ # @paths.join(' '), @output_path, err_path, tmp_dir
cmd_line = fix_cmd_line reducer_cmd,
- @paths.join(' '), @output_path, err_path, tmp_dir
+ "#{tmp_dir}/t*-#{@task['pindex']}", @output_path, err_path, tmp_dir
Log.debug "system: #{cmd_line}"
system cmd_line
@exitstatus = $?.exitstatus
if File.size? err_path
err_msg = File.read(err_path).chomp!
@@ -54,11 +56,13 @@
def do_streaming_reduce_task
reducer_cmd = @task['reducer'] || 'cat'
@output_path = "#{@tmp_dir}/r#{@task['pindex']}"
err_path = "#{@tmp_dir}/.rerr.#{$$}"
err_msg = nil
+ #cmd_line = fix_cmd_line reducer_cmd,
+ # @paths.join(' '), nil, err_path, tmp_dir
cmd_line = fix_cmd_line reducer_cmd,
- @paths.join(' '), nil, err_path, tmp_dir
+ "#{tmp_dir}/t*-#{@task['pindex']}", nil, err_path, tmp_dir
Log.debug "popen: #{cmd_line}"
pipeio = nil
Dir.chdir(@tmp_dir) {pipeio = PipeIO.new cmd_line}
if @on_receive
pipeio.on_receive &@on_receive