require 'resque-lock-timeout'

# A simple job class that processes a given index.
#
# The job runs Sphinx's indexer for a delta index and then marks the documents
# recorded in the flag-as-deleted processing set as deleted in the matching
# core index.
#
class ThinkingSphinx::Deltas::ResqueDelta::DeltaJob
  extend Resque::Plugins::LockTimeout

  @queue = :ts_delta
  @lock_timeout = 240

  # Runs Sphinx's indexer tool to process the index. Currently assumes Sphinx
  # is running.
  #
  # Does nothing when the index is locked (see skip?). The flag-as-deleted
  # step is skipped when Sphinx is not running or when there are no ids to
  # flag.
  #
  # @param [String] index the name of the Sphinx index
  #
  def self.perform(index)
    return if skip?(index)

    config = ThinkingSphinx::Configuration.instance

    # Delta Index
    output = `#{config.bin_path}#{config.indexer_binary_name} --config #{config.config_file} --rotate #{index}`
    puts output unless ThinkingSphinx.suppress_delta_output?

    # Flag As Deleted
    return unless ThinkingSphinx.sphinx_running?

    core_index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(index)

    # Get the document ids we've saved
    flag_as_deleted_ids = ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.processing_members(core_index)
    return if flag_as_deleted_ids.empty?

    # Filter out the ids that aren't present in sphinx
    flag_as_deleted_ids = filter_flag_as_deleted_ids(flag_as_deleted_ids, core_index)
    return if flag_as_deleted_ids.empty?

    # Each hash element should be of the form { id => [1] }. The hash is built
    # from an array of pairs rather than a splatted flat list so that a large
    # id set is not expanded into one enormous argument list.
    flag_hash = Hash[flag_as_deleted_ids.collect { |id| [id, [1]] }]
    config.client.update(core_index, ['sphinx_deleted'], flag_hash)
  end

  # Try again later if lock is in use.
  #
  # @param [Array] args the arguments the job was originally enqueued with
  #
  def self.lock_failed(*args)
    Resque.enqueue(self, *args)
  end

  # Run only one DeltaJob at a time regardless of index.
  #def self.identifier(*args)
    #nil
  #end

  # This allows us to have a concurrency safe version of ts-delayed-delta's
  # duplicates_exist:
  #
  # http://github.com/freelancing-god/ts-delayed-delta/blob/master/lib/thinking_sphinx/deltas/delayed_delta/job.rb#L47
  #
  # The name of this method ensures that it runs within around_perform_lock.
  #
  # We've leveraged resque-lock-timeout to ensure that only one DeltaJob is
  # running at a time. Now, this around filter essentially ensures that only
  # one DeltaJob of each index type can sit at the queue at once. If the queue
  # has more than one, lrem will clear the rest off.
  #
  # @param [Array] args the job arguments (the index name)
  # @yield runs the wrapped job
  #
  def self.around_perform_lock1(*args)
    # Remove all other instances of this job (with the same args) from the
    # queue. Uses LREM (http://code.google.com/p/redis/wiki/LremCommand) which
    # takes the form: "LREM key count value" and if count == 0 removes all
    # instances of value from the list.
    redis_job_value = Resque.encode(:class => self.to_s, :args => args)
    Resque.redis.lrem("queue:#{@queue}", 0, redis_job_value)

    # Grab the subset of flag as deleted document ids to work on
    core_index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(*args)
    ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.get_subset_for_processing(core_index)

    yield

    # Clear processing set
    # NOTE(review): this is not inside an ensure, so it is skipped when the
    # job raises. Presumably that is intentional so the ids are picked up by a
    # later run -- confirm against FlagAsDeletedSet before changing it.
    ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.clear_processing(core_index)
  end

  # NOTE: `protected` only affects instance methods defined after it; the
  # `def self.` methods below remain publicly callable on the class. It is
  # kept as a marker that they are internal helpers.
  protected

  # Whether the job should do nothing for the given index.
  #
  # @param [String] index the name of the Sphinx index
  # @return [Boolean] the result of ResqueDelta.locked? for the index
  #
  def self.skip?(index)
    ThinkingSphinx::Deltas::ResqueDelta.locked?(index)
  end

  # Searches the given index for the given document ids, in batches of at most
  # 4096 ids per search, and collects the :doc value of every match.
  #
  # NOTE(review): no :per_page / :limit option is passed to the search;
  # confirm that every match of a batch is returned rather than one page.
  #
  # @param [Array] ids the candidate document ids
  # @param [String] index the name of the index to search
  # @return [Array] the :doc values of the matches found
  #
  def self.filter_flag_as_deleted_ids(ids, index)
    # A single shared client (config.client with open / reset / close around
    # the searches, passed in as :client) was tried here and is disabled.
    search_results = []

    partition_ids(ids, 4096) do |subset|
      matches = ThinkingSphinx.search_for_ids(
        :with => {:@id => subset}, :index => index
      ).results[:matches]

      search_results.concat(matches.collect { |match| match[:doc] })
    end

    search_results
  end

  # Splits ids into consecutive batches of at most n elements.
  #
  # The result is always an array of batches, including when everything fits
  # into a single batch, so the block always receives an array of ids and
  # never a bare id. A non-positive n means "do not split": all ids form one
  # batch. An empty ids array produces no batches.
  #
  # @param [Array] ids the ids to split
  # @param [Integer] n the maximum batch size
  # @yield [Array] each batch, in order, when a block is given
  # @return [Array<Array>] the batches
  #
  def self.partition_ids(ids, n)
    batches =
      if ids.empty?
        []
      elsif n > 0
        ids.each_slice(n).to_a
      else
        [ids]
      end

    batches.each { |batch| yield batch } if block_given?

    batches
  end
end