lib/atomic_sidekiq/atomic_fetch.rb in atomic-sidekiq-1.1.4 vs lib/atomic_sidekiq/atomic_fetch.rb in atomic-sidekiq-1.2.0
- old
+ new
@@ -23,25 +23,29 @@
def retrieve_work
collect_dead_jobs!
work = retrieve_op.perform(ordered_queues, expire_at)
return UnitOfWork.new(*work, in_flight_keymaker: keymaker) if work
+
sleep(poll_interval)
nil
end
private
- attr_reader :retrieve_op, :queues, :strictly_ordered_queues,
- :collection_interval, :poll_interval, :expiration_time,
+ attr_reader :retrieve_op, :queues, :ignored_queues,
+ :strictly_ordered_queues, :collection_interval,
+ :poll_interval, :expiration_time,
:keymaker
def configure_atomic_fetch(options)
@expiration_time = options[:expiration_time] || DEFAULT_EXPIRATION_TIME
@collection_interval = options[:collection_wait_time] ||
DEFAULT_COLLECTION_INTERVAL
@poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL
+ @ignored_queues = (options[:ignored_queues] || [])
+ .map { |q| "queue:#{q}" }
end
def ordered_queues
if strictly_ordered_queues
queues
@@ -50,11 +54,16 @@
end
end
def collect_dead_jobs!
return if @@next_collection > Time.now
+
@@next_collection = Time.now + collection_interval
- DeadJobCollector.collect!(ordered_queues, in_flight_keymaker: keymaker)
+ DeadJobCollector.collect!(
+ ordered_queues,
+ in_flight_keymaker: keymaker,
+ skip_recovery_queues: ignored_queues
+ )
end
def expire_at
Time.now.utc.to_i + expiration_time
end