#
# Base class for the jobs handled by a scheduler.
#
# A job wraps a callable (a block, or any object answering to #call) plus
# the options it was scheduled with. It relies on #determine_id and
# #set_next_time, which are not defined in this class and are expected to be
# provided elsewhere (presumably by the concrete job classes).
#
class Rufus::Scheduler::Job

  EoTime = ::EtOrbi::EoTime

  #
  # Used by Job#kill
  #
  class KillSignal < StandardError; end

  attr_reader(
    :id,
    :opts,
    :original,
    :scheduled_at,
    :last_time,
    :unscheduled_at,
    :tags,
    :locals,
    :count,
    :last_work_time,
    :mean_work_time)

  attr_accessor :name

  # next trigger time
  #
  attr_accessor :next_time

  # previous "next trigger time"
  #
  attr_accessor :previous_time

  # anything with a #call(job[, time]) method,
  # what gets actually triggered
  #
  attr_reader :callable

  # a reference to the instance whose call method is the @callable
  #
  attr_reader :handler

  # Default, core, implementation has no effect. Repeat jobs do override it.
  #
  def resume_discard_past=(v); end

  # Sets the job up.
  #
  # scheduler:: the scheduler owning this job
  # original:: kept as is and exposed via #original
  # opts:: the options Hash, it gets normalized in place (:overlap, :mutex)
  # block:: a block, an object responding to #call, or a Class whose
  #         instances respond to #call
  #
  # Fails with an ArgumentError when no callable can be derived from block.
  #
  def initialize(scheduler, original, opts, block)

    @scheduler = scheduler
    @original = original
    @opts = opts

    @handler = block

    @callable =
      if block.respond_to?(:arity)
        block
      elsif block.respond_to?(:call)
        block.method(:call)
      elsif block.is_a?(Class)
        @handler = block.new
        # best effort: a handler without a #call leaves @callable nil,
        # which triggers the ArgumentError below
        @handler.method(:call) rescue nil
      else
        nil
      end

    @scheduled_at = EoTime.now
    @unscheduled_at = nil
    @last_time = nil

    @locals = opts[:locals] || opts[:l] || {}
    @local_mutex = Mutex.new

    @id = determine_id
    @name = opts[:name] || opts[:n]

    fail(
      ArgumentError,
      'missing block or callable to schedule',
      caller[2..-1]
    ) unless @callable

    @tags = Array(opts[:tag] || opts[:tags]).map(&:to_s)

    @count = 0
    @last_work_time = 0.0
    @mean_work_time = 0.0

    # tidy up options

    if @opts[:allow_overlap] == false || @opts[:allow_overlapping] == false
      @opts[:overlap] = false
    end

    # :mutex may be a single mutex (or mutex name), always store an Array
    mutexes = @opts[:mutex]
    @opts[:mutex] = Array(mutexes) if mutexes
  end

  alias job_id id

  # Returns the source location of the callable.
  #
  def source_location

    @callable.source_location
  end
  alias location source_location

  # Will fail with an ArgumentError if the job frequency is higher than
  # the scheduler frequency.
  #
  def check_frequency

    # this parent implementation never fails
  end

  # Triggers the job on schedule: remembers the current next time as the
  # previous time, computes the new next time, then does trigger.
  #
  def trigger(time)

    @previous_time = @next_time
    set_next_time(time)

    do_trigger(time)
  end

  # Trigger the job right now, off of its schedule.
  #
  # Done in collaboration with Piavka in
  # https://github.com/jmettraux/rufus-scheduler/issues/214
  #
  def trigger_off_schedule(time=EoTime.now)

    do_trigger(time)
  end

  # Flags the job as unscheduled by recording the current time.
  #
  def unschedule

    @unscheduled_at = EoTime.now
  end

  # Returns the threads whose :rufus_scheduler_job thread-local is this job.
  #
  def threads

    Thread.list.select { |t| t[:rufus_scheduler_job] == self }
  end

  # Kills all the threads this Job currently has going on.
  #
  def kill

    threads.each { |t| t.raise(KillSignal) }
  end

  # Returns true when at least one thread is currently assigned to this job.
  #
  def running?

    threads.any?
  end

  # Asks the scheduler whether this job is scheduled.
  #
  def scheduled?

    @scheduler.scheduled?(self)
  end

  #
  # Access to the job locals, each access goes through @local_mutex

  def []=(key, value)

    @local_mutex.synchronize { @locals[key] = value }
  end

  def [](key)

    @local_mutex.synchronize { @locals[key] }
  end

  def has_key?(key)

    @local_mutex.synchronize { @locals.has_key?(key) }
  end
  alias key? has_key?

  def keys; @local_mutex.synchronize { @locals.keys }; end
  def values; @local_mutex.synchronize { @locals.values }; end
  def entries; @local_mutex.synchronize { @locals.entries }; end

  #def hash
  #  self.object_id
  #end
  #def eql?(o)
  #  o.class == self.class && o.hash == self.hash
  #end
    #
    # might be necessary at some point

  # Returns the next time wrapped in an Array, or an empty Array when there
  # is no next time. This base implementation does not use count.
  #
  def next_times(count)

    next_time ? [ next_time ] : []
  end

  # Calls the callable (usually a block) wrapped in this Job instance.
  #
  # Warning: error rescuing is the responsibility of the caller
  # (unless do_rescue is set to true).
  #
  def call(do_rescue=false)

    do_call(EoTime.now, do_rescue)
  end

  protected

  # Calls meth on the scheduler, if the scheduler responds to it, passing
  # as many of [ job, time ] as the method arity allows (both when the
  # arity is negative).
  #
  # Returns true when the scheduler does not respond to meth, else
  # whatever the scheduler method returns.
  #
  def callback(meth, time)

    return true unless @scheduler.respond_to?(meth)

    arity = @scheduler.method(meth).arity
    args = [ self, time ][0, (arity < 0 ? 2 : arity)]

    @scheduler.send(meth, *args)
  end

  # Returns the parsed :timeout option, or nil when there is none.
  #
  def compute_timeout

    timeout = @opts[:timeout]

    timeout ? Rufus::Scheduler.parse(timeout) : nil
  end

  # Returns m itself when it is a Mutex, else the scheduler mutex
  # registered under m.to_s (created on first use).
  #
  def mutex(m)

    return m if m.is_a?(Mutex)

    @scheduler.mutexes[m.to_s] ||= Mutex.new
  end

  # Selects which of [ job, time ] get handed to the callable.
  #
  # With a fixed arity, the first `arity` entries are passed (two at most).
  #
  # With a negative arity (optional or splat parameters), the declared
  # parameters decide: both entries when a splat is present, else as many
  # entries as there are positional parameters. Slicing by a negative
  # length would yield nil and the callable would get no argument at all.
  #
  def callable_args(time)

    available = [ self, time ]
    arity = @callable.arity

    return available.first(arity) if arity >= 0

    # callables exposing #arity but not #parameters keep getting no argument
    return [] unless @callable.respond_to?(:parameters)

    params = @callable.parameters

    return available if params.any? { |type, _| type == :rest }

    available.first(params.count { |type, _| type == :req || type == :opt })
  end

  # Calls the callable, wrapped in the scheduler #around_trigger.
  #
  # StandardError instances are re-raised unless do_rescue is true. When
  # rescuing, a KillSignal is discarded and anything else is handed to the
  # scheduler #on_error.
  #
  def do_call(time, do_rescue)

    args = callable_args(time)

    @scheduler.around_trigger(self) do
      @callable.call(*args)
    end

  rescue StandardError => se

    fail se unless do_rescue

    return if se.is_a?(KillSignal) # discard

    @scheduler.on_error(self, se)

    # exceptions above StandardError do pass through
  end

  # Triggers the job, unless overlap is forbidden and the job is running,
  # or unless one of the scheduler callbacks vetoes it.
  #
  # Blocking jobs are run right away, the others go to the work queue.
  #
  def do_trigger(time)

    # only an explicit false forbids overlapping, nil does not
    return if opts[:overlap] == false && running?

    # only an explicit false is a veto, a nil callback result is not
    return if (
      callback(:confirm_lock, time) &&
      callback(:on_pre_trigger, time)
    ) == false

    @count += 1

    if opts[:blocking]
      trigger_now(time)
    else
      trigger_queue(time)
    end
  end

  # Runs the job in the current thread.
  #
  # The thread is flagged with the job, the start time and the timeout
  # while the job runs. Work time statistics, the post trigger step and
  # the flag clean-up happen in the ensure section, whatever the outcome.
  #
  def trigger_now(time)

    ct = Thread.current

    t = EoTime.now
      # if there are mutexes, t might be really bigger than time

    ct[:rufus_scheduler_job] = self
    ct[:rufus_scheduler_time] = t
    ct[:rufus_scheduler_timeout] = compute_timeout

    @last_time = t

    do_call(time, true)

  ensure

    @last_work_time =
      EoTime.now - ct[:rufus_scheduler_time]
    @mean_work_time =
      ((@count - 1) * @mean_work_time + @last_work_time) / @count

    post_trigger(time)

    ct[:rufus_scheduler_job] = nil
    ct[:rufus_scheduler_time] = nil
    ct[:rufus_scheduler_timeout] = nil
  end

  # Recomputes the next time and notifies the scheduler via the
  # :on_post_trigger callback.
  #
  def post_trigger(time)

    set_next_time(time, true)
      # except IntervalJob instances, jobs will ignore this call

    callback(:on_post_trigger, time)
  end

  # Starts a work thread, it pops [ job, time ] pairs from the scheduler
  # work queue and runs them, until it pops :shutdown or until the scheduler
  # has no started_at anymore.
  #
  # Returns the new thread.
  #
  def start_work_thread

    thread =
      Thread.new do

        ct = Thread.current

        ct[:rufus_scheduler_job] = true
          # indicates that the thread is going to be assigned immediately

        ct[@scheduler.thread_key] = true
        ct[:rufus_scheduler_work_thread] = true

        loop do

          break if @scheduler.started_at.nil?

          job, time = @scheduler.work_queue.pop

          break if job == :shutdown
          break if @scheduler.started_at.nil?

          next if job.unscheduled_at

          begin

            # wrap the trigger in one synchronize per mutex, the first
            # mutex being the innermost one
            (job.opts[:mutex] || []).reduce(
              lambda { job.trigger_now(time) }
            ) do |b, m|
              lambda { mutex(m).synchronize { b.call } }
            end.call

          rescue KillSignal

            # simply go on looping
          end
        end
      end

    thread[@scheduler.thread_key] = true
    thread[:rufus_scheduler_work_thread] = true
      #
      # same as above (in the thread block), since it has to be done as
      # quickly as possible.
      # So, whoever is running first (scheduler thread vs job thread)
      # sets this information

    thread
  end

  # Queues [ job, time ] for the work threads, starting a new work thread
  # first when there are not enough vacant threads for the queued work and
  # the maximum thread count is not reached.
  #
  def trigger_queue(time)

    threads = @scheduler.work_threads

    vac = threads.count { |t| t[:rufus_scheduler_job].nil? }
    que = @scheduler.work_queue.size

    cur = threads.size
    max = @scheduler.max_work_threads

    start_work_thread if vac - que < 1 && cur < max

    @scheduler.work_queue << [ self, time ]
  end
end