lib/legion/extensions/scheduler/runners/schedule.rb in lex-scheduler-0.1.1 vs lib/legion/extensions/scheduler/runners/schedule.rb in lex-scheduler-0.1.2
- old
+ new
@@ -32,24 +32,27 @@
next if Time.now < Time.parse(cron_class.previous_time.to_s)
next if row.values[:last_run] > Time.parse(cron_class.previous_time.to_s)
end
end
+ function = Legion::Data::Model::Function[row.values[:function_id]]
+
send_task(transformation: row.values[:transformation],
function_id: row.values[:function_id],
- function: row.values[:name],
+ expiration: row.values[:task_ttl],
+ function: function.values[:name],
**Legion::JSON.load(row.values[:payload]))
row.update(last_run: Sequel::CURRENT_TIMESTAMP)
end
end
def send_task(**opts)
payload = {}
- %i[runner_class function_id function debug args].each do |thing|
+ %i[runner_class function_id function debug args expiration].each do |thing|
payload[thing] = opts[thing] if opts.key? thing
end
- return Legion::Transport::Messages::Dynamic.new(**payload).publish if opts[:transformation].nil?
+ return Legion::Transport::Messages::Dynamic.new(**opts).publish if opts[:transformation].nil?
payload[:exchange] = 'task'
payload[:routing_key] = 'task.subtask.transform'
payload[:transformation] = opts[:transformation]