lib/legion/extensions/scheduler/runners/schedule.rb in lex-scheduler-0.1.1 vs lib/legion/extensions/scheduler/runners/schedule.rb in lex-scheduler-0.1.2

- old
+ new

@@ -32,24 +32,27 @@ next if Time.now < Time.parse(cron_class.previous_time.to_s) next if row.values[:last_run] > Time.parse(cron_class.previous_time.to_s) end end + function = Legion::Data::Model::Function[row.values[:function_id]] + send_task(transformation: row.values[:transformation], function_id: row.values[:function_id], - function: row.values[:name], + expiration: row.values[:task_ttl], + function: function.values[:name], **Legion::JSON.load(row.values[:payload])) row.update(last_run: Sequel::CURRENT_TIMESTAMP) end end def send_task(**opts) payload = {} - %i[runner_class function_id function debug args].each do |thing| + %i[runner_class function_id function debug args expiration].each do |thing| payload[thing] = opts[thing] if opts.key? thing end - return Legion::Transport::Messages::Dynamic.new(**payload).publish if opts[:transformation].nil? + return Legion::Transport::Messages::Dynamic.new(**opts).publish if opts[:transformation].nil? payload[:exchange] = 'task' payload[:routing_key] = 'task.subtask.transform' payload[:transformation] = opts[:transformation]