Sha256: eb2a6fecb601fe60c23fd4b6ee775350c439d7f71e66f2f75fc124c033f3c37d
Contents?: true
Size: 1.85 KB
Versions: 4
Compression:
Stored size: 1.85 KB
Contents
module Searchkick class ReindexQueue attr_reader :name def initialize(name) @name = name raise Error, "Searchkick.redis not set" unless Searchkick.redis end # supports single and multiple ids def push(record_ids) Searchkick.with_redis { |r| r.call("LPUSH", redis_key, record_ids) } end def push_records(records) record_ids = records.map do |record| # always pass routing in case record is deleted # before the queue job runs if record.respond_to?(:search_routing) routing = record.search_routing end # escape pipe with double pipe value = escape(record.id.to_s) value = "#{value}|#{escape(routing)}" if routing value end push(record_ids) end # TODO use reliable queuing def reserve(limit: 1000) if supports_rpop_with_count? Searchkick.with_redis { |r| r.call("RPOP", redis_key, limit) }.to_a else record_ids = [] Searchkick.with_redis do |r| while record_ids.size < limit && (record_id = r.call("RPOP", redis_key)) record_ids << record_id end end record_ids end end def clear Searchkick.with_redis { |r| r.call("DEL", redis_key) } end def length Searchkick.with_redis { |r| r.call("LLEN", redis_key) } end private def redis_key "searchkick:reindex_queue:#{name}" end def supports_rpop_with_count? redis_version >= Gem::Version.new("6.2") end def redis_version @redis_version ||= Searchkick.with_redis do |r| info = r.call("INFO") matches = /redis_version:(\S+)/.match(info) Gem::Version.new(matches[1]) end end def escape(value) value.to_s.gsub("|", "||") end end end
Version data entries
4 entries across 4 versions & 1 rubygems