# frozen-string-literal: true
module Sequel
extension 'async_thread_pool'
module Plugins
# The concurrent_eager_loading plugin allows for eager loading multiple associations
# concurrently in separate threads. You must load the async_thread_pool Database
# extension into the Database object the model class uses in order for this plugin
# to work.
#
# By default in Sequel, eager loading happens in a serial manner. If you have code
# such as:
#
# Album.eager(:artist, :genre, :tracks)
#
# Sequel will load the albums, then the artists for the albums, then
# the genres for the albums, then the tracks for the albums.
#
# With the concurrent_eager_loading plugin, you can use the +eager_load_concurrently+
# method to allow for concurrent eager loading:
#
# Album.eager_load_concurrently.eager(:artist, :genre, :tracks)
#
# This will load the albums first, since it needs to load the albums to know
# which artists, genres, and tracks to eagerly load. However, it will load the
# artists, genres, and tracks for the albums concurrently in separate threads.
# This can significantly improve performance, especially if there is significant
# latency between the application and the database. Note that separate threads
# are only used in the case where there are multiple associations to eagerly load.
# With only a single association to eagerly load, there is no reason to use a
# separate thread, since it would not improve performance.
#
# If you want to make concurrent eager loading the default, you can load the
# plugin with the +:always+ option. In this case, all eager loads will be
# concurrent. If you want to force a non-concurrent eager load, you can use
# +eager_load_serially+:
#
# Album.eager_load_serially.eager(:artist, :genre, :tracks)
#
# Note that making concurrent eager loading the default is probably a bad idea
# if you are eager loading inside transactions and want the eager load to
# reflect changes made inside the transaction, unless you plan to use
# +eager_load_serially+ for such cases. See the async_thread_pool
# Database extension documentation for more general caveats regarding its use.
#
# The default eager loaders for all of the association types that ship with Sequel
# support safe concurrent eager loading. However, if you are specifying a custom
# +:eager_loader+ for an association, it may not work safely unless it is modified to
# support concurrent eager loading. Taking this example from the
# {Advanced Associations guide}[rdoc-ref:doc/advanced_associations.rdoc]
#
# Album.many_to_one :artist, :eager_loader=>(proc do |eo_opts|
# eo_opts[:rows].each{|album| album.associations[:artist] = nil}
# id_map = eo_opts[:id_map]
# Artist.where(:id=>id_map.keys).all do |artist|
# if albums = id_map[artist.id]
# albums.each do |album|
# album.associations[:artist] = artist
# end
# end
# end
# end)
#
# This would not support concurrent eager loading safely. To support safe
# concurrent eager loading, you need to make sure you are not modifying
# the associations for objects concurrently by separate threads. This is
# implemented using a mutex, which you can access via eo_opts[:mutex].
# To keep things simple, you can use +Sequel.synchronize_with+ to only
# use this mutex if it is available. You want to use the mutex around the
# code that initializes the associations (usually to +nil+ or []),
# and also around the code that sets the associated objects appropriately
# after they have been retrieved. You do not want to use the mutex around
# the code that loads the objects, since that will prevent concurrent loading.
# So after the changes, the custom eager loader would look like this:
#
# Album.many_to_one :artist, :eager_loader=>(proc do |eo_opts|
# Sequel.synchronize_with(eo_opts[:mutex]) do
# eo_opts[:rows].each{|album| album.associations[:artist] = nil}
# end
# id_map = eo_opts[:id_map]
# rows = Artist.where(:id=>id_map.keys).all
# Sequel.synchronize_with(eo_opts[:mutex]) do
# rows.each do |artist|
# if albums = id_map[artist.id]
# albums.each do |album|
# album.associations[:artist] = artist
# end
# end
# end
# end
# end)
#
# Usage:
#
# # Make all model subclass datasets support concurrent eager loading
# Sequel::Model.plugin :concurrent_eager_loading
#
# # Make the Album class datasets support concurrent eager loading
# Album.plugin :concurrent_eager_loading
#
# # Make all model subclass datasets concurrently eager load by default
# Sequel::Model.plugin :concurrent_eager_loading, always: true
module ConcurrentEagerLoading
# Store the plugin's +:always+ option on +mod+ (the model class the plugin
# is being loaded into) as @always_eager_load_concurrently.  When the
# option is not given at all, any value already stored on +mod+ is left
# untouched, so reloading the plugin without options does not reset it.
def self.configure(mod, opts=OPTS)
  return unless opts.has_key?(:always)

  mod.instance_variable_set(:@always_eager_load_concurrently, opts[:always])
end
module ClassMethods
  # Register @always_eager_load_concurrently, with a default of nil, through
  # the Plugins helper for inherited instance variables.
  Plugins.inherited_instance_variables(self, :@always_eager_load_concurrently=>nil)

  # Hand the two dataset-level switches to the Plugins helper for dataset
  # methods, which is what allows calls on the model class such as
  # Album.eager_load_concurrently.
  dataset_switches = [:eager_load_concurrently, :eager_load_serially]
  Plugins.def_dataset_methods(self, dataset_switches)

  # The class-wide default for concurrent eager loading.  Returns whatever
  # was stored from the plugin's +:always+ option; datasets fall back to
  # this value when they have no explicit setting of their own.
  def always_eager_load_concurrently?
    @always_eager_load_concurrently
  end
end
module DatasetMethods
# Return a copy of this dataset with the :eager_load_concurrently option
# set to true, so that eager loading of associated results is performed
# concurrently using the async thread pool.  The copy is obtained through
# cached_dataset under the :_eager_load_concurrently key.
def eager_load_concurrently
  cached_dataset(:_eager_load_concurrently){clone(:eager_load_concurrently=>true)}
end
# Return a copy of this dataset with the :eager_load_concurrently option
# set to false, so that it will not eager load associated results
# concurrently.  This is only useful when the current dataset has been
# marked as loading concurrently, or when concurrent loading is the
# model's default.  The copy is obtained through cached_dataset under the
# :_eager_load_serially key.
def eager_load_serially
  cached_dataset(:_eager_load_serially){clone(:eager_load_concurrently=>false)}
end
private
# Whether this particular dataset eager loads concurrently.  An explicit
# :eager_load_concurrently option on the dataset (true or false) wins;
# only when that option is nil is the model's class-wide default used.
def eager_load_concurrently?
  setting = @opts[:eager_load_concurrently]
  return setting unless setting.nil?

  model.always_eager_load_concurrently?
end
# Run all eager loads for the given eager load data.
#
# Unless this dataset eager loads concurrently and there are at least two
# associations to load, this simply defers to the default implementation.
# Otherwise, one Mutex is created and stored under :mutex in every eager
# load options hash, so all of the loads share the same lock.  Once the
# default implementation has returned, every result that is an async proxy
# is forced via +__value+, so that all eager loads have completed before
# this method returns.  The return value is the collection returned by the
# default implementation.
def perform_eager_loads(eager_load_data)
  unless eager_load_concurrently? && eager_load_data.length >= 2
    return super
  end

  shared_mutex = Mutex.new
  eager_load_data.each_value{|eo| eo[:mutex] = shared_mutex}

  results = super
  results.each do |result|
    result.__value if Sequel::Database::AsyncThreadPool::BaseProxy === result
  end
end
# Perform a single eager load.  When the eager load options carry a
# :mutex (set when eager loads are being performed concurrently), the
# default implementation is run inside the database's async_run block
# (called with send), so it is handled by the async thread pool.
# Otherwise the default implementation is called directly.
def perform_eager_load(loader, eo)
  return super unless eo[:mutex]

  db.send(:async_run){super}
end
end
end
end
end