# Submit jobs to the queue, or update the matching pending job when one with
# the same command and tag is already queued.
#
# Jobs are gathered from, in order: the command line (@argv joined with
# spaces), the file named by the 'infile' option, and - only when nothing was
# gathered so far - stdin.  The 'priority', 'tag', 'runner' and 'restartable'
# options override the corresponding job fields when they were given, and the
# 'stdin' option sets each job's stdin.  Jobs start in state 'holding' when
# the 'stage' option is set and 'pending' otherwise.
#
# Every job the queue yields back from a submit or update is passed to
# dumping_yaml_tuples unless the 'quiet' option is set.
#
# Aborts the process when jobs and job input are both requested on stdin, or
# when no jobs were specified at all.
#
# Returns self.
def touch
  set_q

  @priority = @options['priority']
  debug{ "priority <#{ @priority }>" }

  @tag = @options['tag']
  debug{ "tag <#{ @tag }>" }

  @runner = @options['runner']
  debug{ "runner <#{ @runner }>" }

  @restartable = @options['restartable']
  debug{ "restartable <#{ @restartable }>" }

  @infile = @options['infile']
  debug{ "infile <#{ @infile }>" }

  @job_stdin = @options['stdin']
  debug{ "job_stdin <#{ @job_stdin }>" }

  @stage = @options['stage']
  debug{ "stage <#{ @stage }>" }

  if job_stdin == '-' && stdin?
    abort "cannot specify both jobs and job input on stdin"
  end

  jobs = []

  # a job given directly on the command line
  unless @argv.empty?
    job = Job::new
    job['command'] = @argv.join(' ')
    job['priority'] = @priority
    job['tag'] = @tag
    job['runner'] = @runner
    job['restartable'] = @restartable
    jobs << job
  end

  # jobs listed in the input file
  if @infile
    open(@infile) do |f|
      debug{ "reading jobs from <#{ @infile }>" }
      loadio f, @infile, jobs
    end
  end

  # stdin is consulted only when no other source produced a job
  if jobs.empty? && stdin?
    debug{ "reading jobs from <stdin>" }
    loadio stdin, 'stdin', jobs
  end

  abort "no jobs specified!" if jobs.empty?

  init_job_stdin!

  state = @stage ? 'holding' : 'pending'

  # options that were explicitly given win over per-job values
  jobs.each do |job|
    job['state'] = state
    job['priority'] = @priority if @options.has_key?('priority')
    job['tag'] = @tag if @options.has_key?('tag')
    job['runner'] = @runner if @options.has_key?('runner')
    job['restartable'] = @restartable if @options.has_key?('restartable')
    job['stdin'] = @job_stdin if @job_stdin
  end

  # everything the queue reports back as submitted or updated
  list = []

  tmpfile =
    lambda do |basename|
      basename = File.basename basename.to_s
      Tempfile.new "#{ basename }_#{ Process.pid }_#{ rand.to_s }"
    end

  # Update the pending job +pjob+ from the values carried by +ujob+.
  update_job =
    lambda do |pjob, ujob|
      kvs, jid = {}, pjob['jid']

      pstdin, ustdin = pjob['stdin'], ujob['stdin']
      if pstdin || ustdin
        pbuf =
          if pstdin
            pstdin = @q.standard_in_4 jid
            File.read pstdin if File.exist?(pstdin)
          end
        ubuf =
          if ustdin
            File.read ustdin if File.exist?(ustdin)
          end

        # Both buffers are read before this point because opening ustdin
        # for writing truncates it.  The result is the pending job's stdin
        # followed by the incoming job's stdin.
        f = ustdin ? File.open(ustdin, 'w') : tmpfile[ustdin]
        begin
          f.write pbuf if pbuf
          f.write ubuf if ubuf
        ensure
          f.close
        end
        kvs['stdin'] = ujob['stdin'] = f.path
      end

      # %w yields an array of field names; only non-nil values are copied
      allowed = %w( priority runner restartable )
      allowed.each do |key|
        val = ujob[key]
        kvs[key] = val if val
      end
      @q.update(kvs, jid){|updated| list << updated}
    end

  submit_job =
    lambda do |job|
      @q.submit(job){|submitted| list << submitted}
    end

  @q.transaction do
    pending = @q.list 'pending'

    # index pending jobs by jid, and their jids by [command, tag]
    pjobs, pcommands = {}, {}

    pending.each do |job|
      jid = job['jid']
      command = job['command'].strip
      tag = job['tag'].to_s.strip
      pjobs[jid] = job
      pcommands[[command, tag]] = jid
    end

    jobs.each do |job|
      command = job['command'].strip
      tag = job['tag'].to_s.strip
      jid = pcommands[[command, tag]]
      if jid
        update_job[ pjobs[jid], job ]
      else
        submit_job[ job ]
      end
    end
  end

  list.each(&dumping_yaml_tuples) unless @options['quiet']

  self
end