return unless defined?(Sidekiq.schedule) require_relative 'settings_validation' module PandaPal module OrganizationConcerns module TaskScheduling extend ActiveSupport::Concern include OrganizationConcerns::SettingsValidation included do after_commit :sync_schedule, on: [:create, :update] after_commit :unschedule_tasks, on: :destroy end class_methods do def _schedule_descriptors @_schedule_descriptors ||= {} end def settings_structure return super unless _schedule_descriptors.present? super.tap do |struc| struc[:properties] ||= {} struc[:properties][:timezone] ||= { type: 'String', required: false, validate: ->(timezone, *args) { ActiveSupport::TimeZone[timezone].present? ? nil : " Invalid Timezone '#{timezone}'" }, } struc[:properties][:task_schedules] = { type: 'Hash', required: false, properties: _schedule_descriptors.keys.reduce({}) do |hash, k| desc = _schedule_descriptors[k] hash.tap do |hash| kl = ' ' * (k.to_s.length - 4) hash[k.to_sym] = hash[k.to_s] = { required: false, description: <<~MARKDOWN, Override schedule for '#{k.to_s}' task. **Default**: #{desc[:schedule].is_a?(String) ? desc[:schedule] : ''} Set to `false` to disable or supply a Cron string: ```yaml #{k.to_s}: 0 0 0 * * * America/Denver ##{kl} │ │ │ │ │ │ └── Timezone (Optional) ##{kl} │ │ │ │ │ └── Day of Week ##{kl} │ │ │ │ └── Month ##{kl} │ │ │ └── Day of Month ##{kl} │ │ └── Hour ##{kl} │ └── Minute ##{kl} └── Second (Optional) ```` MARKDOWN json_schema: { oneOf: [ { type: 'string', pattern: '^((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,6})(\w+\/\w+)?$' }, { enum: [false] }, ], default: desc[:schedule].is_a?(String) ? desc[:schedule] : '0 0 3 * * * America/Denver', }, validate: ->(value, *args, errors:, **kwargs) { begin Rufus::Scheduler.parse(value) if value nil rescue ArgumentError errors << " must be false or a Crontab string" end } } end end, } end end def scheduled_task(cron_time, name_or_method = nil, worker: nil, queue: nil, &block) task_key = (name_or_method.presence || "scheduled_task_#{caller_locations[0].lineno}").to_s raise "Task key '#{task_key}' already taken!" if _schedule_descriptors.key?(task_key) _schedule_descriptors[task_key] = { key: task_key, schedule: cron_time, worker: worker || block || name_or_method.to_sym, queue: queue || 'default', } end def remove_scheduled_task(name_or_method) dval = _schedule_descriptors.delete(name_or_method.to_s) Rails.logger.warn("No task with key '#{name_or_method}' to delete!") unless dval.present? end def sync_schedules # Ensure deleted Orgs are removed existing_orgs = pluck(:name) old_schedules = Sidekiq.get_schedule.select do |k, v| m = k.match(/^org:([a-z0-9_]+)\-/i) m.present? && !existing_orgs.include?(m[1]) end old_schedules.keys.each do |k| Sidekiq.remove_schedule(k) end find_each(&:sync_schedule) end end def generate_schedule schedule = {} self.class._schedule_descriptors.values.each do |desc| cron_time = schedule_task_cron_time(desc) next unless cron_time.present? schedule["org:#{name}-#{desc[:key]}"] = { 'cron' => cron_time, 'queue' => desc[:queue], 'class' => ScheduledTaskExecutor.to_s, 'args' => [name, desc[:key]], } end schedule end def sync_schedule new_schedules = generate_schedule unschedule_tasks(new_schedules.keys) new_schedules.each do |k, v| Sidekiq.set_schedule(k, v) end end private def unschedule_tasks(new_task_keys = nil) current_schedules = Sidekiq.get_schedule.select { |k,v| k.starts_with?("org:#{name}-") } del_tasks = current_schedules.keys del_tasks -= new_task_keys if new_task_keys del_tasks.each do |k| Sidekiq.remove_schedule(k) end end def schedule_task_cron_time(desc) cron_time = nil cron_time = settings&.dig(:task_schedules, desc[:key].to_s) if cron_time.nil? cron_time = settings&.dig(:task_schedules, desc[:key].to_sym) if cron_time.nil? cron_time = desc[:schedule] if cron_time.nil? return nil unless cron_time.present? cron_time = instance_exec(&cron_time) if cron_time.is_a?(Proc) if !Rufus::Scheduler.parse(cron_time).zone.present? && settings && settings[:timezone] cron_time += " #{settings[:timezone]}" end cron_time end class ScheduledTaskExecutor include Sidekiq::Worker def perform(org_name, task_key) org = Organization.find_by!(name: org_name) task = Organization._schedule_descriptors[task_key] worker = task[:worker] Apartment::Tenant.switch(org.name) do if worker.is_a?(Proc) org.instance_exec(&worker) elsif worker.is_a?(Symbol) org.send(worker) elsif worker.is_a?(String) worker.constantize.perform_async elsif worker.is_a?(Class) worker.perform_async end end end end end end end SidekiqScheduler::Scheduler.instance.dynamic = true module SidekiqScheduler module Schedule original_schedule_setter = instance_method(:schedule=) define_method :schedule= do |sched| original_schedule_setter.bind(self).(sched).tap do PandaPal::Organization.sync_schedules end end end end