require_relative 'logger' require_relative 'error' module Remon class Scheduler include Logger attr_reader :queue def initialize(schedule, queue:, scheduler_offset: 0) @schedule = schedule @queue = queue @scheduler_pipeline = {} @scheduler_offset = scheduler_offset validate_schedule end def run(n = :inf, show_progress: true) logger.debug "starting scheduler" ticker(n, show_progress) do |t| schedule_tasks (t + @scheduler_offset) end end def schedule_tasks(time) task_groups = keys.select { |i| time % i[:interval] == 0} task_groups.each do |tg| pipeline_task_group tg, time end enqueue_tasks(time) end def ticker(n, show_progress = false) t = Time.now.to_i case n when Integer n.times do |i| puts i if show_progress yield t t = t + 1 sleep_till t end else loop do yield t t = t + 1 sleep_till t end end end private def keys @keys ||= @schedule.keys end def sleep_till(t) diff = (t - Time.now.to_f) sleep diff if diff > 0 end def pipeline_task_group(task_group, time) randomize = task_group[:randomize] offset = task_group[:offset] if randomize && (offset > 0) offset = rand(0..offset) end tasks = @schedule[task_group] t = time + offset @scheduler_pipeline[t] ||= Set.new tasks.each { |task| @scheduler_pipeline[t] << task } end def enqueue_tasks(time) tasks = @scheduler_pipeline[time] return if not tasks tasks.each do |t| logger.debug "scheduling #{t} for time #{time}" if logger.debug? @queue << t end @scheduler_pipeline.delete(time) end def validate_schedule if not @schedule.is_a? Hash raise Error, "invalid schedule: not a hash" end @schedule.each do |task_group, tasks| raise Error, "invalid task_group: #{task_group}" if not valid_task_group? task_group end end def valid_task_group?(t) required_keys = [:interval, :randomize, :offset] return false if not t.is_a? Hash return false if not t.keys.all? { |i| required_keys.include? i } return true end end end