module CanvasSync module JobBatches class Pool include RedisModel HINCR_MAX = RedisScript.new(Pathname.new(__FILE__) + "../hincr_max.lua") attr_reader :pid redis_attr :description redis_attr :created_at redis_attr :concurrency, :int redis_attr :order redis_attr :on_failed_job, :symbol redis_attr :clean_when_empty, :bool def initialize(pooolid = nil, **kwargs) if pooolid @existing = true @pid = pooolid else @pid = SecureRandom.urlsafe_base64(10) initialize_new(**kwargs) end end def self.from_pid(pid) raise "PID must be given" unless pid.present? new(pid) end def <<(job_desc) add_job(job_desc) end def add_job(job_desc) add_jobs([job_desc]) end def add_jobs(job_descs) job_descs.each do |job_desc| wrapper = Batch.new wrapper.description = "Pool Job Wrapper (PID: #{pid})" checkin_event = (on_failed_job == :wait) ? :success : :complete wrapper.on(checkin_event, "#{self.class.to_s}.job_checked_in", pool_id: pid) wrapper.jobs {} job_desc = job_desc.with_indifferent_access job_desc = job_desc.merge!( job: job_desc[:job].to_s, pool_wrapper_batch: wrapper.bid, ) push_job_to_pool(job_desc) end refill_allotment end def keep_open! if block_given? begin keep_open! yield ensure let_close! end else redis do |r| r.hset(redis_key, 'keep_open', true) end end end def let_close! _, active_count = redis do |r| r.multi do r.hset(redis_key, 'keep_open', false) r.hincrby(redis_key, "active_count", 0) end end if active_count == 0 && pending_count == 0 cleanup_redis if clean_when_empty end end def cleanup_redis Batch.logger.debug {"Cleaning redis of pool #{pid}"} redis do |r| r.zrem("pools", pid) r.unlink( "#{redis_key}", "#{redis_key}-jobs", ) end end def active_count redis do |r| r.hincrby(redis_key, "active_count", 0) end end def pending_count jobs_key = "#{redis_key}-jobs" order = self.order || 'fifo' redis do |r| case order.to_sym when :fifo, :lifo r.llen(jobs_key) when :random r.scard(jobs_key) when :priority r.zcard(jobs_key) end end end def job_checked_in(status, options) active_count = redis do |r| return unless r.exists?(redis_key) r.hincrby(redis_key, "active_count", -1) end added_count = refill_allotment if active_count == 0 && added_count == 0 if clean_when_empty && redis {|r| r.hget(redis_key, 'keep_open') } != 'true' cleanup_redis end end end def self.job_checked_in(status, options) pid = options['pool_id'] from_pid(pid).job_checked_in(status, options) end protected def redis_key "POOLID-#{pid}" end def refill_allotment jobs_added = 0 limit = concurrency.to_i redis do |r| current_count = 0 while true current_count = HINCR_MAX.call(r, [redis_key], ["active_count", limit]).to_i if current_count < limit job_desc = pop_job_from_pool if job_desc.present? Batch.new(job_desc['pool_wrapper_batch']).jobs do ChainBuilder.enqueue_job(job_desc) end jobs_added += 1 else r.hincrby(redis_key, "active_count", -1) break end else break end end r.expire(redis_key, Batch::BID_EXPIRE_TTL) r.expire("#{redis_key}-jobs", Batch::BID_EXPIRE_TTL) end jobs_added end def push_job_to_pool(job_desc) jobs_key = "#{redis_key}-jobs" # This allows duplicate jobs when a Redis Set is used job_desc['_pool_random_key_'] = SecureRandom.urlsafe_base64(10) job_json = JSON.unparse(ActiveJob::Arguments.serialize([job_desc])) order = self.order redis do |r| r.multi do case order.to_sym when :fifo, :lifo r.rpush(jobs_key, job_json) when :random r.sadd(jobs_key, job_json) when :priority r.zadd(jobs_key, job_desc[:priority] || 0, job_json) end r.expire(jobs_key, Batch::BID_EXPIRE_TTL) end end end def pop_job_from_pool jobs_key = "#{redis_key}-jobs" order = self.order job_json = nil redis do |r| job_json = case order.to_sym when :fifo r.lpop(jobs_key) when :lifo r.rpop(jobs_key) when :random r.spop(jobs_key) when :priority r.zpopmax(jobs_key) end end return nil unless job_json.present? ActiveJob::Arguments.deserialize(JSON.parse(job_json))[0] end def self.redis(&blk) Batch.redis &blk end delegate :redis, to: :class protected def flush_pending_attrs super redis do |r| r.zadd("pools", created_at, pid) end end private def initialize_new(concurrency: nil, order: :fifo, clean_when_empty: true, on_failed_job: :wait, description: nil) self.created_at = Time.now.utc.to_f self.description = description self.order = order self.concurrency = concurrency self.clean_when_empty = clean_when_empty self.on_failed_job = on_failed_job flush_pending_attrs end end end end