lib/searchkick.rb in searchkick-1.5.0 vs lib/searchkick.rb in searchkick-1.5.1
- old
+ new
@@ -2,17 +2,19 @@
require "elasticsearch"
require "hashie"
require "searchkick/version"
require "searchkick/index_options"
require "searchkick/index"
+require "searchkick/indexer"
require "searchkick/results"
require "searchkick/query"
require "searchkick/reindex_job"
require "searchkick/model"
require "searchkick/tasks"
require "searchkick/middleware"
require "searchkick/logging" if defined?(ActiveSupport::Notifications)
+require "active_support/core_ext/hash/deep_merge"
# background jobs
begin
require "active_job"
rescue LoadError
@@ -27,27 +29,28 @@
class InvalidQueryError < Elasticsearch::Transport::Transport::Errors::BadRequest; end
class DangerousOperation < Error; end
class ImportError < Error; end
class << self
- attr_accessor :search_method_name, :wordnet_path, :timeout, :models
+ attr_accessor :search_method_name, :wordnet_path, :timeout, :models, :client_options
attr_writer :client, :env, :search_timeout
attr_reader :aws_credentials
end
self.search_method_name = :search
self.wordnet_path = "/var/lib/wn_s.pl"
self.timeout = 10
self.models = []
+ self.client_options = {}
def self.client
@client ||= begin
require "typhoeus/adapters/faraday" if defined?(Typhoeus)
- Elasticsearch::Client.new(
+ Elasticsearch::Client.new({
url: ENV["ELASTICSEARCH_URL"],
transport_options: {request: {timeout: timeout}, headers: {content_type: "application/json"}}
- ) do |f|
+ }.deep_merge(client_options)) do |f|
f.use Searchkick::Middleware
f.request :aws_signers_v4, {
credentials: Aws::Credentials.new(aws_credentials[:access_key_id], aws_credentials[:secret_access_key]),
service_name: "es",
region: aws_credentials[:region] || "us-east-1"
@@ -70,10 +73,32 @@
def self.server_below?(version)
Gem::Version.new(server_version.sub("-", ".")) < Gem::Version.new(version.sub("-", "."))
end
+ def self.search(term = nil, options = {}, &block)
+ query = Searchkick::Query.new(nil, term, options)
+ block.call(query.body) if block
+ if options[:execute] == false
+ query
+ else
+ query.execute
+ end
+ end
+
+ def self.multi_search(queries)
+ if queries.any?
+ responses = client.msearch(body: queries.flat_map { |q| [q.params.except(:body), q.body] })["responses"]
+ queries.each_with_index do |query, i|
+ query.handle_response(responses[i])
+ end
+ end
+ nil
+ end
+
+ # callbacks
+
def self.enable_callbacks
self.callbacks_value = nil
end
def self.disable_callbacks
@@ -88,11 +113,11 @@
if block_given?
previous_value = callbacks_value
begin
self.callbacks_value = value
yield
- perform_bulk if callbacks_value == :bulk
+ indexer.perform if callbacks_value == :bulk
ensure
self.callbacks_value = previous_value
end
else
self.callbacks_value = value
@@ -104,72 +129,21 @@
@aws_credentials = creds
@client = nil # reset client
end
# private
- def self.queue_items(items)
- queued_items.concat(items)
- perform_bulk unless callbacks_value == :bulk
+ def self.indexer
+ Thread.current[:searchkick_indexer] ||= Searchkick::Indexer.new
end
# private
- def self.perform_bulk
- items = queued_items
- clear_queued_items
- perform_items(items)
- end
-
- # private
- def self.perform_items(items)
- if items.any?
- response = client.bulk(body: items)
- if response["errors"]
- first_with_error = response["items"].map do |item|
- (item["index"] || item["delete"] || item["update"])
- end.find { |item| item["error"] }
- raise Searchkick::ImportError, "#{first_with_error["error"]} on item with id '#{first_with_error["_id"]}'"
- end
- end
- end
-
- # private
- def self.queued_items
- Thread.current[:searchkick_queued_items] ||= []
- end
-
- # private
- def self.clear_queued_items
- Thread.current[:searchkick_queued_items] = []
- end
-
- # private
def self.callbacks_value
Thread.current[:searchkick_callbacks_enabled]
end
# private
def self.callbacks_value=(value)
Thread.current[:searchkick_callbacks_enabled] = value
- end
-
- def self.search(term = nil, options = {}, &block)
- query = Searchkick::Query.new(nil, term, options)
- block.call(query.body) if block
- if options[:execute] == false
- query
- else
- query.execute
- end
- end
-
- def self.multi_search(queries)
- if queries.any?
- responses = client.msearch(body: queries.flat_map { |q| [q.params.except(:body), q.body] })["responses"]
- queries.each_with_index do |query, i|
- query.handle_response(responses[i])
- end
- end
- nil
end
end
# TODO find better ActiveModel hook
ActiveModel::Callbacks.send(:include, Searchkick::Model)