lib/sidekiq-status/storage.rb in sidekiq-status-2.1.3 vs lib/sidekiq-status/storage.rb in sidekiq-status-3.0.0
- old
+ new
@@ -10,10 +10,11 @@
# @param [Hash] status_updates updated values
# @param [Integer] expiration optional expire time in seconds
# @param [ConnectionPool] redis_pool optional redis connection pool
# @return [String] Redis operation status code
def store_for_id(id, status_updates, expiration = nil, redis_pool=nil)
+ status_updates.transform_values!(&:to_s)
redis_connection(redis_pool) do |conn|
conn.multi do |pipeline|
pipeline.hmset key(id), 'update_time', Time.now.to_i, *(status_updates.to_a.flatten(1))
pipeline.expire key(id), (expiration || Sidekiq::Status::DEFAULT_EXPIRY)
pipeline.publish "status_updates", id
@@ -34,11 +35,11 @@
# Unschedules the job and deletes the Status
# @param [String] id job id
# @param [Numeric] job_unix_time unix timestamp for the scheduled job
def delete_and_unschedule(job_id, job_unix_time = nil)
- Sidekiq.redis do |conn|
+ Sidekiq::Status.redis_adapter do |conn|
scan_options = {offset: 0, conn: conn, start: (job_unix_time || '-inf'), end: (job_unix_time || '+inf')}
while not (jobs = schedule_batch(scan_options)).empty?
match = scan_scheduled_jobs_for_jid jobs, job_id
unless match.nil?
@@ -64,21 +65,21 @@
# Gets a single value from the job status hash
# @param [String] id job id
# @param [String, Symbol] field fetched field name
# @return [String] value stored under the requested field
def read_field_for_id(id, field)
- Sidekiq.redis do |conn|
+ Sidekiq::Status.redis_adapter do |conn|
conn.hget(key(id), field)
end
end
# Gets the whole status hash from the job status
# @param [String] id job id
# @return [Hash] Hash stored in redis
def read_hash_for_id(id)
- Sidekiq.redis do |conn|
- conn.hgetall key(id)
+ Sidekiq::Status.redis_adapter do |conn|
+ conn.hgetall(key(id))
end
end
private
@@ -88,10 +89,10 @@
# - conn: Redis connection
# - start: start score (i.e. -inf or a unix timestamp)
# - end: end score (i.e. +inf or a unix timestamp)
# - offset: current progress through (all) jobs (e.g.: 100 if you want jobs from 100 to BATCH_LIMIT)
def schedule_batch(options)
- options[:conn].zrangebyscore "schedule", options[:start], options[:end], limit: [options[:offset], BATCH_LIMIT]
+ Sidekiq::Status.wrap_redis_connection(options[:conn]).schedule_batch("schedule", options.merge(limit: BATCH_LIMIT))
end
# Searches the jobs Array for the job_id
# @param [Array] scheduled_jobs, results of Redis schedule key
# @param [String] id job id