lib/sup/xapian_index.rb in sup-0.9.1 vs lib/sup/xapian_index.rb in sup-0.10

- old (removed in sup-0.10)
+ new (added in sup-0.10)

@@ -1,5 +1,7 @@
+ENV["XAPIAN_FLUSH_THRESHOLD"] = "1000"
+
 require 'xapian'
 require 'set'
 
 module Redwood
 
@@ -40,18 +42,18 @@
       end
     else
       @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_CREATE)
       @xapian.set_metadata 'version', INDEX_VERSION
     end
-    @term_generator = Xapian::TermGenerator.new()
-    @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
     @enquire = Xapian::Enquire.new @xapian
     @enquire.weighting_scheme = Xapian::BoolWeight.new
     @enquire.docid_order = Xapian::Enquire::ASCENDING
   end
 
   def save_index
+    info "Flushing Xapian updates to disk. This may take a while..."
+    @xapian.flush
   end
 
   def optimize
   end
 
@@ -89,46 +91,14 @@
     m.load_from_index! entry
     m
   end
 
-  def add_message m; sync_message m end
-  def update_message m; sync_message m end
-  def update_message_state m; sync_message m end
+  def add_message m; sync_message m, true end
+  def update_message m; sync_message m, true end
+  def update_message_state m; sync_message m, false end
 
-  def sync_message m, opts={}
-    entry = synchronize { get_entry m.id }
-    snippet = m.snippet
-    entry ||= {}
-    labels = m.labels
-    entry = {} if opts[:force_overwrite]
-
-    d = {
-      :message_id => m.id,
-      :source_id => m.source.id,
-      :source_info => m.source_info,
-      :date => (entry[:date] || m.date),
-      :snippet => snippet,
-      :labels => labels,
-      :from => (entry[:from] || [m.from.email, m.from.name]),
-      :to => (entry[:to] || m.to.map { |p| [p.email, p.name] }),
-      :cc => (entry[:cc] || m.cc.map { |p| [p.email, p.name] }),
-      :bcc => (entry[:bcc] || m.bcc.map { |p| [p.email, p.name] }),
-      :subject => m.subj,
-      :refs => (entry[:refs] || m.refs),
-      :replytos => (entry[:replytos] || m.replytos),
-    }
-
-    labels.each { |l| LabelManager << l }
-
-    synchronize do
-      index_message m, d, opts
-    end
-    true
-  end
-  private :sync_message
-
   def num_results_for query={}
     xapian_query = build_xapian_query query
     matchset = run_query xapian_query, 0, 0, 100
     matchset.matches_estimated
   end
 
@@ -151,20 +121,19 @@
     each_id(query) { |id| yield id, lambda { build_message id } }
   end
 
   def each_message_in_thread_for m, opts={}
     # TODO thread by subject
-    # TODO handle killed threads
     return unless doc = find_doc(m.id)
     queue = doc.value(THREAD_VALUENO).split(',')
     msgids = [m.id]
     seen_threads = Set.new
     seen_messages = Set.new [m.id]
     while not queue.empty?
       thread_id = queue.pop
       next if seen_threads.member? thread_id
-      return false if thread_killed? thread_id
+      return false if opts[:skip_killed] && thread_killed?(thread_id)
       seen_threads << thread_id
       docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x }
       docs.each do |doc|
         msgid = doc.value MSGID_VALUENO
         next if seen_messages.member? msgid
@@ -192,18 +161,19 @@
   def parse_query s
     query = {}
 
     subs = HookManager.run("custom-search", :subs => s) || s
     subs = subs.gsub(/\b(to|from):(\S+)\b/) do
-      field, name = $1, $2
-      if(p = ContactManager.contact_for(name))
-        [field, p.email]
-      elsif name == "me"
-        [field, "(" + AccountManager.user_emails.join("||") + ")"]
+      field, value = $1, $2
+      email_field, name_field = %w(email name).map { |x| "#{field}_#{x}" }
+      if(p = ContactManager.contact_for(value))
+        "#{email_field}:#{p.email}"
+      elsif value == "me"
+        '(' + AccountManager.user_emails.map { |e| "#{email_field}:#{e}" }.join(' OR ') + ')'
       else
-        [field, name]
-      end.join(":")
+        "(#{email_field}:#{value} OR #{name_field}:#{value})"
+      end
     end
 
     ## if we see a label:deleted or a label:spam term anywhere in the query
     ## string, we set the extra load_spam or load_deleted options to true.
     ## bizarre? well, because the query allows arbitrary parenthesized boolean
@@ -283,20 +253,24 @@
       else
         raise ParseError, "non-numeric limit #{lim.inspect}"
       end
     end
 
+    debug "translated query: #{subs.inspect}"
+
     qp = Xapian::QueryParser.new
     qp.database = @xapian
     qp.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
     qp.stemming_strategy = Xapian::QueryParser::STEM_SOME
     qp.default_op = Xapian::Query::OP_AND
     qp.add_valuerangeprocessor(Xapian::NumberValueRangeProcessor.new(DATE_VALUENO, 'date:', true))
     NORMAL_PREFIX.each { |k,v| qp.add_prefix k, v }
     BOOLEAN_PREFIX.each { |k,v| qp.add_boolean_prefix k, v }
     xapian_query = qp.parse_query(subs, Xapian::QueryParser::FLAG_PHRASE|Xapian::QueryParser::FLAG_BOOLEAN|Xapian::QueryParser::FLAG_LOVEHATE|Xapian::QueryParser::FLAG_WILDCARD, PREFIX['body'])
+    debug "parsed xapian query: #{xapian_query.description}"
+
     raise ParseError if xapian_query.nil? or xapian_query.empty?
     query[:qobj] = xapian_query
     query[:text] = s
     query
   end
@@ -436,104 +410,144 @@
     else
       Q.new(Q::OP_AND_NOT, [pos_query, neg_query])
     end
   end
 
-  def index_message m, entry, opts
-    terms = []
-    text = []
+  def sync_message m, overwrite
+    doc = synchronize { find_doc(m.id) }
+    existed = doc != nil
+    doc ||= Xapian::Document.new
+    do_index_static = overwrite || !existed
+    old_entry = !do_index_static && doc.entry
+    snippet = do_index_static ? m.snippet : old_entry[:snippet]
 
-    subject_text = m.indexable_subject
-    body_text = m.indexable_body
+    entry = {
+      :message_id => m.id,
+      :source_id => m.source.id,
+      :source_info => m.source_info,
+      :date => m.date,
+      :snippet => snippet,
+      :labels => m.labels.to_a,
+      :from => [m.from.email, m.from.name],
+      :to => m.to.map { |p| [p.email, p.name] },
+      :cc => m.cc.map { |p| [p.email, p.name] },
+      :bcc => m.bcc.map { |p| [p.email, p.name] },
+      :subject => m.subj,
+      :refs => m.refs.to_a,
+      :replytos => m.replytos.to_a,
+    }
+
+    if do_index_static
+      doc.clear_terms
+      doc.clear_values
+      index_message_static m, doc, entry
+    end
+
+    index_message_threading doc, entry, old_entry
+    index_message_labels doc, entry[:labels], (do_index_static ? [] : old_entry[:labels])
+    doc.entry = entry
+
+    synchronize do
+      unless docid = existed ? doc.docid : assign_docid(m, truncate_date(m.date))
+        # Could be triggered by spam
+        warn "docid underflow, dropping #{m.id.inspect}"
+        return
+      end
+      @xapian.replace_document docid, doc
+    end
+
+    m.labels.each { |l| LabelManager << l }
+    true
+  end
+
+  ## Index content that can't be changed by the user
+  def index_message_static m, doc, entry
     # Person names are indexed with several prefixes
     person_termer = lambda do |d|
       lambda do |p|
         ["#{d}_name", "name", "body"].each do |x|
-          text << [p.name, PREFIX[x]]
+          doc.index_text p.name, PREFIX[x]
         end if p.name
-        [d, :any].each { |x| terms << mkterm(:email, x, p.email) }
+        [d, :any].each { |x| doc.add_term mkterm(:email, x, p.email) }
       end
     end
 
     person_termer[:from][m.from] if m.from
     (m.to+m.cc+m.bcc).each(&(person_termer[:to]))
 
-    terms << mkterm(:date,m.date) if m.date
-    m.labels.each { |t| terms << mkterm(:label,t) }
-    terms << mkterm(:type, 'mail')
-    terms << mkterm(:msgid, m.id)
-    terms << mkterm(:source_id, m.source.id)
+    # Full text search content
+    subject_text = m.indexable_subject
+    body_text = m.indexable_body
+    doc.index_text subject_text, PREFIX['subject']
+    doc.index_text subject_text, PREFIX['body']
+    doc.index_text body_text, PREFIX['body']
+    m.attachments.each { |a| doc.index_text a, PREFIX['attachment'] }
+
+    # Miscellaneous terms
+    doc.add_term mkterm(:date, m.date) if m.date
+    doc.add_term mkterm(:type, 'mail')
+    doc.add_term mkterm(:msgid, m.id)
+    doc.add_term mkterm(:source_id, m.source.id)
     m.attachments.each do |a|
       a =~ /\.(\w+)$/ or next
-      t = mkterm(:attachment_extension, $1)
-      terms << t
+      doc.add_term mkterm(:attachment_extension, $1)
     end
 
-    ## Thread membership
-    children = term_docids(mkterm(:ref, m.id)).map { |docid| @xapian.document docid }
-    parent_ids = m.refs + m.replytos
+    # Date value for range queries
+    date_value = begin
+      Xapian.sortable_serialise m.date.to_i
+    rescue TypeError
+      Xapian.sortable_serialise 0
+    end
+
+    doc.add_value MSGID_VALUENO, m.id
+    doc.add_value DATE_VALUENO, date_value
+  end
+
+  def index_message_labels doc, new_labels, old_labels
+    return if new_labels == old_labels
+    added = new_labels.to_a - old_labels.to_a
+    removed = old_labels.to_a - new_labels.to_a
+    added.each { |t| doc.add_term mkterm(:label,t) }
+    removed.each { |t| doc.remove_term mkterm(:label,t) }
+  end
+
+  ## Assign a set of thread ids to the document. This is a hybrid of the runtime
+  ## search done by the Ferret index and the index-time union done by previous
+  ## versions of the Xapian index. We first find the thread ids of all messages
+  ## with a reference to or from us. If that set is empty, we use our own
+  ## message id. Otherwise, we use all the thread ids we previously found. In
+  ## the common case there's only one member in that set, but if we're the
+  ## missing link between multiple previously unrelated threads we can have
+  ## more. XapianIndex#each_message_in_thread_for follows the thread ids when
+  ## searching so the user sees a single unified thread.
+  def index_message_threading doc, entry, old_entry
+    return if old_entry && (entry[:refs] == old_entry[:refs]) && (entry[:replytos] == old_entry[:replytos])
+    children = term_docids(mkterm(:ref, entry[:message_id])).map { |docid| @xapian.document docid }
+    parent_ids = entry[:refs] + entry[:replytos]
     parents = parent_ids.map { |id| find_doc id }.compact
 
     thread_members = SavingHash.new { [] }
 
     (children + parents).each do |doc2|
       thread_ids = doc2.value(THREAD_VALUENO).split ','
       thread_ids.each { |thread_id| thread_members[thread_id] << doc2 }
     end
+    thread_ids = thread_members.empty? ? [entry[:message_id]] : thread_members.keys
+    thread_ids.each { |thread_id| doc.add_term mkterm(:thread, thread_id) }
+    parent_ids.each { |ref| doc.add_term mkterm(:ref, ref) }
+    doc.add_value THREAD_VALUENO, (thread_ids * ',')
+  end
 
-    thread_ids = thread_members.empty? ? [m.id] : thread_members.keys
-
-    thread_ids.each { |thread_id| terms << mkterm(:thread, thread_id) }
-    parent_ids.each do |ref|
-      terms << mkterm(:ref, ref)
-    end
-
-    # Full text search content
-    text << [subject_text, PREFIX['subject']]
-    text << [subject_text, PREFIX['body']]
-    text << [body_text, PREFIX['body']]
-    m.attachments.each { |a| text << [a, PREFIX['attachment']] }
-
-    truncated_date = if m.date < MIN_DATE
-      debug "warning: adjusting too-low date #{m.date} for indexing"
+  def truncate_date date
+    if date < MIN_DATE
+      debug "warning: adjusting too-low date #{date} for indexing"
       MIN_DATE
-    elsif m.date > MAX_DATE
-      debug "warning: adjusting too-high date #{m.date} for indexing"
+    elsif date > MAX_DATE
+      debug "warning: adjusting too-high date #{date} for indexing"
       MAX_DATE
     else
-      m.date
+      date
     end
-
-    # Date value for range queries
-    date_value = begin
-      Xapian.sortable_serialise truncated_date.to_i
-    rescue TypeError
-      Xapian.sortable_serialise 0
-    end
-
-    docid = nil
-    unless doc = find_doc(m.id)
-      doc = Xapian::Document.new
-      if not docid = assign_docid(m, truncated_date)
-        # Could be triggered by spam
-        Redwood::log "warning: docid underflow, dropping #{m.id.inspect}"
-        return
-      end
-    else
-      doc.clear_terms
-      doc.clear_values
-      docid = doc.docid
-    end
-
-    @term_generator.document = doc
-    text.each { |text,prefix| @term_generator.index_text text, 1, prefix }
-    terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH }
-    doc.add_value MSGID_VALUENO, m.id
-    doc.add_value THREAD_VALUENO, (thread_ids * ',')
-    doc.add_value DATE_VALUENO, date_value
-    doc.data = Marshal.dump entry
-
-    @xapian.replace_document docid, doc
   end
 
   # Construct a Xapian term
   def mkterm type, *args
     case type
@@ -558,9 +572,34 @@
       PREFIX[type.to_s] + args[0][0...(MAX_TERM_LENGTH-1)]
     else
       raise "Invalid term type #{type}"
     end
   end
+end
 
 end
 
+class Xapian::Document
+  def entry
+    Marshal.load data
+  end
+
+  def entry=(x)
+    self.data = Marshal.dump x
+  end
+
+  def index_text text, prefix, weight=1
+    term_generator = Xapian::TermGenerator.new
+    term_generator.stemmer = Xapian::Stem.new(Redwood::XapianIndex::STEM_LANGUAGE)
+    term_generator.document = self
+    term_generator.index_text text, weight, prefix
+  end
+
+  alias old_add_term add_term
+  def add_term term
+    if term.length <= Redwood::XapianIndex::MAX_TERM_LENGTH
+      old_add_term term, 0
+    else
+      warn "dropping excessively long term #{term}"
+    end
+  end
+end
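
A few of the changes above are easier to follow with small standalone sketches. The snippets below are illustrations written for this comparison, not code from either release; the stub modules and helper names (ContactStub, rewrite, update_label_terms, thread_ids_for, LABEL_PREFIX) are hypothetical stand-ins for sup's real singletons and term machinery.

First, the to:/from: rewrite in parse_query. In 0.10 a known contact resolves to their address, "me" expands to every account address, and an unrecognized name matches either the address or the display name through the new field_email/field_name prefixes:

    # Hypothetical stubs standing in for sup's ContactManager and AccountManager.
    ContactStub = Struct.new(:email)

    module ContactManager
      def self.contact_for name
        name == "bob" ? ContactStub.new("bob@example.com") : nil
      end
    end

    module AccountManager
      def self.user_emails
        ["me@example.com", "me@work.example.com"]
      end
    end

    # The 0.10 rewrite rule, lifted from the diff above.
    def rewrite s
      s.gsub(/\b(to|from):(\S+)\b/) do
        field, value = $1, $2
        email_field, name_field = %w(email name).map { |x| "#{field}_#{x}" }
        if (p = ContactManager.contact_for(value))
          "#{email_field}:#{p.email}"
        elsif value == "me"
          '(' + AccountManager.user_emails.map { |e| "#{email_field}:#{e}" }.join(' OR ') + ')'
        else
          "(#{email_field}:#{value} OR #{name_field}:#{value})"
        end
      end
    end

    puts rewrite("from:bob")   # => from_email:bob@example.com
    puts rewrite("to:me")      # => (to_email:me@example.com OR to_email:me@work.example.com)
    puts rewrite("from:alice") # => (from_email:alice OR from_name:alice)

Second, the point of splitting the old index_message into static, label, and threading passes: a label-only change (update_message_state calls sync_message with overwrite=false) skips the expensive full-text pass and applies only the label term delta. A minimal sketch of that delta against a bare xapian-ruby document, assuming a simplified "L" prefix in place of mkterm(:label, ...):

    require 'xapian'

    LABEL_PREFIX = "L" # simplified stand-in for sup's label term prefix

    # Mirrors index_message_labels above: touch only the terms that changed.
    def update_label_terms doc, new_labels, old_labels
      return if new_labels == old_labels
      (new_labels - old_labels).each { |l| doc.add_term LABEL_PREFIX + l.to_s }
      (old_labels - new_labels).each { |l| doc.remove_term LABEL_PREFIX + l.to_s }
    end

    doc = Xapian::Document.new
    update_label_terms doc, [:inbox, :unread], []
    update_label_terms doc, [:inbox], [:inbox, :unread] # removes only "Lunread"
    puts doc.terms.map { |t| t.term }.inspect           # => ["Linbox"]

Relatedly, the Xapian::Document#add_term override at the bottom of the diff passes a wdf increment of 0 (old_add_term term, 0), so these boolean filter terms never inflate within-document term frequencies; since the index uses BoolWeight they would not affect ranking in any case.

Finally, the comment block on index_message_threading describes the merge rule: a message adopts the union of the thread ids found on its parents and children, or starts a fresh thread keyed by its own message id when there are none. The interesting case is a message arriving as the missing link between two previously separate threads: it keeps both ids, and each_message_in_thread_for follows all of them, so the user sees a single unified thread. The decision itself, as a toy helper:

    def thread_ids_for msgid, neighbor_thread_ids
      neighbor_thread_ids.empty? ? [msgid] : neighbor_thread_ids.uniq
    end

    p thread_ids_for("a@example", [])                         # => ["a@example"], a new thread
    p thread_ids_for("c@example", ["a@example", "b@example"]) # => both ids, merging two threads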