module SearchFlip
# The SearchFlip::Criteria class serves the purpose of chaining various
# filtering and aggregation methods. Each chainable method creates a new
# criteria object until a method is called that finally sends the respective
# request to ElasticSearch and returns the result.
#
# @example
# CommentIndex.where(public: true).sort(id: "desc").limit(1_000).records
# CommentIndex.range(:created_at, lt: Time.parse("2014-01-01").delete
# CommentIndex.search("hello world").total_entries
# CommentIndex.query(more_like_this: { "...", fields: ["description"] })]
# CommentIndex.exists(:user_id).paginate(page: 1, per_page: 100)
# CommentIndex.sort("_doc").find_each { |comment| "..." }
class Criteria
include SearchFlip::Filterable
include SearchFlip::PostFilterable
include SearchFlip::Aggregatable
extend Forwardable
attr_accessor :target, :profile_value, :source_value, :sort_values, :highlight_values, :suggest_values, :offset_value, :limit_value,
:includes_values, :eager_load_values, :preload_values, :failsafe_value, :scroll_args, :custom_value, :terminate_after_value, :timeout_value
# Creates a new criteria while merging the attributes (constraints,
# settings, etc) of the current criteria with the attributes of another one
# passed as argument. For multi-value contstraints the resulting criteria
# will include constraints of both criterias. For single-value constraints,
# the values of the criteria passed as an argument are used.
#
# @example
# CommentIndex.where(approved: true).merge(CommentIndex.range(:created_at, gt: Time.parse("2015-01-01")))
# CommentIndex.aggregate(:user_id).merge(CommentIndex.where(admin: true))
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def merge(other)
other = other.criteria
fresh.tap do |criteria|
criteria.profile_value = other.profile_value unless other.profile_value.nil?
criteria.source_value = (criteria.source_value || []) + other.source_value if other.source_value
criteria.sort_values = (criteria.sort_values || []) + other.sort_values if other.sort_values
criteria.highlight_values = (criteria.highlight_values || {}).merge(other.highlight_values) if other.highlight_values
criteria.suggest_values = (criteria.suggest_values || {}).merge(other.suggest_values) if other.suggest_values
criteria.offset_value = other.offset_value if other.offset_value
criteria.limit_value = other.limit_value if other.limit_value
criteria.includes_values = (criteria.includes_values || []) + other.includes_values if other.includes_values
criteria.preload_values = (criteria.preload_values || []) + other.preload_values if other.preload_values
criteria.eager_load_values = (criteria.eager_load_values || []) + other.eager_load_values if other.eager_load_values
criteria.failsafe_value = other.failsafe_value unless other.failsafe_value.nil?
criteria.scroll_args = other.scroll_args if other.scroll_args
criteria.custom_value = (criteria.custom_value || {}).merge(other.custom_value) if other.custom_value
criteria.search_values = (criteria.search_values || []) + other.search_values if other.search_values
criteria.must_values = (criteria.must_values || []) + other.must_values if other.must_values
criteria.must_not_values = (criteria.must_not_values || []) + other.must_not_values if other.must_not_values
criteria.should_values = (criteria.should_values || []) + other.should_values if other.should_values
criteria.filter_values = (criteria.filter_values || []) + other.filter_values if other.filter_values
criteria.post_search_values = (criteria.post_search_values || []) + other.post_search_values if other.post_search_values
criteria.post_must_values = (criteria.post_must_values || []) + other.post_must_values if other.post_must_values
criteria.post_must_not_values = (criteria.post_must_not_values || []) + other.post_must_not_values if other.post_must_not_values
criteria.post_should_values = (criteria.post_should_values || []) + other.post_should_values if other.post_should_values
criteria.post_filter_values = (criteria.post_filter_vales || []) + other.post_filter_values if other.post_filter_values
criteria.aggregation_values = (criteria.aggregation_values || {}).merge(other.aggregation_values) if other.aggregation_values
criteria.terminate_after_value = other.terminate_after_value unless other.terminate_after_value.nil?
criteria.timeout_value = other.timeout_value unless other.timeout_value.nil?
end
end
# Specifies a query timeout, such that the processing will be stopped after
# that timeout and only the results calculated up to that point will be
# processed and returned.
#
# @example
# ProductIndex.timeout("3s").search("hello world")
#
# @param value [String] The timeout value
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def timeout(value)
fresh.tap do |criteria|
criteria.timeout_value = value
end
end
# Specifies early query termination, such that the processing will be
# stopped after the specified number of results has been accumulated.
#
# @example
# ProductIndex.terminate_after(10_000).search("hello world")
#
# @param value [Fixnum] The number of records to terminate after
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def terminate_after(value)
fresh.tap do |criteria|
criteria.terminate_after_value = value
end
end
# Creates a new criteria while removing all specified scopes. Currently,
# you can unscope :search, :post_search, :sort, :highlight, :suggest, :custom
# and :aggregate.
#
# @example
# CommentIndex.search("hello world").aggregate(:username).unscope(:search, :aggregate)
#
# @param scopes [Symbol] All scopes that you want to remove
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def unscope(*scopes)
unknown = scopes - [:search, :post_search, :sort, :highlight, :suggest, :custom, :aggregate]
raise(ArgumentError, "Can't unscope #{unknown.join(", ")}") if unknown.size > 0
scopes = scopes.to_set
fresh.tap do |criteria|
criteria.search_values = nil if scopes.include?(:search)
criteria.post_search_values = nil if scopes.include?(:search)
criteria.sort_values = nil if scopes.include?(:sort)
criteria.hightlight_values = nil if scopes.include?(:highlight)
criteria.suggest_values = nil if scopes.include?(:suggest)
criteria.custom_values = nil if scopes.include?(:custom)
criteria.aggregation_values = nil if scopes.include?(:aggregate)
end
end
# @api private
#
# Convenience method to have a unified conversion api.
#
# @return [SearchFlip::Criteria] Simply returns self
def criteria
self
end
# Creates a new SearchFlip::Criteria.
#
# @param attributes [Hash] Attributes to initialize the Criteria with
def initialize(attributes = {})
attributes.each do |key, value|
send "#{key}=", value
end
end
# Generates the request object from the attributes specified via chaining,
# like eg offset, limit, query, filters, aggregations, etc and returns a
# Hash that later gets serialized as JSON.
#
# @return [Hash] The generated request object
def request
res = {}
if must_values || search_values || must_not_values || should_values || filter_values
if target.connection.version.to_i >= 2
res[:query] = {
bool: {}
.merge(must_values || search_values ? { must: (must_values || []) + (search_values || []) } : {})
.merge(must_not_values ? { must_not: must_not_values } : {})
.merge(should_values ? { should: should_values } : {})
.merge(filter_values ? { filter: filter_values } : {})
}
else
filters = (filter_values || []) + (must_not_values || []).map { |must_not_value| { not: must_not_value } }
queries = {}
.merge(must_values || search_values ? { must: (must_values || []) + (search_values || []) } : {})
.merge(should_values ? { should: should_values } : {})
res[:query] =
if filters.size > 0
{
filtered: {}
.merge(queries.size > 0 ? { query: { bool: queries } } : {})
.merge(filter: filters.size > 1 ? { and: filters } : filters.first)
}
else
{ bool: queries }
end
end
end
res.update from: offset_value_with_default, size: limit_value_with_default
res[:timeout] = timeout_value if timeout_value
res[:terminate_after] = terminate_after_value if terminate_after_value
res[:highlight] = highlight_values if highlight_values
res[:suggest] = suggest_values if suggest_values
res[:sort] = sort_values if sort_values
res[:aggregations] = aggregation_values if aggregation_values
if post_must_values || post_search_values || post_must_not_values || post_should_values || post_filter_values
if target.connection.version.to_i >= 2
res[:post_filter] = {
bool: {}
.merge(post_must_values || post_search_values ? { must: (post_must_values || []) + (post_search_values || []) } : {})
.merge(post_must_not_values ? { must_not: post_must_not_values } : {})
.merge(post_should_values ? { should: post_should_values } : {})
.merge(post_filter_values ? { filter: post_filter_values } : {})
}
else
post_filters = (post_filter_values || []) + (post_must_not_values || []).map { |post_must_not_value| { not: post_must_not_value } }
post_queries = {}
.merge(post_must_values || post_search_values ? { must: (post_must_values || []) + (post_search_values || []) } : {})
.merge(post_should_values ? { should: post_should_values } : {})
post_filters_and_queries = post_filters + (post_queries.size > 0 ? [bool: post_queries] : [])
res[:post_filter] = post_filters_and_queries.size > 1 ? { and: post_filters_and_queries } : post_filters_and_queries.first
end
end
res[:_source] = source_value unless source_value.nil?
res[:profile] = true if profile_value
res.update(custom_value) if custom_value
res
end
# Adds highlighting of the given fields to the request.
#
# @example
# CommentIndex.highlight([:title, :message])
# CommentIndex.highlight(:title).highlight(:description)
# CommentIndex.highlight(:title, require_field_match: false)
# CommentIndex.highlight(title: { type: "fvh" })
#
# @example
# query = CommentIndex.highlight(:title).search("hello")
# query.results[0].highlight.title # => "hello world"
#
# @param fields [Hash, Array, String, Symbol] The fields to highligt.
# Supports raw ElasticSearch values by passing a Hash.
#
# @param options [Hash] Extra highlighting options. Check out the ElasticSearch
# docs for further details.
#
# @return [SearchFlip::Criteria] A new criteria including the highlighting
def highlight(fields, options = {})
fresh.tap do |criteria|
criteria.highlight_values = (criteria.highlight_values || {}).merge(options)
hash =
if fields.is_a?(Hash)
fields
elsif fields.is_a?(Array)
fields.each_with_object({}) { |field, h| h[field] = {} }
else
{ fields => {} }
end
criteria.highlight_values[:fields] = (criteria.highlight_values[:fields] || {}).merge(hash)
end
end
# Adds a suggestion section with the given name to the request.
#
# @example
# query = CommentIndex.suggest(:suggestion, text: "helo", term: { field: "message" })
# query.suggestions(:suggestion).first["text"] # => "hello"
#
# @param name [String, Symbol] The name of the suggestion section
#
# @param options [Hash] Additional suggestion options. Check out the ElasticSearch
# docs for further details.
#
# @return [SearchFlip::Criteria] A new criteria including the suggestion section
def suggest(name, options = {})
fresh.tap do |criteria|
criteria.suggest_values = (criteria.suggest_values || {}).merge(name => options)
end
end
# Sets whether or not query profiling should be enabled.
#
# @example
# query = CommentIndex.profile(true)
# query.raw_response["profile"] # => { "shards" => ... }
#
# @param value [Boolean] Whether query profiling should be enabled or not
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def profile(value)
fresh.tap do |criteria|
criteria.profile_value = value
end
end
# Adds scrolling to the request with or without an already existing scroll
# id and using the specified timeout.
#
# @example
# query = CommentIndex.scroll(timeout: "5m")
#
# until query.records.empty?
# # ...
#
# query = query.scroll(id: query.scroll_id, timeout: "5m")
# end
#
# @param id [String, nil] The scroll id of the last request returned by
# SearchFlip or nil
#
# @param timeout [String] The timeout of the scroll request, ie. how long
# SearchFlip should keep the scroll handle open
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def scroll(id: nil, timeout: "1m")
fresh.tap do |criteria|
criteria.scroll_args = { id: id, timeout: timeout }
end
end
# Sends a delete by query request to ElasticSearch, such that all documents
# matching the query get deleted. Please note, for certain ElasticSearch
# versions you need to install the delete-by-query plugin to get support
# for this feature. Refreshes the index if the auto_refresh is enabled.
# Raises SearchFlip::ResponseError in case any errors occur.
#
# @see SearchFlip::Config See SearchFlip::Config for auto_refresh
#
# @example
# CommentIndex.range(lt: Time.parse("2014-01-01")).delete
# CommentIndex.where(public: false).delete
def delete
dupped_request = request.dup
dupped_request.delete(:from)
dupped_request.delete(:size)
if target.connection.version.to_i >= 5
SearchFlip::HTTPClient.post("#{target.type_url}/_delete_by_query", json: dupped_request)
else
SearchFlip::HTTPClient.delete("#{target.type_url}/_query", json: dupped_request)
end
target.refresh if SearchFlip::Config[:auto_refresh]
true
end
# Use to specify which fields of the source document you want ElasticSearch
# to return for each matching result.
#
# @example
# CommentIndex.source([:id, :message]).search("hello world")
#
# @param value [Array] Array listing the field names of the source document
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def source(value)
fresh.tap do |criteria|
criteria.source_value = value
end
end
# Specify associations of the target model you want to include via
# ActiveRecord's or other ORM's mechanisms when records get fetched from
# the database.
#
# @example
# CommentIndex.includes(:user, :post).records
# PostIndex.includes(:comments => :user).records
#
# @param args The args that get passed to the includes method of
# ActiveRecord or other ORMs
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def includes(*args)
fresh.tap do |criteria|
criteria.includes_values = (includes_values || []) + args
end
end
# Specify associations of the target model you want to eager load via
# ActiveRecord's or other ORM's mechanisms when records get fetched from
# the database.
#
# @example
# CommentIndex.eager_load(:user, :post).records
# PostIndex.eager_load(:comments => :user).records
#
# @param args The args that get passed to the eager load method of
# ActiveRecord or other ORMs
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def eager_load(*args)
fresh.tap do |criteria|
criteria.eager_load_values = (eager_load_values || []) + args
end
end
# Specify associations of the target model you want to preload via
# ActiveRecord's or other ORM's mechanisms when records get fetched from
# the database.
#
# @example
# CommentIndex.preload(:user, :post).records
# PostIndex.includes(:comments => :user).records
#
# @param args The args that get passed to the preload method of
# ActiveRecord or other ORMs
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def preload(*args)
fresh.tap do |criteria|
criteria.preload_values = (preload_values || []) + args
end
end
# Specify the sort order you want ElasticSearch to use for sorting the
# results. When you call this multiple times, the sort orders are appended
# to the already existing ones. The sort arguments get passed to
# ElasticSearch without modifications, such that you can use sort by
# script, etc here as well.
#
# @example Default usage
# CommentIndex.sort(:user_id, :id)
#
# # Same as
#
# CommentIndex.sort(:user_id).sort(:id)
#
# @example Default hash usage
# CommentIndex.sort(user_id: "asc").sort(id: "desc")
#
# # Same as
#
# CommentIndex.sort({ user_id: "asc" }, { id: "desc" })
#
# @example Sort by native script
# CommentIndex.sort("_script" => "sort_script", lang: "native", order: "asc", type: "number")
#
# @param args The sort values that get passed to ElasticSearch
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def sort(*args)
fresh.tap do |criteria|
criteria.sort_values = (sort_values || []) + args
end
end
alias_method :order, :sort
# Specify the sort order you want ElasticSearch to use for sorting the
# results with already existing sort orders being removed.
#
# @example
# CommentIndex.sort(user_id: "asc").resort(id: "desc")
#
# # Same as
#
# CommentIndex.sort(id: "desc")
#
# @return [SearchFlip::Criteria] A newly created extended criteria
#
# @see #sort See #sort for more details
def resort(*args)
fresh.tap do |criteria|
criteria.sort_values = args
end
end
alias_method :reorder, :resort
# Adds a fully custom field/section to the request, such that upcoming or
# minor ElasticSearch features as well as other custom requirements can be
# used without having yet specialized criteria methods.
#
# @note Use with caution, because using #custom will potentiall override
# other sections like +aggregations+, +query+, +sort+, etc if you use the
# the same section names.
#
# @example
# CommentIndex.custom(section: { argument: "value" }).request
# => {:section=>{:argument=>"value"},...}
#
# @param hash [Hash] The custom section that is added to the request
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def custom(hash)
fresh.tap do |criteria|
criteria.custom_value = (custom_value || {}).merge(hash)
end
end
# Sets the request offset, ie SearchFlip's from parameter that is used
# to skip results in the result set from being returned.
#
# @example
# CommentIndex.offset(100)
#
# @param value [Fixnum] The offset value, ie the number of results that are
# skipped in the result set
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def offset(value)
fresh.tap do |criteria|
criteria.offset_value = value.to_i
end
end
# Returns the offset value or, if not yet set, the default limit value (0).
#
# @return [Fixnum] The offset value
def offset_value_with_default
(offset_value || 0).to_i
end
# Sets the request limit, ie ElasticSearch's size parameter that is used
# to restrict the results that get returned.
#
# @example
# CommentIndex.limit(100)
#
# @param value [Fixnum] The limit value, ie the max number of results that
# should be returned
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def limit(value)
fresh.tap do |criteria|
criteria.limit_value = value.to_i
end
end
# Returns the limit value or, if not yet set, the default limit value (30).
#
# @return [Fixnum] The limit value
def limit_value_with_default
(limit_value || 30).to_i
end
# Sets pagination parameters for the criteria by using offset and limit,
# ie ElasticSearch's from and size parameters.
#
# @example
# CommentIndex.paginate(page: 3)
# CommentIndex.paginate(page: 5, per_page: 60)
#
# @param page [#to_i] The current page
# @param per_page [#to_i] The number of results per page
#
# @return [SearchFlip::Criteria] A newly created extended criteria
def paginate(page:, per_page: limit_value_with_default)
page = [page.to_i, 1].max
per_page = per_page.to_i
offset((page - 1) * per_page).limit(per_page)
end
def page(value)
paginate(page: value)
end
def per(value)
paginate(page: offset_value_with_default / limit_value_with_default + 1, per_page: value)
end
# Fetches the records specified by the criteria in batches using the
# ElasicSearch scroll API and yields each batch. The batch size and scroll
# API timeout can be specified. Check out the ElasticSearch docs for
# further details.
#
# @example
# CommentIndex.search("hello world").find_in_batches(batch_size: 100) do |batch|
# # ...
# end
#
# @param options [Hash] The options to control the fetching of batches
# @option options batch_size [Fixnum] The number of records to fetch per
# batch. Uses #limit to control the batch size.
# @option options timeout [String] The timeout per scroll request, ie how
# long ElasticSearch will keep the request handle open.
def find_in_batches(options = {})
return enum_for(:find_in_batches, options) unless block_given?
yield_in_batches(options) do |criteria|
yield(criteria.records) if criteria.records.size > 0
end
end
# Fetches the results specified by the criteria in batches using the
# ElasticSearch scroll API and yields each batch. The batch size and scroll
# API timeout can be specified. Checkout out the ElasticSearch docs for
# further details.
#
# @example
# CommentIndex.search("hello world").find_results_in_batches(batch_size: 100) do |batch|
# # ...
# end
#
# @param options [Hash] The options to control the fetching of batches
# @option options batch_size [Fixnum] The number of records to fetch per
# batch. Uses #limit to control the batch size.
# @option options timeout [String] The timeout per scroll request, ie how
# long ElasticSearch will keep the request handle open.
def find_results_in_batches(options = {})
return enum_for(:find_results_in_batches, options) unless block_given?
yield_in_batches(options) do |criteria|
yield criteria.results
end
end
# Fetches the records specified by the relatin in batches using the
# ElasticSearch scroll API and yields each record. The batch size and
# scroll API timeout can be specified. Check out the ElasticSearch docs for
# further details.
#
# @example
# CommentIndex.search("hello world").find_each(batch_size: 100) do |record|
# # ...
# end
#
# @param options [Hash] The options to control the fetching of batches
# @option options batch_size [Fixnum] The number of records to fetch per
# batch. Uses #limit to control the batch size.
# @option options timeout [String] The timeout per scroll request, ie how
# long ElasticSearch will keep the request handle open.
def find_each(options = {})
return enum_for(:find_each, options) unless block_given?
find_in_batches options do |batch|
batch.each do |record|
yield record
end
end
end
alias_method :each, :find_each
# Executes the search request for the current criteria, ie sends the
# request to ElasticSearch and returns the response. Connection and
# response errors will be rescued if you specify the criteria to be
# #failsafe, such that an empty response is returned instead.
#
# @param connection An optional alternative connection to used to send the
# request to for e.g. proxying
#
# @example
# response = CommentIndex.search("hello world").execute
#
# @return [SearchFlip::Response] The response object
def execute(connection: target.connection)
@response ||= begin
http_request = SearchFlip::HTTPClient.headers(accept: "application/json")
http_response =
if scroll_args && scroll_args[:id]
if target.connection.version.to_i >= 2
http_request.post(
"#{connection.base_url}/_search/scroll",
json: { scroll: scroll_args[:timeout], scroll_id: scroll_args[:id] }
)
else
http_request
.headers(content_type: "text/plain")
.post("#{connection.base_url}/_search/scroll", params: { scroll: scroll_args[:timeout] }, body: scroll_args[:id])
end
elsif scroll_args
http_request.post(
"#{target.type_url(connection: connection)}/_search",
params: { scroll: scroll_args[:timeout] },
json: request
)
else
http_request.post("#{target.type_url(connection: connection)}/_search", json: request)
end
SearchFlip::Response.new(self, http_response.parse)
rescue SearchFlip::ConnectionError, SearchFlip::ResponseError => e
raise e unless failsafe_value
SearchFlip::Response.new(self, "took" => 0, "hits" => { "total" => 0, "hits" => [] })
end
end
alias_method :response, :execute
# Marks the criteria to be failsafe, ie certain exceptions raised due to
# invalid queries, inavailability of ElasticSearch, etc get rescued and an
# empty criteria is returned instead.
#
# @see #execute See #execute for further details
#
# @example
# CommentIndex.search("invalid/request").execute
# # raises SearchFlip::ResponseError
#
# # ...
#
# CommentIndex.search("invalid/request").failsafe(true).execute
# # => #
#
# @param value [Boolean] Whether or not the criteria should be failsafe
#
# @return [SearchFlip::Response] A newly created extended criteria
def failsafe(value)
fresh.tap do |criteria|
criteria.failsafe_value = value
end
end
# Returns a fresh, ie dupped, criteria with the response cache being
# cleared.
#
# @example
# CommentIndex.search("hello world").fresh
#
# @return [SearchFlip::Response] A dupped criteria with the response
# cache being cleared
def fresh
dup.tap do |criteria|
criteria.instance_variable_set(:@response, nil)
end
end
def respond_to_missing?(name, *args)
target.respond_to?(name, *args)
end
def method_missing(name, *args, &block)
if target.respond_to?(name)
merge(target.send(name, *args, &block))
else
super
end
end
def_delegators :response, :total_entries, :total_count, :current_page, :previous_page,
:prev_page, :next_page, :first_page?, :last_page?, :out_of_range?, :total_pages,
:hits, :ids, :count, :size, :length, :took, :aggregations, :suggestions,
:scope, :results, :records, :scroll_id, :raw_response
private
def yield_in_batches(options = {})
return enum_for(:yield_in_batches, options) unless block_given?
batch_size = options[:batch_size] || 1_000
timeout = options[:timeout] || "1m"
criteria = limit(batch_size).scroll(timeout: timeout)
until criteria.ids.empty?
yield criteria.response
criteria = criteria.scroll(id: criteria.scroll_id, timeout: timeout)
end
end
end
end