# frozen_string_literal: true
require "action_dispatch/http/response"
require "delegate"
require "active_support/json"
module ActionController
# Mix this module into your controller, and all actions in that controller
# will be able to stream data to the client as it's written.
#
# class MyController < ActionController::Base
# include ActionController::Live
#
# def stream
# response.headers['Content-Type'] = 'text/event-stream'
# 100.times {
# response.stream.write "hello world\n"
# sleep 1
# }
# ensure
# response.stream.close
# end
# end
#
# There are a few caveats with this module. You *cannot* write headers after the
# response has been committed (Response#committed? will return truthy).
# Calling +write+ or +close+ on the response stream will cause the response
# object to be committed. Make sure all headers are set before calling write
# or close on your stream.
#
# You *must* call close on your stream when you're finished, otherwise the
# socket may be left open forever.
#
# The final caveat is that your actions are executed in a separate thread than
# the main thread. Make sure your actions are thread safe, and this shouldn't
# be a problem (don't share state across threads, etc).
module Live
extend ActiveSupport::Concern
module ClassMethods
def make_response!(request)
if request.get_header("HTTP_VERSION") == "HTTP/1.0"
super
else
Live::Response.new.tap do |res|
res.request = request
end
end
end
end
# This class provides the ability to write an SSE (Server Sent Event)
# to an IO stream. The class is initialized with a stream and can be used
# to either write a JSON string or an object which can be converted to JSON.
#
# Writing an object will convert it into standard SSE format with whatever
# options you have configured. You may choose to set the following options:
#
# 1) Event. If specified, an event with this name will be dispatched on
# the browser.
# 2) Retry. The reconnection time in milliseconds used when attempting
# to send the event.
# 3) Id. If the connection dies while sending an SSE to the browser, then
# the server will receive a +Last-Event-ID+ header with value equal to +id+.
#
# After setting an option in the constructor of the SSE object, all future
# SSEs sent across the stream will use those options unless overridden.
#
# Example Usage:
#
# class MyController < ActionController::Base
# include ActionController::Live
#
# def index
# response.headers['Content-Type'] = 'text/event-stream'
# sse = SSE.new(response.stream, retry: 300, event: "event-name")
# sse.write({ name: 'John'})
# sse.write({ name: 'John'}, id: 10)
# sse.write({ name: 'John'}, id: 10, event: "other-event")
# sse.write({ name: 'John'}, id: 10, event: "other-event", retry: 500)
# ensure
# sse.close
# end
# end
#
# Note: SSEs are not currently supported by IE. However, they are supported
# by Chrome, Firefox, Opera, and Safari.
class SSE
PERMITTED_OPTIONS = %w( retry event id )
def initialize(stream, options = {})
@stream = stream
@options = options
end
def close
@stream.close
end
def write(object, options = {})
case object
when String
perform_write(object, options)
else
perform_write(ActiveSupport::JSON.encode(object), options)
end
end
private
def perform_write(json, options)
current_options = @options.merge(options).stringify_keys
PERMITTED_OPTIONS.each do |option_name|
if (option_value = current_options[option_name])
@stream.write "#{option_name}: #{option_value}\n"
end
end
message = json.gsub("\n", "\ndata: ")
@stream.write "data: #{message}\n\n"
end
end
class ClientDisconnected < RuntimeError
end
class Buffer < ActionDispatch::Response::Buffer # :nodoc:
include MonitorMixin
class << self
attr_accessor :queue_size
end
@queue_size = 10
# Ignore that the client has disconnected.
#
# If this value is `true`, calling `write` after the client
# disconnects will result in the written content being silently
# discarded. If this value is `false` (the default), a
# ClientDisconnected exception will be raised.
attr_accessor :ignore_disconnect
def initialize(response)
super(response, build_queue(self.class.queue_size))
@error_callback = lambda { true }
@cv = new_cond
@aborted = false
@ignore_disconnect = false
end
def write(string)
unless @response.committed?
@response.headers["Cache-Control"] ||= "no-cache"
@response.delete_header "Content-Length"
end
super
unless connected?
@buf.clear
unless @ignore_disconnect
# Raise ClientDisconnected, which is a RuntimeError (not an
# IOError), because that's more appropriate for something beyond
# the developer's control.
raise ClientDisconnected, "client disconnected"
end
end
end
# Same as +write+ but automatically include a newline at the end of the string.
def writeln(string)
write string.end_with?("\n") ? string : "#{string}\n"
end
# Write a 'close' event to the buffer; the producer/writing thread
# uses this to notify us that it's finished supplying content.
#
# See also #abort.
def close
synchronize do
super
@buf.push nil
@cv.broadcast
end
end
# Inform the producer/writing thread that the client has
# disconnected; the reading thread is no longer interested in
# anything that's being written.
#
# See also #close.
def abort
synchronize do
@aborted = true
@buf.clear
end
end
# Is the client still connected and waiting for content?
#
# The result of calling `write` when this is `false` is determined
# by `ignore_disconnect`.
def connected?
!@aborted
end
def on_error(&block)
@error_callback = block
end
def call_on_error
@error_callback.call
end
private
def each_chunk(&block)
loop do
str = nil
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
str = @buf.pop
end
break unless str
yield str
end
end
def build_queue(queue_size)
queue_size ? SizedQueue.new(queue_size) : Queue.new
end
end
class Response < ActionDispatch::Response # :nodoc: all
private
def before_committed
super
jar = request.cookie_jar
# The response can be committed multiple times
jar.write self unless committed?
end
def build_buffer(response, body)
buf = Live::Buffer.new response
body.each { |part| buf.write part }
buf
end
end
def process(name)
t1 = Thread.current
locals = t1.keys.map { |key| [key, t1[key]] }
error = nil
# This processes the action in a child thread. It lets us return the
# response code and headers back up the Rack stack, and still process
# the body in parallel with sending data to the client.
new_controller_thread {
ActiveSupport::Dependencies.interlock.running do
t2 = Thread.current
# Since we're processing the view in a different thread, copy the
# thread locals from the main thread to the child thread. :'(
locals.each { |k, v| t2[k] = v }
ActiveSupport::IsolatedExecutionState.share_with(t1)
begin
super(name)
rescue => e
if @_response.committed?
begin
@_response.stream.write(ActionView::Base.streaming_completion_on_exception) if request.format == :html
@_response.stream.call_on_error
rescue => exception
log_error(exception)
ensure
log_error(e)
@_response.stream.close
end
else
error = e
end
ensure
@_response.commit!
end
end
}
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@_response.await_commit
end
raise error if error
end
def response_body=(body)
super
response.close if response
end
# Sends a stream to the browser, which is helpful when you're generating exports or other running data where you
# don't want the entire file buffered in memory first. Similar to send_data, but where the data is generated live.
#
# Options:
# * :filename - suggests a filename for the browser to use.
# * :type - specifies an HTTP content type.
# You can specify either a string or a symbol for a registered type with Mime::Type.register, for example :json.
# If omitted, type will be inferred from the file extension specified in :filename.
# If no content type is registered for the extension, the default type 'application/octet-stream' will be used.
# * :disposition - specifies whether the file will be shown inline or downloaded.
# Valid values are 'inline' and 'attachment' (default).
#
# Example of generating a csv export:
#
# send_stream(filename: "subscribers.csv") do |stream|
# stream.write "email_address,updated_at\n"
#
# @subscribers.find_each do |subscriber|
# stream.write "#{subscriber.email_address},#{subscriber.updated_at}\n"
# end
# end
def send_stream(filename:, disposition: "attachment", type: nil)
response.headers["Content-Type"] =
(type.is_a?(Symbol) ? Mime[type].to_s : type) ||
Mime::Type.lookup_by_extension(File.extname(filename).downcase.delete("."))&.to_s ||
"application/octet-stream"
response.headers["Content-Disposition"] =
ActionDispatch::Http::ContentDisposition.format(disposition: disposition, filename: filename)
yield response.stream
ensure
response.stream.close
end
private
# Spawn a new thread to serve up the controller in. This is to get
# around the fact that Rack isn't based around IOs and we need to use
# a thread to stream data from the response bodies. Nobody should call
# this method except in Rails internals. Seriously!
def new_controller_thread # :nodoc:
Thread.new {
t2 = Thread.current
t2.abort_on_exception = true
yield
}
end
def log_error(exception)
logger = ActionController::Base.logger
return unless logger
logger.fatal do
message = +"\n#{exception.class} (#{exception.message}):\n"
message << exception.annotated_source_code.to_s if exception.respond_to?(:annotated_source_code)
message << " " << exception.backtrace.join("\n ")
"#{message}\n\n"
end
end
end
end