# frozen_string_literal: true
##
# Rage provides a simple and efficient API to wait on several instances of IO at the same time - {Fiber.await}.
#
# Let's say we have the following controller:
# ```ruby
# class UsersController < RageController::API
# def show
# user = Net::HTTP.get(URI("http://users.service/users/#{params[:id]}"))
# bookings = Net::HTTP.get(URI("http://bookings.service/bookings?user_id=#{params[:id]}"))
# render json: { user: user, bookings: bookings }
# end
# end
# ```
# This code will fire two consecutive HTTP requests. If each request takes 1 second to execute, the total execution time will be 2 seconds.
# With {Fiber.await}, we can significantly decrease the overall execution time by changing the code to fire the requests concurrently.
#
# To do this, we will need to:
#
# 1. Wrap every request in a separate fiber using {Fiber.schedule};
# 2. Pass the newly created fibers into {Fiber.await}.
#
# ```ruby
# class UsersController < RageController::API
# def show
# user, bookings = Fiber.await([
# Fiber.schedule { Net::HTTP.get(URI("http://users.service/users/#{params[:id]}")) },
# Fiber.schedule { Net::HTTP.get(URI("http://bookings.service/bookings?user_id=#{params[:id]}")) }
# ])
#
# render json: { user: user, bookings: bookings }
# end
# end
# ```
# With this change, if each request takes 1 second to execute, the total execution time will be about 1 second instead of 2.
#
# ## Creating fibers
# Many developers see fibers as "lightweight threads" that should be used in conjunction with fiber pools, the same way we use thread pools for threads.
# Instead, it makes sense to think of fibers as regular Ruby objects. We don't use a pool of arrays when we need to create an array - we create a new object and let Ruby and the GC do their job.
# Same applies to fibers. Feel free to create as many fibers as you need on demand.
class Fiber
  # @private
  # Message that wakes up a fiber suspended in {Fiber.await} before all awaited fibers have finished;
  # `await` then re-raises the error recorded on one of the awaited fibers.
  AWAIT_ERROR_MESSAGE = "err"

  # @private
  # Store the value that {Fiber.await} returns for this fiber.
  def __set_result(result)
    @__result = result
  end

  # @private
  # @return [Object, nil] the value stored with `__set_result`, or `nil` if none has been stored
  def __get_result
    @__result
  end

  # @private
  # Store the exception that {Fiber.await} raises on behalf of this fiber.
  def __set_err(err)
    @__err = err
  end

  # @private
  # @return [Exception, nil] the exception stored with `__set_err`, or `nil` if none has been stored
  def __get_err
    @__err
  end

  # @private
  # Remember the fiber's `object_id`, converted to a String, as its identifier.
  def __set_id
    @__rage_id = object_id.to_s
  end

  # @private
  # @return [String, nil] the identifier assigned by `__set_id`, or `nil` if it has not been called
  def __get_id
    @__rage_id
  end

  # @private
  # Build a channel name in the form `block:<object_id>:<counter>`.
  #
  # @param force [Boolean] when truthy, increment the per-fiber counter first, so a new name is produced
  # @return [String]
  def __block_channel(force = false)
    @__block_channel_i ||= 0
    @__block_channel_i += 1 if force
    "block:#{object_id}:#{@__block_channel_i}"
  end

  # @private
  # pause a fiber and resume in the next iteration of the event loop
  def self.pause
    f = Fiber.current
    Iodine.defer { f.resume }
    Fiber.yield
  end

  # @private
  # under normal circumstances, the method is a copy of `yield`, but it can be overridden to perform
  # additional steps on yielding, e.g. releasing AR connections; see "lib/rage/rails.rb"
  class << self
    alias_method :defer, :yield
  end

  # Wait on several fibers at the same time. Calling this method will automatically pause the current fiber, allowing the
  # server to process other requests. Once all fibers have completed, the current fiber will be automatically resumed.
  #
  # The passed array is left untouched; the results are returned in a new array.
  #
  # @param fibers [Fiber, Array] one or several fibers to wait on. The fibers must be created using the `Fiber.schedule` call.
  # @return [Array] the results of the fibers, in the same order as the fibers were passed
  # @raise [Exception] the error recorded on one of the fibers, if any of them has errored out
  # @example
  #   Fiber.await([
  #     Fiber.schedule { request_1 },
  #     Fiber.schedule { request_2 },
  #   ])
  # @note This method should only be used when multiple fibers have to be processed in parallel. There's no need to use `Fiber.await` for single IO calls.
  def self.await(fibers)
    current, fibers = Fiber.current, Array(fibers)

    # count the fibers that are still alive (i.e. have yielded); raise right away if a finished one has errored out
    num_wait_for = 0
    fibers.each do |fiber|
      if fiber.alive?
        num_wait_for += 1
      elsif (err = fiber.__get_err)
        raise err
      end
    end

    # nothing to wait for - every fiber has already finished;
    # `map` (not `map!`) is used so that the caller's array is never overwritten with the results
    return fibers.map(&:__get_result) if num_wait_for == 0

    # wait on async fibers; resume right away if one of the fibers errors out
    channel = "await:#{current.object_id}"
    Iodine.subscribe(channel) do |_, message|
      if message == AWAIT_ERROR_MESSAGE
        current.resume
      else
        num_wait_for -= 1
        current.resume if num_wait_for == 0
      end
    end

    Fiber.yield
    Iodine.defer { Iodine.unsubscribe(channel) }

    # if num_wait_for is not 0 means we exited prematurely because of an error
    if num_wait_for > 0
      raise fibers.find(&:__get_err).__get_err
    else
      fibers.map(&:__get_result)
    end
  end

  # @!method self.schedule(&block)
  #   Create a non-blocking fiber. Should mostly be used in conjunction with `Fiber.await`.
  #   @example
  #     Fiber.await([
  #       Fiber.schedule { request_1 },
  #       Fiber.schedule { request_2 }
  #     ])
  #   @example
  #     fiber_1 = Fiber.schedule { request_1 }
  #     fiber_2 = Fiber.schedule { request_2 }
  #     Fiber.await([fiber_1, fiber_2])
end