module Sidekiq::Status::Storage RESERVED_FIELDS=%w(status stop update_time).freeze BATCH_LIMIT = 500 protected # Stores multiple values into a job's status hash, # sets last update time # @param [String] id job id # @param [Hash] status_updates updated values # @param [Integer] expiration optional expire time in seconds # @param [ConnectionPool] redis_pool optional redis connection pool # @return [String] Redis operation status code def store_for_id(id, status_updates, expiration = nil, redis_pool=nil) redis_connection(redis_pool) do |conn| conn.multi do conn.hmset key(id), 'update_time', Time.now.to_i, *(status_updates.to_a.flatten(1)) conn.expire key(id), (expiration || Sidekiq::Status::DEFAULT_EXPIRY) conn.publish "status_updates", id end[0] end end # Stores job status and sets expiration time to it # only in case of :failed or :stopped job # @param [String] id job id # @param [Symbol] job status # @param [Integer] expiration optional expire time in seconds # @param [ConnectionPool] redis_pool optional redis connection pool # @return [String] Redis operation status code def store_status(id, status, expiration = nil, redis_pool=nil) store_for_id id, {status: status}, expiration, redis_pool end # Unschedules the job and deletes the Status # @param [String] id job id # @param [Num] job_unix_time, unix timestamp for the scheduled job def delete_and_unschedule(job_id, job_unix_time = nil) Sidekiq.redis do |conn| scan_options = {offset: 0, conn: conn, start: (job_unix_time || '-inf'), end: (job_unix_time || '+inf')} while not (jobs = schedule_batch(scan_options)).empty? match = scan_scheduled_jobs_for_jid jobs, job_id unless match.nil? conn.zrem "schedule", match conn.del key(job_id) return true # Done end scan_options[:offset] += BATCH_LIMIT end end false end # Deletes status hash info for given job id # @param[String] job id # @retrun [Integer] number of keys that were removed def delete_status(id) redis_connection do |conn| conn.del(key(id)) end end # Gets a single valued from job status hash # @param [String] id job id # @param [String] Symbol field fetched field name # @return [String] Redis operation status code def read_field_for_id(id, field) Sidekiq.redis do |conn| conn.hget(key(id), field) end end # Gets the whole status hash from the job status # @param [String] id job id # @return [Hash] Hash stored in redis def read_hash_for_id(id) Sidekiq.redis do |conn| conn.hgetall key(id) end end private # Gets the batch of scheduled jobs based on input options # Uses Redis zrangebyscore for log(n) search, if unix-time is provided # @param [Hash] options, options hash containing (REQUIRED) keys: # - conn: Redis connection # - start: start score (i.e. -inf or a unix timestamp) # - end: end score (i.e. +inf or a unix timestamp) # - offset: current progress through (all) jobs (e.g.: 100 if you want jobs from 100 to BATCH_LIMIT) def schedule_batch(options) options[:conn].zrangebyscore "schedule", options[:start], options[:end], limit: [options[:offset], BATCH_LIMIT] end # Searches the jobs Array for the job_id # @param [Array] scheduled_jobs, results of Redis schedule key # @param [String] id job id def scan_scheduled_jobs_for_jid(scheduled_jobs, job_id) # A Little skecthy, I know, but the structure of these internal JSON # is predefined in such a way where this will not catch unintentional elements, # and this is notably faster than performing JSON.parse() for every listing: scheduled_jobs.select { |job_listing| job_listing.match(/\"jid\":\"#{job_id}\"/) }[0] end # Yields redis connection. Uses redis pool if available. # @param [ConnectionPool] redis_pool optional redis connection pool def redis_connection(redis_pool=nil) if redis_pool redis_pool.with do |conn| yield conn end else Sidekiq.redis do |conn| yield conn end end end def key(id) "sidekiq:status:#{id}" end end