# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. module Skywalking module Reporter class Scheduler include Log::Logging def initialize @read_side, @write_side = IO.pipe @queue = Queue.new @workers = {} @jobs = Hash.new { |h, k| h[k] = [] } @jobs[:timer_job] << proc { |worker| add_worker(worker) } @jobs[:event_job] << proc { |event, blk| @jobs[event] << blk } @running = true end def run while running? reads = IO.select([@read_side], nil, nil, find_next_trigger) if reads&.dig(0, 0) == @read_side @read_side.read(1) end @workers.each_value do |worker| if worker.need_trigger? @queue << [worker.job_name] worker.set_latest_trigger_time end end until @queue.empty? job, args = @queue.pop dispatch(job, args) @workers[job]&.init_next_trigger_time end end end def dispatch(job, args) @jobs[job].each do |orig_job| begin orig_job.call(*args) rescue Exception => e warn "Error in job #{job}: #{e.message}" end end end def subscribe(job_name, job_interval, &job_func) trigger(:event_job, [job_name, job_func]) trigger(:timer_job, Timer.new(job_name, job_interval)) end def trigger(job_type, *args) @queue.push([job_type, *args]) notify end def find_next_trigger return nil if @workers.empty? timeout = @workers.values.map(&:next_trigger_time).min - Process.clock_gettime(Process::CLOCK_REALTIME) timeout.positive? ? timeout : 0 end def notify @write_side.write_nonblock('n') end def add_worker(worker) orig_job = @workers[worker.job_name] orig_job.adjust_next_trigger_time(Process.clock_gettime(Process::CLOCK_REALTIME) - orig_job.latest_trigger) if orig_job @workers[worker.job_name] = worker trigger_worker(worker) end def trigger_worker(worker) if worker.need_trigger? @queue.push([worker.job_name]) worker.set_latest_trigger_time end end def running? @running end def shutdown return unless running? @running = false @write_side.write_nonblock('s') end class Timer attr_reader :job_name, :job_interval, :next_trigger_time, :latest_trigger_time def initialize(job_name, job_interval) @job_name = job_name @job_interval = job_interval @start_time = Process.clock_gettime(Process::CLOCK_REALTIME) @latest_trigger_time = nil init_next_trigger_time end def init_next_trigger_time @next_trigger_time = gen_next_trigger_time end def set_latest_trigger_time @latest_trigger_time = Process.clock_gettime(Process::CLOCK_REALTIME) end def to_s "" end def latest_trigger @start_time || @latest_trigger_time end def adjust_next_trigger_time(time) @next_trigger_time -= time end def need_trigger?(now = Process.clock_gettime(Process::CLOCK_REALTIME)) now >= @next_trigger_time end def gen_next_trigger_time now = Process.clock_gettime(Process::CLOCK_REALTIME) return now if @job_interval == 0 ret = @latest_trigger_time || now ret += @job_interval while ret <= now ret end end end end end