return unless defined?(Sidekiq.schedule) require_relative 'settings_validation' module PandaPal module OrganizationConcerns module TaskScheduling extend ActiveSupport::Concern include OrganizationConcerns::SettingsValidation included do after_commit :sync_schedule, on: [:create, :update] after_commit :unschedule_tasks, on: :destroy end class_methods do def _schedule_descriptors @_schedule_descriptors ||= {} end def settings_structure return super unless _schedule_descriptors.present? super.tap do |struc| struc[:properties] ||= {} struc[:properties][:timezone] ||= { type: 'String', required: false, validate: ->(timezone, *args) { ActiveSupport::TimeZone[timezone].present? ? nil : " Invalid Timezone '#{timezone}'" }, } struc[:properties][:task_schedules] = { type: 'Hash', required: false, properties: _schedule_descriptors.keys.reduce({}) do |hash, k| hash.tap do |hash| hash[k.to_sym] = hash[k.to_s] = { required: false, validate: ->(value, *args, errors:, **kwargs) { begin Rufus::Scheduler.parse(value) if value nil rescue ArgumentError errors << " must be false or a Crontab string" end } } end end, } end end def scheduled_task(cron_time, name_or_method = nil, worker: nil, queue: nil, &block) task_key = (name_or_method.presence || "scheduled_task_#{caller_locations[0].lineno}").to_s raise "Task key '#{task_key}' already taken!" if _schedule_descriptors.key?(task_key) _schedule_descriptors[task_key] = { key: task_key, schedule: cron_time, worker: worker || block || name_or_method.to_sym, queue: queue || 'default', } end def sync_schedules # Ensure deleted Orgs are removed existing_orgs = pluck(:name) old_schedules = Sidekiq.get_schedule.select do |k, v| m = k.match(/^org:([a-z0-9_]+)\-/i) m.present? && !existing_orgs.include?(m[1]) end old_schedules.keys.each do |k| Sidekiq.remove_schedule(k) end find_each(&:sync_schedule) end end def generate_schedule schedule = {} self.class._schedule_descriptors.values.each do |desc| cron_time = schedule_task_cron_time(desc) next unless cron_time.present? schedule["org:#{name}-#{desc[:key]}"] = { 'cron' => cron_time, 'queue' => desc[:queue], 'class' => ScheduledTaskExecutor.to_s, 'args' => [name, desc[:key]], } end schedule end def sync_schedule new_schedules = generate_schedule unschedule_tasks(new_schedules.keys) new_schedules.each do |k, v| Sidekiq.set_schedule(k, v) end end private def unschedule_tasks(new_task_keys = nil) current_schedules = Sidekiq.get_schedule.select { |k,v| k.starts_with?("org:#{name}-") } del_tasks = current_schedules.keys del_tasks -= new_task_keys if new_task_keys del_tasks.each do |k| Sidekiq.remove_schedule(k) end end def schedule_task_cron_time(desc) cron_time = nil cron_time = settings&.dig(:task_schedules, desc[:key].to_s) if cron_time.nil? cron_time = settings&.dig(:task_schedules, desc[:key].to_sym) if cron_time.nil? cron_time = desc[:schedule] if cron_time.nil? return nil unless cron_time.present? cron_time = instance_exec(&cron_time) if cron_time.is_a?(Proc) if !Rufus::Scheduler.parse(cron_time).zone.present? && settings && settings[:timezone] cron_time += " #{settings[:timezone]}" end cron_time end class ScheduledTaskExecutor include Sidekiq::Worker def perform(org_name, task_key) org = Organization.find_by!(name: org_name) task = Organization._schedule_descriptors[task_key] worker = task[:worker] Apartment::Tenant.switch(org.name) do if worker.is_a?(Proc) org.instance_exec(&worker) elsif worker.is_a?(Symbol) org.send(worker) elsif worker.is_a?(String) worker.constantize.perform_async elsif worker.is_a?(Class) worker.perform_async end end end end end end end SidekiqScheduler::Scheduler.instance.dynamic = true module SidekiqScheduler module Schedule original_schedule_setter = instance_method(:schedule=) define_method :schedule= do |sched| original_schedule_setter.bind(self).(sched).tap do PandaPal::Organization.sync_schedules end end end end