lib/queue_bus/adapters/data.rb in queue-bus-0.9.0 vs lib/queue_bus/adapters/data.rb in queue-bus-0.9.1

- old
+ new

@@ -1,65 +1,66 @@ +# frozen_string_literal: true + # a base adapter just for publishing and redis connection module QueueBus module Adapters class Data < QueueBus::Adapters::Base def enabled! # nothing to do end - def redis=(client) - @redis = client - end + attr_writer :redis def redis(&block) - raise "no redis instance set" unless @redis + raise 'no redis instance set' unless @redis + block.call(@redis) end def enqueue(queue_name, klass, json) - push(queue_name, :class => klass.to_s, :args => [json]) + push(queue_name, class: klass.to_s, args: [json]) end def enqueue_at(epoch_seconds, queue_name, klass, json) item = delayed_job_to_hash_with_queue(queue_name, klass, [json]) delayed_push(epoch_seconds, item) end - def setup_heartbeat!(queue_name) + def setup_heartbeat!(_queue_name) raise NotImplementedError end protected def push(queue, item) watch_queue(queue) - self.redis { |redis| redis.rpush "queue:#{queue}", ::QueueBus::Util.encode(item) } + redis { |redis| redis.rpush "queue:#{queue}", ::QueueBus::Util.encode(item) } end # Used internally to keep track of which queues we've created. # Don't call this directly. def watch_queue(queue) - self.redis { |redis| redis.sadd(:queues, queue.to_s) } + redis { |redis| redis.sadd(:queues, queue.to_s) } end # Used internally to stuff the item into the schedule sorted list. # +timestamp+ can be either in seconds or a datetime object # Insertion if O(log(n)). # Returns true if it's the first job to be scheduled at that time, else false def delayed_push(timestamp, item) - self.redis do |redis| + redis do |redis| # First add this item to the list for this timestamp redis.rpush("delayed:#{timestamp.to_i}", ::QueueBus::Util.encode(item)) # Now, add this timestamp to the zsets. The score and the value are # the same since we'll be querying by timestamp, and we don't have # anything else to store. redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i end end - + def delayed_job_to_hash_with_queue(queue, klass, args) - {:class => klass.to_s, :args => args, :queue => queue} + { class: klass.to_s, args: args, queue: queue } end end end end