lib/sidekiq-status/storage.rb in sidekiq-status-2.1.3 vs lib/sidekiq-status/storage.rb in sidekiq-status-3.0.0

- old
+ new

@@ -10,10 +10,11 @@ # @param [Hash] status_updates updated values # @param [Integer] expiration optional expire time in seconds # @param [ConnectionPool] redis_pool optional redis connection pool # @return [String] Redis operation status code def store_for_id(id, status_updates, expiration = nil, redis_pool=nil) + status_updates.transform_values!(&:to_s) redis_connection(redis_pool) do |conn| conn.multi do |pipeline| pipeline.hmset key(id), 'update_time', Time.now.to_i, *(status_updates.to_a.flatten(1)) pipeline.expire key(id), (expiration || Sidekiq::Status::DEFAULT_EXPIRY) pipeline.publish "status_updates", id @@ -34,11 +35,11 @@ # Unschedules the job and deletes the Status # @param [String] id job id # @param [Num] job_unix_time, unix timestamp for the scheduled job def delete_and_unschedule(job_id, job_unix_time = nil) - Sidekiq.redis do |conn| + Sidekiq::Status.redis_adapter do |conn| scan_options = {offset: 0, conn: conn, start: (job_unix_time || '-inf'), end: (job_unix_time || '+inf')} while not (jobs = schedule_batch(scan_options)).empty? match = scan_scheduled_jobs_for_jid jobs, job_id unless match.nil? @@ -64,21 +65,21 @@ # Gets a single valued from job status hash # @param [String] id job id # @param [String] Symbol field fetched field name # @return [String] Redis operation status code def read_field_for_id(id, field) - Sidekiq.redis do |conn| + Sidekiq::Status.redis_adapter do |conn| conn.hget(key(id), field) end end # Gets the whole status hash from the job status # @param [String] id job id # @return [Hash] Hash stored in redis def read_hash_for_id(id) - Sidekiq.redis do |conn| - conn.hgetall key(id) + Sidekiq::Status.redis_adapter do |conn| + conn.hgetall(key(id)) end end private @@ -88,10 +89,10 @@ # - conn: Redis connection # - start: start score (i.e. -inf or a unix timestamp) # - end: end score (i.e. +inf or a unix timestamp) # - offset: current progress through (all) jobs (e.g.: 100 if you want jobs from 100 to BATCH_LIMIT) def schedule_batch(options) - options[:conn].zrangebyscore "schedule", options[:start], options[:end], limit: [options[:offset], BATCH_LIMIT] + Sidekiq::Status.wrap_redis_connection(options[:conn]).schedule_batch("schedule", options.merge(limit: BATCH_LIMIT)) end # Searches the jobs Array for the job_id # @param [Array] scheduled_jobs, results of Redis schedule key # @param [String] id job id