module CanvasSync::JobBatches
  # A Pool throttles a set of jobs so that at most `concurrency` of them run at
  # once. Jobs are queued into a Redis structure (list/set/zset depending on
  # `order`) and each popped job is wrapped in a placeholder Batch whose
  # completion callback checks back in with the Pool to refill the allotment.
  #
  # All persistent state lives in Redis under "POOLID-<pid>" keys; `redis_attr`
  # (from RedisModel) lazily reads/writes the hash at that key.
  class Pool
    include RedisModel

    # Lua script that atomically pops up to `concurrency` jobs from the pending
    # structure and reserves slots for them (see pool_refill.lua).
    POOL_REFILL = RedisScript.new(Pathname.new(__FILE__) + "../pool_refill.lua")

    attr_reader :pid

    redis_attr :description
    redis_attr :created_at
    redis_attr :concurrency, :int
    redis_attr :complete_count, :int
    redis_attr :order
    redis_attr :on_failed_job, :symbol
    redis_attr :clean_when_empty, :bool

    # With a pid, attaches to an existing Pool; without one, creates a new Pool
    # and persists the given settings (see #initialize_new for kwargs).
    def initialize(pool_id = nil, **kwargs)
      if pool_id
        @existing = true
        @pid = pool_id
      else
        @pid = SecureRandom.urlsafe_base64(10)
        initialize_new(**kwargs)
      end
    end

    # Strict lookup — raises instead of silently creating a new Pool when the
    # pid is nil/blank (a bare `new(nil)` would do exactly that).
    def self.from_pid(pid)
      raise "PID must be given" unless pid.present?
      new(pid)
    end

    def <<(job_desc)
      add_job(job_desc)
    end

    def add_job(job_desc)
      add_jobs([job_desc])
    end

    # Enqueues job descriptions into the pool. Each job gets a placeholder
    # wrapper Batch whose checkin callback drives the refill cycle. When
    # on_failed_job is :wait, checkin only fires on :success (a failed job
    # holds its slot); otherwise checkin fires on :complete.
    def add_jobs(job_descs, skip_refill: false)
      job_descs.each do |job_desc|
        wrapper = Batch.new
        wrapper.description = "Pool Job Wrapper (PID: #{pid})"
        checkin_event = (on_failed_job == :wait) ? :success : :complete
        wrapper.on(checkin_event, "#{self.class.to_s}.job_checked_in", pool_id: pid)
        wrapper.placeholder!

        job_desc = job_desc.symbolize_keys
        job_desc = job_desc.merge!(
          job: job_desc[:job].to_s,
          pool_wrapper_batch: wrapper.bid,
        )

        push_job_to_pool(job_desc)
      end
      refill_allotment unless skip_refill
    end

    # Places a hold on the pool so it won't be cleaned up while empty.
    # With a block: holds for the duration of the block, then releases.
    # Without: returns the hold token; caller must pass it to #let_close!.
    def keep_open!(token = SecureRandom.urlsafe_base64(10))
      if block_given?
        begin
          token = keep_open!(token)
          yield
        ensure
          let_close!(token)
        end
      else
        redis.multi do |r|
          r.sadd("#{redis_key}-holds", token)
          r.expire("#{redis_key}-holds", Batch::BID_EXPIRE_TTL)
        end
        token
      end
    end

    # Releases a hold (or, with no token, clears all legacy holds) and cleans
    # up the pool if it is now empty.
    def let_close!(token = :unset)
      if token == :unset # Legacy
        redis.del("#{redis_key}-holds")
        redis.hset(redis_key, 'keep_open', 'false')
      else
        redis.srem("#{redis_key}-holds", token)
      end
      cleanup_if_empty
    end

    # Removes the pool from the "pools" index and unlinks its Redis keys.
    # NOTE(review): "-active" and "-holds" are not unlinked here — empty
    # hashes/sets don't exist in Redis and holds expire, so this appears
    # intentional, but confirm against cleanup_redis_index!.
    def cleanup_redis
      Batch.logger.debug {"Cleaning redis of pool #{pid}"}
      redis do |r|
        r.zrem("pools", pid)
        r.unlink(
          "#{redis_key}",
          "#{redis_key}-jobs",
        )
      end
    end

    # Destroys the pool's Redis state iff nothing is active, pending, reserved,
    # or held open. All counters are read in one MULTI for a consistent view.
    def cleanup_if_empty
      self.order # Prime the order attr before the MULTI (pending_count needs it)

      activec, pactivec, pendingc, clean_when_empty, keep_open, holds = redis.multi do |r|
        r.hlen("#{redis_key}-active")
        r.hget(redis_key, "_active_count")
        pending_count(r)
        r.hget(redis_key, 'clean_when_empty')
        r.hget(redis_key, 'keep_open')
        r.scard("#{redis_key}-holds")
      end

      return if keep_open == 'true' || clean_when_empty == 'false' || (holds && holds > 0)

      if activec <= 0 && (pactivec.try(:to_i) || 0) <= 0 && pendingc <= 0
        cleanup_redis
      end
    end

    # Running jobs plus slots reserved by the refill script but not yet
    # recorded in "-active" (hincrby by 0 is a read).
    def active_count(r = redis)
      r.hlen("#{redis_key}-active") + r.hincrby(redis_key, "_active_count", 0)
    end

    def active_jobs(r = redis)
      r.hvals("#{redis_key}-active").map {|desc| JSON.parse(desc)[0] }
    end

    # Count of queued jobs; the Redis type of the "-jobs" key depends on the
    # pool's ordering strategy.
    def pending_count(r = redis)
      jobs_key = "#{redis_key}-jobs"
      order = self.order || 'fifo'
      case order.to_sym
      when :fifo, :lifo
        r.llen(jobs_key)
      when :random
        r.scard(jobs_key)
      when :priority
        r.zcard(jobs_key)
      end
    end

    # Batch-callback entry point (instance side): a wrapped job finished, so
    # free its slot, pull more work, and clean up if the pool drained.
    def job_checked_in(status, options)
      remaining_active = refill_allotment(status.bid)
      cleanup_if_empty unless remaining_active > 0
    end

    def self.job_checked_in(status, options)
      pid = options['pool_id']
      from_pid(pid).job_checked_in(status, options)
    end

    # Administrative/console method to cleanup expired pools from the WebUI.
    # Fix: this previously called an undefined local `r` (NameError); it now
    # uses the class-level `redis` accessor.
    def self.cleanup_redis_index!
      suffixes = ["", "-active", "-jobs"]
      redis.zrangebyscore("pools", "0", Batch::BID_EXPIRE_TTL.seconds.ago.to_i).each do |pid|
        redis.zrem("pools", pid) if Batch.cleanup_redis_index_for("POOLID-#{pid}", suffixes)
      end
    end

    protected

    def redis_key
      "POOLID-#{pid}"
    end

    # Atomically pops as many pending jobs as concurrency allows (via the Lua
    # script), enqueues each inside its wrapper Batch, then records them in
    # "-active". Returns the resulting active count, or the script's negative
    # sentinel untouched.
    def refill_allotment(checkin_bid = nil)
      active_count, job_descs = POOL_REFILL.call(redis, [redis_key, "#{redis_key}-jobs", "#{redis_key}-active"], [checkin_bid || ""])
      return active_count if active_count < 0

      pending_job_descs = job_descs.dup

      added_jobs = {}
      failed_to_add_jobs = []
      add_exception = nil

      # Enqueue each popped job; collect failures instead of aborting so the
      # bookkeeping MULTI below always runs.
      while pending_job_descs.count > 0
        begin
          job_json = pending_job_descs.shift
          job_desc = ::ActiveJob::Arguments.deserialize(JSON.parse(job_json))[0]&.symbolize_keys

          wbid = job_desc[:pool_wrapper_batch]

          Batch.new(wbid).jobs do
            ChainBuilder.enqueue_job(job_desc)
          end

          added_jobs[wbid] = job_json
        rescue => ex
          failed_to_add_jobs << job_json
          add_exception = ex
        end
      end

      redis.multi do |r|
        r.mapped_hmset("#{redis_key}-active", added_jobs) if added_jobs.count > 0
        # Release reserved slots now that we've added the jobs to `-active`
        r.hincrby(redis_key, "_active_count", -job_descs.count)
        r.expire(redis_key, Batch::BID_EXPIRE_TTL)
        r.expire("#{redis_key}-active", Batch::BID_EXPIRE_TTL)
        r.expire("#{redis_key}-jobs", Batch::BID_EXPIRE_TTL)
      end

      # If this happens, we end up in a bad state (as we don't try to re-add items to the pool or refill_allotment again), but
      # this should be a _really_ rare case that should only occur if we've lost connection to Redis or something, so we're
      # operating on the assumption that if we get here, any recovery logic will fail too
      if add_exception.present?
        Batch.logger.error {"Error popping jobs from Pool #{pid}: #{add_exception}"}
        raise add_exception
      end

      active_count + added_jobs.count
    end

    # Serializes the job description and pushes it into the Redis structure
    # matching the pool's ordering strategy, refreshing TTLs in the same MULTI.
    def push_job_to_pool(job_desc)
      jobs_key = "#{redis_key}-jobs"
      # This allows duplicate jobs when a Redis Set is used
      job_desc[:_pool_random_key_] = SecureRandom.urlsafe_base64(10)
      job_json = JSON.unparse(::ActiveJob::Arguments.serialize([job_desc]))
      order = self.order

      redis.multi do |r|
        case order.to_sym
        when :fifo, :lifo
          r.rpush(jobs_key, job_json)
        when :random
          r.sadd(jobs_key, job_json)
        when :priority
          r.zadd(jobs_key, job_desc[:priority] || 0, job_json)
        end
        r.expire(redis_key, Batch::BID_EXPIRE_TTL)
        r.expire(jobs_key, Batch::BID_EXPIRE_TTL)
      end
    end

    def self.redis(&blk)
      Batch.redis(&blk)
    end
    delegate :redis, to: :class

    # After RedisModel writes pending attrs, refresh the TTL and index the pool
    # (scored by creation time so cleanup_redis_index! can find stale pools).
    def flush_pending_attrs
      super
      redis.expire(redis_key, Batch::BID_EXPIRE_TTL)
      redis.zadd("pools", created_at, pid)
    end

    private

    # Persists initial settings for a freshly-created pool.
    def initialize_new(concurrency: nil, order: :fifo, clean_when_empty: true, on_failed_job: :wait, description: nil)
      self.created_at = Time.now.utc.to_f
      self.description = description
      self.order = order
      self.concurrency = concurrency
      self.clean_when_empty = clean_when_empty
      self.on_failed_job = on_failed_job
      flush_pending_attrs
    end
  end
end