lib/legion/extensions/scheduler/runners/schedule.rb in lex-scheduler-0.1.0 vs lib/legion/extensions/scheduler/runners/schedule.rb in lex-scheduler-0.1.1
- old
+ new
@@ -1,5 +1,7 @@
+require 'fugit'
+
module Legion
module Extensions
module Scheduler
module Runners
module Schedule
@@ -11,29 +13,47 @@
def push_refresh(**)
Legion::Extensions::Scheduler::Transport::Messages::Refresh.new.publish
end
def refresh(**)
- Legion::Cache.set('scheduler_schedule_lock', Legion::Settings[:client][:name], 5)
+ Legion::Cache.set('scheduler_schedule_lock', Legion::Settings[:client][:name], 2)
end
def schedule_tasks(**)
return unless Legion::Cache.get('scheduler_schedule_lock') == Legion::Settings[:client][:name]
models_class::Schedule.where(active: 1).each do |row|
- next unless row.values[:interval].positive?
- next if (Time.now - row.values[:last_run]) < row.values[:interval]
+ if row.values[:interval].is_a?(Integer) && row.values[:interval].positive?
+ next if (Time.now - row.values[:last_run]) < row.values[:interval]
+ elsif row.values[:cron].is_a? String
+ cron_class = Fugit.parse(row.values[:cron])
+ if cron_class.respond_to? :to_sec
+ next if (Time.now - row.values[:last_run]) < cron_class.to_sec
+ elsif cron_class.respond_to? :previous_time
+ next if Time.now < Time.parse(cron_class.previous_time.to_s)
+ next if row.values[:last_run] > Time.parse(cron_class.previous_time.to_s)
+ end
+ end
- send_task(function_id: row.values[:function_id], **Legion::JSON.load(row.values[:payload]))
+ send_task(transformation: row.values[:transformation],
+ function_id: row.values[:function_id],
+ function: row.values[:name],
+ **Legion::JSON.load(row.values[:payload]))
row.update(last_run: Sequel::CURRENT_TIMESTAMP)
end
end
def send_task(**opts)
payload = {}
%i[runner_class function_id function debug args].each do |thing|
payload[thing] = opts[thing] if opts.key? thing
end
+
+ return Legion::Transport::Messages::Dynamic.new(**payload).publish if opts[:transformation].nil?
+
+ payload[:exchange] = 'task'
+ payload[:routing_key] = 'task.subtask.transform'
+ payload[:transformation] = opts[:transformation]
Legion::Extensions::Scheduler::Transport::Messages::SendTask.new(**payload).publish
end
end
end