begin
  require 'sidekiq'
rescue LoadError
end

require_relative './redis_model'
require_relative './redis_script'
require_relative "./callback"
require_relative "./context_hash"
require_relative "./status"
require_relative "./pool"

Dir[File.dirname(__FILE__) + "/jobs/*.rb"].each { |file| require file }

require_relative "./chain_builder"

# Implements Job Batching similar to Sidekiq::Batch. Supports ActiveJob and Sidekiq, or a mix thereof.
# Much of this code is modified/extended from https://github.com/breamware/sidekiq-batch
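#
# A minimal usage sketch (the job and callback class names below are hypothetical,
# not part of this library):
#
#   batch = CanvasSync::JobBatches::Batch.new
#   batch.on(:complete, "MyCallbackClass", some: "option")
#   batch.jobs do
#     MySidekiqWorker.perform_async  # a Sidekiq worker
#     MyActiveJobJob.perform_later   # an ActiveJob job
#   end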
module CanvasSync
  module JobBatches
    CURRENT_BATCH_THREAD_KEY = :job_batches_batch

    class Batch
      include RedisModel

      class NoBlockGivenError < StandardError; end

      delegate :redis, to: :class

      BID_EXPIRE_TTL = 90.days.to_i
      SCHEDULE_CALLBACK = RedisScript.new(Pathname.new(__FILE__) + "../schedule_callback.lua")
      BID_HIERARCHY = RedisScript.new(Pathname.new(__FILE__) + "../hier_batch_ids.lua")

      attr_reader :bid

      def initialize(existing_bid = nil)
        @bid = existing_bid || SecureRandom.urlsafe_base64(10)
        @existing = !(!existing_bid || existing_bid.empty?) # Basically existing_bid.present?
        @initialized = false
        @bidkey = "BID-" + @bid.to_s
        self.created_at = Time.now.utc.to_f unless @existing
      end

      redis_attr :description
      redis_attr :created_at
      redis_attr :callback_queue, read_only: false
      redis_attr :callback_params, :json
      redis_attr :allow_context_changes

      def context
        return @context if defined?(@context)

        if (@initialized || @existing)
          @context = ContextHash.new(bid)
        else
          @context = ContextHash.new(bid, {})
        end
      end

      def context=(value)
        raise "context is read-only once the batch has been started" if (@initialized || @existing) # && !allow_context_changes
        raise "context must be a Hash" unless value.is_a?(Hash) || value.nil?

        return nil if value.nil? && @context.nil?

        value = {} if value.nil?
        value = value.local if value.is_a?(ContextHash)

        @context ||= ContextHash.new(bid, {})
        @context.set_local(value)
        # persist_bid_attr('context', JSON.unparse(@context.local))
      end

      def save_context_changes
        @context&.save!
      end

      def on(event, callback, options = {})
        return unless Callback::VALID_CALLBACKS.include?(event.to_s)

        callback_key = "#{@bidkey}-callbacks-#{event}"
        redis.multi do |r|
          r.sadd(callback_key, JSON.unparse({ callback: callback, opts: options }))
          r.expire(callback_key, BID_EXPIRE_TTL)
        end
      end

      def jobs
        raise NoBlockGivenError unless block_given?

        if !@existing && !@initialized
          parent_bid = Thread.current[CURRENT_BATCH_THREAD_KEY]&.bid

          redis.multi do |r|
            r.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
            r.expire(@bidkey, BID_EXPIRE_TTL)

            if parent_bid
              r.hincrby("BID-#{parent_bid}", "children", 1)
              r.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
              r.zadd("BID-#{parent_bid}-bids", created_at, bid)
            else
              r.zadd("BID-ROOT-bids", created_at, bid)
            end
          end

          flush_pending_attrs
          @context&.save!

          @initialized = true
        else
          assert_batch_is_open
        end

        begin
          parent = Thread.current[CURRENT_BATCH_THREAD_KEY]
          Thread.current[CURRENT_BATCH_THREAD_KEY] = self
          yield
        ensure
          Thread.current[CURRENT_BATCH_THREAD_KEY] = parent
        end

        nil
      end

      def increment_job_queue(jid)
        assert_batch_is_open
        append_jobs([jid])
      end

      def invalidate_all
        redis.setex("invalidated-bid-#{bid}", BID_EXPIRE_TTL, 1)
      end

      def parent_bid
        redis.hget(@bidkey, "parent_bid")
      end

      def parent
        if parent_bid
          Batch.new(parent_bid)
        end
      end

      def valid?(batch = self)
        valid = !redis.exists?("invalidated-bid-#{batch.bid}")
        batch.parent ? valid && valid?(batch.parent) : valid
      end

      def keep_open!
        if block_given?
          begin
            keep_open!
            yield
          ensure
            let_close!
          end
        else
          redis.hset(@bidkey, 'keep_open', "true")
        end
      end

      def let_close!
        _, failed, pending, children, complete, success = redis.multi do |r|
          r.hset(@bidkey, 'keep_open', "false")

          r.scard("BID-#{bid}-failed")
          r.hincrby("BID-#{bid}", "pending", 0)
          r.hincrby("BID-#{bid}", "children", 0)
          r.scard("BID-#{bid}-batches-complete")
          r.scard("BID-#{bid}-batches-success")
        end

        all_success = pending.to_i.zero? && children == success
        # if complete or successful, call the complete callback (the complete callback may then call successful)
        if (pending.to_i == failed.to_i && children == complete) || all_success
          self.class.enqueue_callbacks(:complete, bid)
          self.class.enqueue_callbacks(:success, bid) if all_success
        end
      end

      def self.with_batch(batch)
        batch = self.new(batch) if batch.is_a?(String)
        parent = Thread.current[CURRENT_BATCH_THREAD_KEY]
        Thread.current[CURRENT_BATCH_THREAD_KEY] = batch
        yield
      ensure
        Thread.current[CURRENT_BATCH_THREAD_KEY] = parent
      end

      # Any Batches or Jobs created in the given block won't be associated to the current batch.
      def self.without_batch(&blk)
        with_batch(nil, &blk)
      end
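      # For example (a sketch; MaintenanceJob is a hypothetical job class):
      #
      #   CanvasSync::JobBatches::Batch.without_batch do
      #     MaintenanceJob.perform_later  # enqueued outside of the surrounding batch
      #   end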
      protected

      def redis_key
        @bidkey
      end

      def flush_pending_attrs
        super
        redis.zadd("batches", created_at, bid)
      end

      private

      def assert_batch_is_open
        unless defined?(@closed)
          @closed = redis.hget(@bidkey, 'success') == 'true'
        end
        raise "Cannot add jobs to Batch #{bid} - it has already entered the callback-stage" if @closed
      end

      def append_jobs(jids)
        jids = jids.uniq
        return unless jids.size > 0

        redis do |r|
          tme = Time.now.utc.to_f
          added = r.zadd(@bidkey + "-jids", jids.map{|jid| [tme, jid] }, nx: true)
          r.multi do |r|
            r.hincrby(@bidkey, "pending", added)
            r.hincrby(@bidkey, "job_count", added)
            r.expire(@bidkey, BID_EXPIRE_TTL)
            r.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
          end
        end
      end

      class << self
        def current
          Thread.current[CURRENT_BATCH_THREAD_KEY]
        end

        def current_context
          current&.context || {}
        end

        def process_failed_job(bid, jid)
          _, pending, failed, children, complete, parent_bid = redis do |r|
            return unless r.exists?("BID-#{bid}")

            r.multi do |r|
              r.sadd("BID-#{bid}-failed", jid)

              r.hincrby("BID-#{bid}", "pending", 0)
              r.scard("BID-#{bid}-failed")
              r.hincrby("BID-#{bid}", "children", 0)
              r.scard("BID-#{bid}-batches-complete")
              r.hget("BID-#{bid}", "parent_bid")

              r.expire("BID-#{bid}-failed", BID_EXPIRE_TTL)
            end
          end

          if pending.to_i == failed.to_i && children == complete
            enqueue_callbacks(:complete, bid)
          end
        end

        # Dead jobs are a Sidekiq feature.
        # If this is called for a job, process_failed_job was also called.
        def process_dead_job(bid, jid)
          _, dead_count = redis do |r|
            return unless r.exists?("BID-#{bid}")

            r.multi do |r|
              r.sadd("BID-#{bid}-dead", jid)
              r.scard("BID-#{bid}-dead")
              r.expire("BID-#{bid}-dead", BID_EXPIRE_TTL)
            end
          end

          enqueue_callbacks(:death, bid)
        end

        def process_successful_job(bid, jid)
          _, failed, pending, children, complete, success, parent_bid, keep_open = redis do |r|
            return unless r.exists?("BID-#{bid}")

            r.multi do |r|
              r.srem("BID-#{bid}-failed", jid)
              r.scard("BID-#{bid}-failed")
              r.hincrby("BID-#{bid}", "pending", -1)
              r.hincrby("BID-#{bid}", "children", 0)
              r.scard("BID-#{bid}-batches-complete")
              r.scard("BID-#{bid}-batches-success")
              r.hget("BID-#{bid}", "parent_bid")
              r.hget("BID-#{bid}", "keep_open")
              r.hincrby("BID-#{bid}", "successful-jobs", 1)
              r.zrem("BID-#{bid}-jids", jid)
              r.expire("BID-#{bid}", BID_EXPIRE_TTL)
            end
          end

          all_success = pending.to_i.zero? && children == success
          # if complete or successful, call the complete callback (the complete callback may then call successful)
          if (pending.to_i == failed.to_i && children == complete) || all_success
            enqueue_callbacks(:complete, bid)
            enqueue_callbacks(:success, bid) if all_success
          end
        end

        def enqueue_callbacks(event, bid)
          batch_key = "BID-#{bid}"
          callback_key = "#{batch_key}-callbacks-#{event}"

          callbacks, queue, parent_bid, callback_params = redis do |r|
            return unless r.exists?(batch_key)
            return if r.hget(batch_key, 'keep_open') == 'true'

            r.multi do |r|
              r.smembers(callback_key)
              r.hget(batch_key, "callback_queue")
              r.hget(batch_key, "parent_bid")
              r.hget(batch_key, "callback_params")
            end
          end

          queue ||= "default"
          parent_bid = !parent_bid || parent_bid.empty? ? nil : parent_bid # Basically parent_bid.blank?
          callback_params = JSON.parse(callback_params) if callback_params.present?
          callback_args = callbacks.reduce([]) do |memo, jcb|
            cb = JSON.load(jcb)
            memo << [cb['callback'], event.to_s, cb['opts'], bid, parent_bid]
          end

          opts = {"bid" => bid, "event" => event}

          should_schedule_batch = callback_args.present? && !callback_params.present?
          already_processed = redis do |r|
            SCHEDULE_CALLBACK.call(r, [batch_key], [event.to_s, should_schedule_batch.to_s, BID_EXPIRE_TTL])
          end

          return if already_processed == 'true'

          if should_schedule_batch
            logger.debug {"Enqueue callback bid: #{bid} event: #{event} args: #{callback_args.inspect}"}

            with_batch(parent_bid) do
              cb_batch = self.new
              cb_batch.callback_params = {
                for_bid: bid,
                event: event,
              }
              opts['callback_bid'] = cb_batch.bid

              logger.debug {"Adding callback batch: #{cb_batch.bid} for batch: #{bid}"}
              cb_batch.jobs do
                push_callbacks(callback_args, queue)
              end
            end
          end

          if callback_params.present?
            opts['origin'] = callback_params
          end

          logger.debug {"Run batch finalizer bid: #{bid} event: #{event} args: #{callback_args.inspect}"}
          finalizer = Batch::Callback::Finalize.new
          status = Status.new bid
          finalizer.dispatch(status, opts)
        end

        def cleanup_redis(bid)
          logger.debug {"Cleaning redis of batch #{bid}"}
          redis do |r|
            r.zrem("batches", bid)
            r.zrem("BID-ROOT-bids", bid)
            r.unlink(
              "BID-#{bid}",
              "BID-#{bid}-callbacks-complete",
              "BID-#{bid}-callbacks-success",
              "BID-#{bid}-failed",

              "BID-#{bid}-batches-success",
              "BID-#{bid}-batches-complete",
              "BID-#{bid}-batches-failed",

              "BID-#{bid}-bids",
              "BID-#{bid}-jids",
              "BID-#{bid}-pending_callbacks",
            )
          end
        end

        def delete_prematurely!(bid)
          child_bids = redis do |r|
            r.zrange("BID-#{bid}-bids", 0, -1)
          end
          child_bids.each do |cbid|
            delete_prematurely!(cbid)
          end
          cleanup_redis(bid)
        end
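        # Usage sketch: `Batch.redis` yields a Redis connection when given a block, or
        # returns a proxy object (RedisProxy, defined below) when called without one.
        # For example (`some_bid` is a hypothetical batch id):
        #
        #   Batch.redis { |r| r.hgetall("BID-#{some_bid}") }
        #   Batch.redis.hgetall("BID-#{some_bid}")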
        def redis(&blk)
          return RedisProxy.new unless block_given?

          if Thread.current[:job_batches_redis]
            yield Thread.current[:job_batches_redis]
          elsif defined?(::Sidekiq)
            ::Sidekiq.redis do |r|
              Thread.current[:job_batches_redis] = r
              yield r
            ensure
              Thread.current[:job_batches_redis] = nil
            end
          else
            # TODO
          end
        end

        def logger
          defined?(::Sidekiq) ? ::Sidekiq.logger : Rails.logger
        end

        def push_callbacks(args, queue)
          Batch::Callback::worker_class.enqueue_all(args, queue)
        end

        def bid_hierarchy(bid, depth: 4, per_depth: 5, slice: nil)
          args = [bid, depth, per_depth]
          args << slice if slice
          redis do |r|
            BID_HIERARCHY.call(r, [], args)
          end
        end
      end

      class RedisProxy
        def multi(*args, &block)
          Batch.redis do |r|
            r.multi(*args) do |r|
              block.call(r)
            end
          end
        end

        def pipelined(*args, &block)
          Batch.redis do |r|
            if block.arity == 1
              r.pipelined(*args) do
                block.call(r)
              end
            else
              r.pipelined(*args, &block)
            end
          end
        end

        def uget(key)
          Batch.redis do |r|
            case r.type(key)
            when 'string'
              r.get(key)
            when 'list'
              r.lrange(key, 0, -1)
            when 'hash'
              r.hgetall(key)
            when 'set'
              r.smembers(key)
            when 'zset'
              r.zrange(key, 0, -1)
            end
          end
        end

        def method_missing(method_name, *arguments, &block)
          Batch.redis do |r|
            r.send(method_name, *arguments, &block)
          end
        end

        def respond_to_missing?(method_name, include_private = false)
          super || Redis.method_defined?(method_name)
        end
      end
    end
  end
end

# Automatically integrate with Sidekiq if it is present.
if defined?(::Sidekiq)
  require_relative './sidekiq'
  CanvasSync::JobBatches::Sidekiq.configure
end

# Automatically integrate with ActiveJob if it is present.
if defined?(::ActiveJob)
  require_relative './active_job'
  CanvasSync::JobBatches::ActiveJob.configure
end