lib/cli/ui/spinner/spin_group.rb in cli-ui-2.2.3 vs lib/cli/ui/spinner/spin_group.rb in cli-ui-2.3.0
- old
+ new
@@ -1,12 +1,15 @@
# typed: true
+# frozen_string_literal: true
+require_relative '../work_queue'
+
module CLI
module UI
module Spinner
class SpinGroup
- DEFAULT_FINAL_GLYPH = ->(success) { success ? CLI::UI::Glyph::CHECK.to_s : CLI::UI::Glyph::X.to_s }
+ DEFAULT_FINAL_GLYPH = ->(success) { success ? CLI::UI::Glyph::CHECK : CLI::UI::Glyph::X }
class << self
extend T::Sig
sig { returns(Mutex) }
@@ -45,10 +48,15 @@
# This lets you add +Task+ objects to the group to multi-thread work
#
# ==== Options
#
# * +:auto_debrief+ - Automatically debrief exceptions or through success_debrief? Default to true
+ # * +:interrupt_debrief+ - Automatically debrief on interrupt. Default to false
+ # * +:max_concurrent+ - Maximum number of concurrent tasks. Default is 0 (effectively unlimited)
+ # * +:work_queue+ - Custom WorkQueue instance. If not provided, a new one will be created
+ # * +:to+ - Target stream, like $stdout or $stderr. Can be anything with print and puts methods,
+ # or under Sorbet, IO or StringIO. Defaults to $stdout
#
# ==== Example Usage
#
# CLI::UI::SpinGroup.new do |spin_group|
# spin_group.add('Title') { |spinner| sleep 3.0 }
@@ -57,20 +65,35 @@
#
# Output:
#
# https://user-images.githubusercontent.com/3074765/33798558-c452fa26-dce8-11e7-9e90-b4b34df21a46.gif
#
- sig { params(auto_debrief: T::Boolean).void }
- def initialize(auto_debrief: true)
+ sig do
+ params(
+ auto_debrief: T::Boolean,
+ interrupt_debrief: T::Boolean,
+ max_concurrent: Integer,
+ work_queue: T.nilable(WorkQueue),
+ to: IOLike,
+ ).void
+ end
+ def initialize(auto_debrief: true, interrupt_debrief: false, max_concurrent: 0, work_queue: nil, to: $stdout)
@m = Mutex.new
- @consumed_lines = 0
@tasks = []
+ @puts_above = []
@auto_debrief = auto_debrief
+ @interrupt_debrief = interrupt_debrief
@start = Time.new
+ @stopped = false
+ @internal_work_queue = work_queue.nil?
+ @work_queue = T.let(
+ work_queue || WorkQueue.new(max_concurrent.zero? ? 1024 : max_concurrent),
+ WorkQueue,
+ )
if block_given?
yield self
- wait
+ wait(to: to)
end
end
class Task
extend T::Sig
@@ -79,10 +102,13 @@
attr_reader :title, :stdout, :stderr
sig { returns(T::Boolean) }
attr_reader :success
+ sig { returns(T::Boolean) }
+ attr_reader :done
+
sig { returns(T.nilable(Exception)) }
attr_reader :exception
# Initializes a new Task
# This is managed entirely internally by +SpinGroup+
@@ -93,21 +119,22 @@
# * +block+ - Block for the task, will be provided with an instance of the spinner
#
sig do
params(
title: String,
- final_glyph: T.proc.params(success: T::Boolean).returns(String),
+ final_glyph: T.proc.params(success: T::Boolean).returns(T.any(Glyph, String)),
merged_output: T::Boolean,
duplicate_output_to: IO,
+ work_queue: WorkQueue,
block: T.proc.params(task: Task).returns(T.untyped),
).void
end
- def initialize(title, final_glyph:, merged_output:, duplicate_output_to:, &block)
+ def initialize(title, final_glyph:, merged_output:, duplicate_output_to:, work_queue:, &block)
@title = title
@final_glyph = final_glyph
@always_full_render = title =~ Formatter::SCAN_WIDGET
- @thread = Thread.new do
+ @future = work_queue.enqueue do
cap = CLI::UI::StdoutRouter::Capture.new(
merged_output: merged_output, duplicate_output_to: duplicate_output_to,
) { block.call(self) }
begin
cap.run
@@ -119,30 +146,37 @@
@m = Mutex.new
@force_full_render = false
@done = false
@exception = nil
- @success = false
+ @success = false
end
+ sig { params(block: T.proc.params(task: Task).void).void }
+ def on_done(&block)
+ @on_done = block
+ end
+
# Checks if a task is finished
#
sig { returns(T::Boolean) }
def check
return true if @done
- return false if @thread.alive?
+ return false unless @future.completed?
@done = true
begin
- status = @thread.join.status
- @success = (status == false)
- @success = false if @thread.value == TASK_FAILED
+ result = @future.value
+ @success = true
+ @success = false if result == TASK_FAILED
rescue => exc
@exception = exc
@success = false
end
+ @on_done&.call(self)
+
@done
end
# Re-renders the task if required:
#
@@ -163,11 +197,11 @@
# * +width+ - current terminal width to format for
#
sig { params(index: Integer, force: T::Boolean, width: Integer).returns(String) }
def render(index, force = true, width: CLI::UI::Terminal.width)
@m.synchronize do
- if force || @always_full_render || @force_full_render
+ if !CLI::UI.enable_cursor? || force || @always_full_render || @force_full_render
full_render(index, width)
else
partial_render(index)
end
ensure
@@ -188,42 +222,55 @@
@title = new_title
@force_full_render = true
end
end
- sig { void }
- def interrupt
- @thread.raise(Interrupt)
- end
-
private
sig { params(index: Integer, terminal_width: Integer).returns(String) }
def full_render(index, terminal_width)
- prefix = inset +
- glyph(index) +
- CLI::UI::Color::RESET.code +
- ' '
+ o = +''
- truncation_width = terminal_width - CLI::UI::ANSI.printing_width(prefix)
+ o << inset
+ o << glyph(index)
+ o << ' '
- prefix +
- CLI::UI.resolve_text(title, truncate_to: truncation_width) +
- "\e[K"
+ truncation_width = terminal_width - CLI::UI::ANSI.printing_width(o)
+
+ o << CLI::UI.resolve_text(title, truncate_to: truncation_width)
+ o << ANSI.clear_to_end_of_line if CLI::UI.enable_cursor?
+
+ o
end
sig { params(index: Integer).returns(String) }
def partial_render(index)
- CLI::UI::ANSI.cursor_forward(inset_width) + glyph(index) + CLI::UI::Color::RESET.code
+ o = +''
+
+ o << CLI::UI::ANSI.cursor_forward(inset_width)
+ o << glyph(index)
+
+ o
end
sig { params(index: Integer).returns(String) }
def glyph(index)
if @done
- @final_glyph.call(@success)
+ final_glyph = @final_glyph.call(@success)
+ if final_glyph.is_a?(Glyph)
+ CLI::UI.enable_color? ? final_glyph.to_s : final_glyph.char
+ else
+ final_glyph
+ end
+ elsif CLI::UI.enable_cursor?
+ if !@future.started?
+ CLI::UI.enable_color? ? Glyph::HOURGLASS.to_s : Glyph::HOURGLASS.char
+ else
+ CLI::UI.enable_color? ? GLYPHS[index] : RUNES[index]
+ end
else
- GLYPHS[index]
+ Glyph::HOURGLASS.char
end
end
sig { returns(String) }
def inset
@@ -249,11 +296,11 @@
# spin_group.wait
#
sig do
params(
title: String,
- final_glyph: T.proc.params(success: T::Boolean).returns(String),
+ final_glyph: T.proc.params(success: T::Boolean).returns(T.any(Glyph, String)),
merged_output: T::Boolean,
duplicate_output_to: IO,
block: T.proc.params(task: Task).void,
).void
end
@@ -268,51 +315,119 @@
@tasks << Task.new(
title,
final_glyph: final_glyph,
merged_output: merged_output,
duplicate_output_to: duplicate_output_to,
+ work_queue: @work_queue,
&block
)
end
end
+ sig { void }
+ def stop
+ # If we already own the mutex (called from within another synchronized block),
+ # set stopped directly to avoid deadlock
+ if @m.owned?
+ return if @stopped
+
+ @stopped = true
+ else
+ @m.synchronize do
+ return if @stopped
+
+ @stopped = true
+ end
+ end
+ # Interrupt is thread-safe on its own, so we can call it outside the mutex
+ @work_queue.interrupt
+ end
+
+ sig { returns(T::Boolean) }
+ def stopped?
+ if @m.owned?
+ @stopped
+ else
+ @m.synchronize { @stopped }
+ end
+ end
+
# Tells the group you're done adding tasks and to wait for all of them to finish
#
+ # ==== Options
+ #
+ # * +:to+ - Target stream, like $stdout or $stderr. Can be anything with print and puts methods,
+ # or under Sorbet, IO or StringIO. Defaults to $stdout
+ #
# ==== Example Usage:
# spin_group = CLI::UI::SpinGroup.new
# spin_group.add('Title') { |spinner| sleep 1.0 }
# spin_group.wait
#
- sig { returns(T::Boolean) }
- def wait
+ sig { params(to: IOLike).returns(T::Boolean) }
+ def wait(to: $stdout)
idx = 0
+ consumed_lines = 0
+
+ @work_queue.close if @internal_work_queue
+
+ tasks_seen = @tasks.map { false }
+ tasks_seen_done = @tasks.map { false }
+
loop do
+ break if stopped?
+
done_count = 0
width = CLI::UI::Terminal.width
self.class.pause_mutex.synchronize do
next if self.class.paused?
@m.synchronize do
CLI::UI.raw do
+ force_full_render = false
+
+ unless @puts_above.empty?
+ to.print(CLI::UI::ANSI.cursor_up(consumed_lines)) if CLI::UI.enable_cursor?
+ while (message = @puts_above.shift)
+ to.print(CLI::UI::ANSI.insert_lines(message.lines.count)) if CLI::UI.enable_cursor?
+ message.lines.each do |line|
+ to.print(CLI::UI::Frame.prefix + CLI::UI.fmt(line))
+ end
+ to.print("\n")
+ end
+ # we descend with newlines rather than ANSI.cursor_down as the inserted lines may've
+ # pushed the spinner off the front of the buffer, so we can't move back down below it
+ to.print("\n" * consumed_lines) if CLI::UI.enable_cursor?
+
+ force_full_render = true
+ end
+
@tasks.each.with_index do |task, int_index|
nat_index = int_index + 1
task_done = task.check
done_count += 1 if task_done
- if nat_index > @consumed_lines
- print(task.render(idx, true, width: width) + "\n")
- @consumed_lines += 1
- else
- offset = @consumed_lines - int_index
- move_to = CLI::UI::ANSI.cursor_up(offset) + "\r"
- move_from = "\r" + CLI::UI::ANSI.cursor_down(offset)
+ if CLI::UI.enable_cursor?
+ if nat_index > consumed_lines
+ to.print(task.render(idx, true, width: width) + "\n")
+ consumed_lines += 1
+ else
+ offset = consumed_lines - int_index
+ move_to = CLI::UI::ANSI.cursor_up(offset) + "\r"
+ move_from = "\r" + CLI::UI::ANSI.cursor_down(offset)
- print(move_to + task.render(idx, idx.zero?, width: width) + move_from)
+ to.print(move_to + task.render(idx, idx.zero? || force_full_render, width: width) + move_from)
+ end
+ elsif !tasks_seen[int_index] || (task_done && !tasks_seen_done[int_index])
+ to.print(task.render(idx, true, width: width) + "\n")
end
+
+ tasks_seen[int_index] = true
+ tasks_seen_done[int_index] ||= task_done
end
end
end
end
@@ -322,19 +437,27 @@
Spinner.index = idx
sleep(PERIOD)
end
if @auto_debrief
- debrief
+ debrief(to: to)
else
all_succeeded?
end
rescue Interrupt
- @tasks.each(&:interrupt)
- raise
+ @work_queue.interrupt
+ debrief(to: to) if @interrupt_debrief
+ stopped? ? false : raise
end
+ sig { params(message: String).void }
+ def puts_above(message)
+ @m.synchronize do
+ @puts_above << message
+ end
+ end
+
# Provide an alternative debriefing for failed tasks
sig do
params(
block: T.proc.params(title: String, exception: T.nilable(Exception), out: String, err: String).void,
).void
@@ -360,37 +483,45 @@
end
end
# Debriefs failed tasks is +auto_debrief+ is true
#
- sig { returns(T::Boolean) }
- def debrief
+ # ==== Options
+ #
+ # * +:to+ - Target stream, like $stdout or $stderr. Can be anything with print and puts methods,
+ # or under Sorbet, IO or StringIO. Defaults to $stdout
+ #
+ sig { params(to: IOLike).returns(T::Boolean) }
+ def debrief(to: $stdout)
@m.synchronize do
@tasks.each do |task|
+ next unless task.done
+
title = task.title
out = task.stdout
err = task.stderr
if task.success
next @success_debrief&.call(title, out, err)
end
+ # exception will not be set if the wait loop is stopped before the task is checked
e = task.exception
next @failure_debrief.call(title, e, out, err) if @failure_debrief
CLI::UI::Frame.open('Task Failed: ' + title, color: :red, timing: Time.new - @start) do
if e
- puts "#{e.class}: #{e.message}"
- puts "\tfrom #{e.backtrace.join("\n\tfrom ")}"
+ to.puts("#{e.class}: #{e.message}")
+ to.puts("\tfrom #{e.backtrace.join("\n\tfrom ")}")
end
CLI::UI::Frame.divider('STDOUT')
out = '(empty)' if out.nil? || out.strip.empty?
- puts out
+ to.puts(out)
CLI::UI::Frame.divider('STDERR')
err = '(empty)' if err.nil? || err.strip.empty?
- puts err
+ to.puts(err)
end
end
@tasks.all?(&:success)
end
end