require 'drb' require 'thread' require 'acts_as_ferret/server/unix_daemon' require 'acts_as_ferret/server/config' module ActsAsFerret module Server # This class acts as a drb server listening for indexing and # search requests from models declared to 'acts_as_ferret :remote => true' # # Usage: # - modify Rails.root/config/ferret_server.yml to suit your needs. # - environments for which no section in the config file exists will use # the index locally (good for unit tests/development mode) # - run script/ferret_server to start the server: # script/ferret_server -e production start # - to stop the server run # script/ferret_server -e production stop # class Server include UnixDaemon cattr_accessor :running def initialize @cfg = Config.new ActiveRecord::Base.logger = @logger = Logger.new(@cfg.log_file) ActiveRecord::Base.logger.level = Logger.const_get(@cfg.log_level.upcase) rescue Logger::DEBUG if @cfg.script path = File.join(Rails.root, @cfg.script) load path @logger.info "loaded custom startup script from #{path}" end end # start the server as a daemon process def start raise "ferret_server not configured for #{Rails.env}" unless (@cfg.uri rescue nil) platform_daemon { run_drb_service } end # run the server and block until it exits def run raise "ferret_server not configured for #{Rails.env}" unless (@cfg.uri rescue nil) run_drb_service end def run_drb_service $stdout.puts("starting ferret server...") self.class.running = true DRb.start_service(@cfg.uri, self) DRb.thread.join rescue Exception => e @logger.error(e.to_s) raise end # handles all incoming method calls, and sends them on to the correct local index # instance. # # Calls are not queued, so this will block until the call returned. # def method_missing(name, *args) @logger.debug "\#method_missing(#{name.inspect}, #{args.inspect})" index_name = args.shift index = if name.to_s =~ /^multi_(.+)/ name = $1 ActsAsFerret::multi_index(index_name) else ActsAsFerret::get_index(index_name) end if index.nil? @logger.error "\#index with name #{index_name} not found in call to #{name} with args #{args.inspect}" raise ActsAsFerret::IndexNotDefined.new(index_name) end # TODO find another way to implement the reconnection logic (maybe in # local_index or class_methods) # reconnect_when_needed(clazz) do # using respond_to? here so we not have to catch NoMethodError # which would silently catch those from deep inside the indexing # code, too... if index.respond_to?(name) index.send name, *args # TODO check where we need this: #elsif clazz.respond_to?(name) # @logger.debug "no luck, trying to call class method instead" # clazz.send name, *args else raise NoMethodError.new("method #{name} not supported by DRb server") end rescue => e @logger.error "ferret server error #{$!}\n#{$!.backtrace.join "\n"}" raise e end def register_class(class_name) @logger.debug "############ registerclass #{class_name}" class_name.constantize @logger.debug "index for class #{class_name}: #{ActsAsFerret::ferret_indexes[class_name.underscore.to_sym]}" end # make sure we have a versioned index in place, building one if necessary def ensure_index_exists(index_name) @logger.debug "DRb server: ensure_index_exists for index #{index_name}" definition = ActsAsFerret::get_index(index_name).index_definition dir = definition[:index_dir] unless File.directory?(dir) && File.file?(File.join(dir, 'segments')) && dir =~ %r{/\d+(_\d+)?$} rebuild_index(index_name) end end # disconnects the db connection for the class specified by class_name # used only in unit tests to check the automatic reconnection feature def db_disconnect!(class_name) with_class class_name do |clazz| clazz.connection.disconnect! end end # hides LocalIndex#rebuild_index to implement index versioning def rebuild_index(index_name) definition = ActsAsFerret::get_index(index_name).index_definition.dup models = definition[:registered_models] index = new_index_for(definition) # TODO fix reconnection stuff # reconnect_when_needed(clazz) do # @logger.debug "DRb server: rebuild index for class(es) #{models.inspect} in #{index.options[:path]}" index.index_models models # end new_version = File.join definition[:index_base_dir], Time.now.utc.strftime('%Y%m%d%H%M%S') # create a unique directory name (needed for unit tests where # multiple rebuilds per second may occur) if File.exists?(new_version) i = 0 i+=1 while File.exists?("#{new_version}_#{i}") new_version << "_#{i}" end File.rename index.options[:path], new_version ActsAsFerret::change_index_dir index_name, new_version end protected def reconnect_when_needed(clazz) retried = false begin yield rescue ActiveRecord::StatementInvalid => e if e.message =~ /MySQL server has gone away/ if retried raise e else @logger.info "StatementInvalid caught, trying to reconnect..." clazz.connection.reconnect! retried = true retry end else @logger.error "StatementInvalid caught, but unsure what to do with it: #{e}" raise e end end end def new_index_for(index_definition) ferret_cfg = index_definition[:ferret].dup ferret_cfg.update :auto_flush => false, :create => true, :field_infos => ActsAsFerret::field_infos(index_definition), :path => File.join(index_definition[:index_base_dir], 'rebuild') Ferret::Index::Index.new(ferret_cfg).tap do |i| i.batch_size = index_definition[:reindex_batch_size] i.logger = @logger end end end end end