require 'riddle/client/filter'
require 'riddle/client/message'
require 'riddle/client/response'
module Riddle
class VersionError < StandardError; end
class ResponseError < StandardError; end
class OutOfBoundsError < StandardError; end
# This class was heavily based on the existing Client API by Dmytro Shteflyuk
# and Alexy Kovyrin. Their code worked fine, I just wanted something a bit
# more Ruby-ish (ie. lowercase and underscored method names). I also have
# used a few helper classes, just to neaten things up.
#
# Feel free to use it wherever. Send bug reports, patches, comments and
# suggestions to pat at freelancing-gods dot com.
#
# Most properties of the client are accessible through attribute accessors,
# and where relevant use symboles instead of the long constants common in
# other clients.
# Some examples:
#
# client.sort_mode = :extended
# client.sort_by = "birthday DESC"
# client.match_mode = :extended
#
# To add a filter, you will need to create a Filter object:
#
# client.filters << Riddle::Client::Filter.new("birthday",
# Time.at(1975, 1, 1).to_i..Time.at(1985, 1, 1).to_i, false)
#
class Client
Commands = {
:search => 0, # SEARCHD_COMMAND_SEARCH
:excerpt => 1, # SEARCHD_COMMAND_EXCERPT
:update => 2, # SEARCHD_COMMAND_UPDATE
:keywords => 3, # SEARCHD_COMMAND_KEYWORDS
:persist => 4, # SEARCHD_COMMAND_PERSIST
:status => 5, # SEARCHD_COMMAND_STATUS
:query => 6, # SEARCHD_COMMAND_QUERY
:flushattrs => 7 # SEARCHD_COMMAND_FLUSHATTRS
}
Versions = {
:search => 0x113, # VER_COMMAND_SEARCH
:excerpt => 0x100, # VER_COMMAND_EXCERPT
:update => 0x101, # VER_COMMAND_UPDATE
:keywords => 0x100, # VER_COMMAND_KEYWORDS
:status => 0x100, # VER_COMMAND_STATUS
:query => 0x100, # VER_COMMAND_QUERY
:flushattrs => 0x100 # VER_COMMAND_FLUSHATTRS
}
Statuses = {
:ok => 0, # SEARCHD_OK
:error => 1, # SEARCHD_ERROR
:retry => 2, # SEARCHD_RETRY
:warning => 3 # SEARCHD_WARNING
}
MatchModes = {
:all => 0, # SPH_MATCH_ALL
:any => 1, # SPH_MATCH_ANY
:phrase => 2, # SPH_MATCH_PHRASE
:boolean => 3, # SPH_MATCH_BOOLEAN
:extended => 4, # SPH_MATCH_EXTENDED
:fullscan => 5, # SPH_MATCH_FULLSCAN
:extended2 => 6 # SPH_MATCH_EXTENDED2
}
RankModes = {
:proximity_bm25 => 0, # SPH_RANK_PROXIMITY_BM25
:bm25 => 1, # SPH_RANK_BM25
:none => 2, # SPH_RANK_NONE
:wordcount => 3, # SPH_RANK_WORDCOUNT
:proximity => 4, # SPH_RANK_PROXIMITY
:match_any => 5, # SPH_RANK_MATCHANY
:fieldmask => 6, # SPH_RANK_FIELDMASK
:sph04 => 7, # SPH_RANK_SPH04
:total => 8 # SPH_RANK_TOTAL
}
SortModes = {
:relevance => 0, # SPH_SORT_RELEVANCE
:attr_desc => 1, # SPH_SORT_ATTR_DESC
:attr_asc => 2, # SPH_SORT_ATTR_ASC
:time_segments => 3, # SPH_SORT_TIME_SEGMENTS
:extended => 4, # SPH_SORT_EXTENDED
:expr => 5 # SPH_SORT_EXPR
}
AttributeTypes = {
:integer => 1, # SPH_ATTR_INTEGER
:timestamp => 2, # SPH_ATTR_TIMESTAMP
:ordinal => 3, # SPH_ATTR_ORDINAL
:bool => 4, # SPH_ATTR_BOOL
:float => 5, # SPH_ATTR_FLOAT
:bigint => 6, # SPH_ATTR_BIGINT
:string => 7, # SPH_ATTR_STRING
:multi => 0x40000000 # SPH_ATTR_MULTI
}
GroupFunctions = {
:day => 0, # SPH_GROUPBY_DAY
:week => 1, # SPH_GROUPBY_WEEK
:month => 2, # SPH_GROUPBY_MONTH
:year => 3, # SPH_GROUPBY_YEAR
:attr => 4, # SPH_GROUPBY_ATTR
:attrpair => 5 # SPH_GROUPBY_ATTRPAIR
}
FilterTypes = {
:values => 0, # SPH_FILTER_VALUES
:range => 1, # SPH_FILTER_RANGE
:float_range => 2 # SPH_FILTER_FLOATRANGE
}
attr_accessor :servers, :port, :offset, :limit, :max_matches,
:match_mode, :sort_mode, :sort_by, :weights, :id_range, :filters,
:group_by, :group_function, :group_clause, :group_distinct, :cut_off,
:retry_count, :retry_delay, :anchor, :index_weights, :rank_mode,
:max_query_time, :field_weights, :timeout, :overrides, :select,
:connection, :key
attr_reader :queue
@@connection = nil
def self.connection=(value)
Riddle.mutex.synchronize do
@@connection = value
end
end
def self.connection
@@connection
end
# Can instantiate with a specific server and port - otherwise it assumes
# defaults of localhost and 3312 respectively. All other settings can be
# accessed and changed via the attribute accessors.
def initialize(servers = nil, port = nil, key = nil)
Riddle.version_warning
servers = Array(servers || "localhost")
@servers = servers.respond_to?(:shuffle) ? servers.shuffle : servers.sort_by{ rand }
@port = port || 9312
@socket = nil
@key = key
reset
@queue = []
end
# Reset attributes and settings to defaults.
def reset
# defaults
@offset = 0
@limit = 20
@max_matches = 1000
@match_mode = :all
@sort_mode = :relevance
@sort_by = ''
@weights = []
@id_range = 0..0
@filters = []
@group_by = ''
@group_function = :day
@group_clause = '@group desc'
@group_distinct = ''
@cut_off = 0
@retry_count = 0
@retry_delay = 0
@anchor = {}
# string keys are index names, integer values are weightings
@index_weights = {}
@rank_mode = :proximity_bm25
@max_query_time = 0
# string keys are field names, integer values are weightings
@field_weights = {}
@timeout = 0
@overrides = {}
@select = "*"
end
# The searchd server to query. Servers are removed from @server after a
# Timeout::Error is hit to allow for fail-over.
def server
@servers.first
end
# Backwards compatible writer to the @servers array.
def server=(server)
@servers = server.to_a
end
# Set the geo-anchor point - with the names of the attributes that contain
# the latitude and longitude (in radians), and the reference position.
# Note that for geocoding to work properly, you must also set
# match_mode to :extended. To sort results by distance, you will
# need to set sort_by to '@geodist asc', and sort_mode to extended (as an
# example). Sphinx expects latitude and longitude to be returned from you
# SQL source in radians.
#
# Example:
# client.set_anchor('lat', -0.6591741, 'long', 2.530770)
#
def set_anchor(lat_attr, lat, long_attr, long)
@anchor = {
:latitude_attribute => lat_attr,
:latitude => lat,
:longitude_attribute => long_attr,
:longitude => long
}
end
# Append a query to the queue. This uses the same parameters as the query
# method.
def append_query(search, index = '*', comments = '')
@queue << query_message(search, index, comments)
end
# Run all the queries currently in the queue. This will return an array of
# results hashes.
def run
response = Response.new request(:search, @queue)
results = @queue.collect do
result = {
:matches => [],
:fields => [],
:attributes => {},
:attribute_names => [],
:words => {}
}
result[:status] = response.next_int
case result[:status]
when Statuses[:warning]
result[:warning] = response.next
when Statuses[:error]
result[:error] = response.next
next result
end
result[:fields] = response.next_array
attributes = response.next_int
for i in 0...attributes
attribute_name = response.next
type = response.next_int
result[:attributes][attribute_name] = type
result[:attribute_names] << attribute_name
end
result_attribute_names_and_types = result[:attribute_names].
inject([]) { |array, attr| array.push([ attr, result[:attributes][attr] ]) }
matches = response.next_int
is_64_bit = response.next_int
result[:matches] = (0...matches).map do |i|
doc = is_64_bit > 0 ? response.next_64bit_int : response.next_int
weight = response.next_int
current_match_attributes = {}
result_attribute_names_and_types.each do |attr, type|
current_match_attributes[attr] = attribute_from_type(type, response)
end
{:doc => doc, :weight => weight, :index => i, :attributes => current_match_attributes}
end
result[:total] = response.next_int.to_i || 0
result[:total_found] = response.next_int.to_i || 0
result[:time] = ('%.3f' % (response.next_int / 1000.0)).to_f || 0.0
words = response.next_int
for i in 0...words
word = response.next
docs = response.next_int
hits = response.next_int
result[:words][word] = {:docs => docs, :hits => hits}
end
result
end
@queue.clear
results
end
# Query the Sphinx daemon - defaulting to all indexes, but you can specify
# a specific one if you wish. The search parameter should be a string
# following Sphinx's expectations.
#
# The object returned from this method is a hash with the following keys:
#
# * :matches
# * :fields
# * :attributes
# * :attribute_names
# * :words
# * :total
# * :total_found
# * :time
# * :status
# * :warning (if appropriate)
# * :error (if appropriate)
#
# The key :matches returns an array of hashes - the actual search
# results. Each hash has the document id (:doc), the result
# weighting (:weight), and a hash of the attributes for the
# document (:attributes).
#
# The :fields and :attribute_names keys return list of
# fields and attributes for the documents. The key :attributes
# will return a hash of attribute name and type pairs, and :words
# returns a hash of hashes representing the words from the search, with the
# number of documents and hits for each, along the lines of:
#
# results[:words]["Pat"] #=> {:docs => 12, :hits => 15}
#
# :total, :total_found and :time return the
# number of matches available, the total number of matches (which may be
# greater than the maximum available, depending on the number of matches
# and your sphinx configuration), and the time in milliseconds that the
# query took to run.
#
# :status is the error code for the query - and if there was a
# related warning, it will be under the :warning key. Fatal errors
# will be described under :error.
#
def query(search, index = '*', comments = '')
@queue << query_message(search, index, comments)
self.run.first
end
# Build excerpts from search terms (the +words+) and the text of documents. Excerpts are bodies of text that have the +words+ highlighted.
# They may also be abbreviated to fit within a word limit.
#
# As part of the options hash, you will need to
# define:
# * :docs
# * :words
# * :index
#
# Optional settings include:
# * :before_match (defaults to )
# * :after_match (defaults to )
# * :chunk_separator (defaults to ' … ' - which is an HTML ellipsis)
# * :limit (defaults to 256)
# * :around (defaults to 5)
# * :exact_phrase (defaults to false)
# * :single_passage (defaults to false)
#
# The defaults differ from the official PHP client, as I've opted for
# semantic HTML markup.
#
# Example:
#
# client.excerpts(:docs => ["Pat Allan, Pat Cash"], :words => 'Pat', :index => 'pats')
# #=> ["Pat Allan, Pat Cash"]
#
# lorem_lipsum = "Lorem ipsum dolor..."
#
# client.excerpts(:docs => ["Pat Allan, #{lorem_lipsum} Pat Cash"], :words => 'Pat', :index => 'pats')
# #=> ["Pat Allan, Lorem ipsum dolor sit amet, consectetur adipisicing
# elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua … . Excepteur
# sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est
# laborum. Pat Cash"]
#
# Workflow:
#
# Excerpt creation is completely isolated from searching the index. The nominated index is only used to
# discover encoding and charset information.
#
# Therefore, the workflow goes:
#
# 1. Do the sphinx query.
# 2. Fetch the documents found by sphinx from their repositories.
# 3. Pass the documents' text to +excerpts+ for marking up of matched terms.
#
def excerpts(options = {})
options[:index] ||= '*'
options[:before_match] ||= ''
options[:after_match] ||= ''
options[:chunk_separator] ||= ' … ' # ellipsis
options[:limit] ||= 256
options[:limit_passages] ||= 0
options[:limit_words] ||= 0
options[:around] ||= 5
options[:exact_phrase] ||= false
options[:single_passage] ||= false
options[:query_mode] ||= false
options[:force_all_words] ||= false
options[:start_passage_id] ||= 1
options[:load_files] ||= false
options[:html_strip_mode] ||= 'index'
options[:allow_empty] ||= false
options[:passage_boundary] ||= 'none'
options[:emit_zones] ||= false
response = Response.new request(:excerpt, excerpts_message(options))
options[:docs].collect { response.next }
end
# Update attributes - first parameter is the relevant index, second is an
# array of attributes to be updated, and the third is a hash, where the
# keys are the document ids, and the values are arrays with the attribute
# values - in the same order as the second parameter.
#
# Example:
#
# client.update('people', ['birthday'], {1 => [Time.at(1982, 20, 8).to_i]})
#
def update(index, attributes, values_by_doc)
response = Response.new request(
:update,
update_message(index, attributes, values_by_doc)
)
response.next_int
end
# Generates a keyword list for a given query. Each keyword is represented
# by a hash, with keys :tokenised and :normalised. If return_hits is set to
# true it will also report on the number of hits and documents for each
# keyword (see :hits and :docs keys respectively).
def keywords(query, index, return_hits = false)
response = Response.new request(
:keywords,
keywords_message(query, index, return_hits)
)
(0...response.next_int).collect do
hash = {}
hash[:tokenised] = response.next
hash[:normalised] = response.next
if return_hits
hash[:docs] = response.next_int
hash[:hits] = response.next_int
end
hash
end
end
def status
response = Response.new request(
:status, Message.new
)
rows, cols = response.next_int, response.next_int
(0...rows).inject({}) do |hash, row|
hash[response.next.to_sym] = response.next
hash
end
end
def flush_attributes
response = Response.new request(
:flushattrs, Message.new
)
response.next_int
end
def add_override(attribute, type, values)
@overrides[attribute] = {:type => type, :values => values}
end
def open
open_socket
return if Versions[:search] < 0x116
@socket.send [
Commands[:persist], 0, 4, 1
].pack("nnNN"), 0
end
def close
close_socket
end
private
def open_socket
raise "Already Connected" unless @socket.nil?
if @timeout == 0
@socket = initialise_connection
else
begin
Timeout.timeout(@timeout) { @socket = initialise_connection }
rescue Timeout::Error, Riddle::ConnectionError => e
failed_servers ||= []
failed_servers << servers.shift
retry if !servers.empty?
case e
when Timeout::Error
raise Riddle::ConnectionError,
"Connection to #{failed_servers.inspect} on #{@port} timed out after #{@timeout} seconds"
else
raise e
end
end
end
true
end
def close_socket
raise "Not Connected" if @socket.nil?
@socket.close
@socket = nil
true
end
# If there's an active connection to the Sphinx daemon, this will yield the
# socket. If there's no active connection, then it will connect, yield the
# new socket, then close it.
def connect(&block)
if @socket.nil? || @socket.closed?
@socket = nil
open_socket
begin
yield @socket
ensure
close_socket
end
else
yield @socket
end
end
def initialise_connection
socket = initialise_socket
# Checking version
version = socket.recv(4).unpack('N*').first
if version < 1
socket.close
raise VersionError, "Can only connect to searchd version 1.0 or better, not version #{version}"
end
# Send version
socket.send [1].pack('N'), 0
socket
end
def initialise_socket
tries = 0
begin
socket = if self.connection
self.connection.call(self)
elsif self.class.connection
self.class.connection.call(self)
elsif server.index('/') == 0
UNIXSocket.new server
else
TCPSocket.new server, @port
end
rescue Errno::ECONNREFUSED => e
retry if (tries += 1) < 5
raise Riddle::ConnectionError,
"Connection to #{server} on #{@port} failed. #{e.message}"
end
socket
end
def request_header(command, length = 0)
length += key_message.length if key
core_header = case command
when :search
# Message length is +4/+8 to account for the following count value for
# the number of messages.
if Versions[command] >= 0x118
[Commands[command], Versions[command], 8 + length].pack("nnN")
else
[Commands[command], Versions[command], 4 + length].pack("nnN")
end
when :status
[Commands[command], Versions[command], 4, 1].pack("nnNN")
else
[Commands[command], Versions[command], length].pack("nnN")
end
key ? core_header + key_message : core_header
end
def key_message
@key_message ||= begin
message = Message.new
message.append_string key
message.to_s
end
end
# Send a collection of messages, for a command type (eg, search, excerpts,
# update), to the Sphinx daemon.
def request(command, messages)
response = ""
status = -1
version = 0
length = 0
message = Array(messages).join("")
if message.respond_to?(:force_encoding)
message = message.force_encoding('ASCII-8BIT')
end
connect do |socket|
case command
when :search
if Versions[command] >= 0x118
socket.send request_header(command, message.length) +
[0, messages.length].pack('NN') + message, 0
else
socket.send request_header(command, message.length) +
[messages.length].pack('N') + message, 0
end
when :status
socket.send request_header(command, message.length), 0
else
socket.send request_header(command, message.length) + message, 0
end
header = socket.recv(8)
status, version, length = header.unpack('n2N')
while response.length < (length || 0)
part = socket.recv(length - response.length)
response << part if part
end
end
if response.empty? || response.length != length
raise ResponseError, "No response from searchd (status: #{status}, version: #{version})"
end
case status
when Statuses[:ok]
if version < Versions[command]
puts format("searchd command v.%d.%d older than client (v.%d.%d)",
version >> 8, version & 0xff,
Versions[command] >> 8, Versions[command] & 0xff)
end
response
when Statuses[:warning]
length = response[0, 4].unpack('N*').first
puts response[4, length]
response[4 + length, response.length - 4 - length]
when Statuses[:error], Statuses[:retry]
message = response[4, response.length - 4]
klass = message[/out of bounds/] ? OutOfBoundsError : ResponseError
raise klass, "searchd error (status: #{status}): #{message}"
else
raise ResponseError, "Unknown searchd error (status: #{status})"
end
end
# Generation of the message to send to Sphinx for a search.
def query_message(search, index, comments = '')
message = Message.new
# Mode, Limits, Sort Mode
message.append_ints @offset, @limit, MatchModes[@match_mode],
RankModes[@rank_mode], SortModes[@sort_mode]
message.append_string @sort_by
# Query
message.append_string search
# Weights
message.append_int @weights.length
message.append_ints *@weights
# Index
message.append_string index
# ID Range
message.append_int 1
message.append_64bit_ints @id_range.first, @id_range.last
# Filters
message.append_int @filters.length
@filters.each { |filter| message.append filter.query_message }
# Grouping
message.append_int GroupFunctions[@group_function]
message.append_string @group_by
message.append_int @max_matches
message.append_string @group_clause
message.append_ints @cut_off, @retry_count, @retry_delay
message.append_string @group_distinct
# Anchor Point
if @anchor.empty?
message.append_int 0
else
message.append_int 1
message.append_string @anchor[:latitude_attribute]
message.append_string @anchor[:longitude_attribute]
message.append_floats @anchor[:latitude], @anchor[:longitude]
end
# Per Index Weights
message.append_int @index_weights.length
@index_weights.each do |key,val|
message.append_string key.to_s
message.append_int val
end
# Max Query Time
message.append_int @max_query_time
# Per Field Weights
message.append_int @field_weights.length
@field_weights.each do |key,val|
message.append_string key.to_s
message.append_int val
end
message.append_string comments
return message.to_s if Versions[:search] < 0x116
# Overrides
message.append_int @overrides.length
@overrides.each do |key,val|
message.append_string key.to_s
message.append_int AttributeTypes[val[:type]]
message.append_int val[:values].length
val[:values].each do |id,map|
message.append_64bit_int id
method = case val[:type]
when :float
:append_float
when :bigint
:append_64bit_int
else
:append_int
end
message.send method, map
end
end
message.append_string @select
message.to_s
end
# Generation of the message to send to Sphinx for an excerpts request.
def excerpts_message(options)
message = Message.new
message.append [0, excerpt_flags(options)].pack('N2') # 0 = mode
message.append_string options[:index]
message.append_string options[:words]
# options
message.append_string options[:before_match]
message.append_string options[:after_match]
message.append_string options[:chunk_separator]
message.append_ints options[:limit], options[:around]
message.append_array options[:docs]
message.to_s
end
# Generation of the message to send to Sphinx to update attributes of a
# document.
def update_message(index, attributes, values_by_doc)
message = Message.new
message.append_string index
message.append_array attributes
message.append_int values_by_doc.length
values_by_doc.each do |key,values|
message.append_64bit_int key # document ID
message.append_ints *values # array of new values (integers)
end
message.to_s
end
# Generates the simple message to send to the daemon for a keywords request.
def keywords_message(query, index, return_hits)
message = Message.new
message.append_string query
message.append_string index
message.append_int return_hits ? 1 : 0
message.to_s
end
AttributeHandlers = {
AttributeTypes[:integer] => :next_int,
AttributeTypes[:timestamp] => :next_int,
AttributeTypes[:ordinal] => :next_int,
AttributeTypes[:bool] => :next_int,
AttributeTypes[:float] => :next_float,
AttributeTypes[:bigint] => :next_64bit_int,
AttributeTypes[:string] => :next,
AttributeTypes[:multi] + AttributeTypes[:integer] => :next_int_array,
AttributeTypes[:multi] + AttributeTypes[:timestamp] => :next_int_array,
AttributeTypes[:multi] + AttributeTypes[:ordinal] => :next_int_array,
AttributeTypes[:multi] + AttributeTypes[:bool] => :next_int_array,
AttributeTypes[:multi] + AttributeTypes[:float] => :next_float_array,
AttributeTypes[:multi] + AttributeTypes[:bigint] => :next_64bit_int_array,
AttributeTypes[:multi] + AttributeTypes[:string] => :next_array
}
def attribute_from_type(type, response)
handler = AttributeHandlers[type]
response.send handler
end
def excerpt_flags(options)
flags = 1
flags |= 2 if options[:exact_phrase]
flags |= 4 if options[:single_passage]
flags |= 8 if options[:use_boundaries]
flags |= 16 if options[:weight_order]
flags |= 32 if options[:query_mode]
flags |= 64 if options[:force_all_words]
flags |= 128 if options[:load_files]
flags |= 256 if options[:allow_empty]
flags |= 512 if options[:emit_zones]
flags
end
end
end