lib/sidekiq/dynamic_queues/attributes.rb in sidekiq-dynamic-queues-0.5.5 vs lib/sidekiq/dynamic_queues/attributes.rb in sidekiq-dynamic-queues-0.5.6
- old
+ new
@@ -56,9 +56,62 @@
queues = Sidekiq.redis {|r| r.hgetall(DYNAMIC_QUEUE_KEY) }
queues.each {|k, v| result[k] = json_decode(v) }
result[FALLBACK_KEY] ||= ['*']
return result
end
+
+ # Returns a list of queues to use when searching for a job.
+ #
+ # A splat ("*") means you want every queue (in alpha order) - this
+ # can be useful for dynamically adding new queues.
+ #
+ # The splat can also be used as a wildcard within a queue name,
+ # e.g. "*high*", and negation can be indicated with a prefix of "!"
+ #
+ # An @key can be used to dynamically look up the queue list for key from redis.
+ # If no key is supplied, it defaults to the worker's hostname, and wildcards
+ # and negations can be used inside this dynamic queue list. Set the queue
+ # list for a key with
+ # Sidekiq::DynamicQueues::Attributes.set_dynamic_queue(key, ["q1", "q2"]
+ #
+ def expand_queues(queues)
+ queue_names = queues.dup
+
+ real_queues = Sidekiq::Client.registered_queues
+ matched_queues = []
+
+ while q = queue_names.shift
+ q = q.to_s
+
+ if q =~ /^(!)?@(.*)/
+ key = $2.strip
+ key = hostname if key.size == 0
+
+ add_queues = get_dynamic_queue(key)
+ add_queues.map! { |q| q.gsub!(/^!/, '') || q.gsub!(/^/, '!') } if $1
+
+ queue_names.concat(add_queues)
+ next
+ end
+
+ if q =~ /^!/
+ negated = true
+ q = q[1..-1]
+ end
+
+ patstr = q.gsub(/\*/, ".*")
+ pattern = /^#{patstr}$/
+ if negated
+ matched_queues -= matched_queues.grep(pattern)
+ else
+ matches = real_queues.grep(/^#{pattern}$/)
+ matches = [q] if matches.size == 0 && q == patstr
+ matched_queues.concat(matches)
+ end
+ end
+
+ return matched_queues.collect { |q| "queue:#{q}" }.uniq.sort
+ end
end
end
end