lib/worker.rb in worker-0.3.0 vs lib/worker.rb in worker-0.6.0

- old
+ new

@@ -1,15 +1,43 @@ class Worker class Ctx end - def initialize(&block) + class Defer + class ValueError < StandardError + end + + def initialize(&block) + @value = Queue.new + + Thread.new do + @value.push block.call + end + end + + def value + @value.pop + end + + def value! + if @value.length == 1 + value + else + raise ValueError + end + end + end + + def initialize(opts={}, &block) @in = Queue.new @out = Queue.new @block = block @ctx = Worker::Ctx.new + @defers = Queue.new + @retries = 0 + @opts = opts run! end def perform(*args) @in.push args @@ -17,9 +45,42 @@ ret = @out.pop if ret.is_a? Exception raise ret else ret + end + rescue Exception => ex + backoff = @opts.dig(:backoff) || 0.1 + backoff_max = @opts.dig(:backoff_max) + retries_max = @opts.dig(:retry) || 0 + + if @retries == retries_max + @retries = 0 + return if @opts.dig(:raise) == false + raise ex + end + @retries += 1 + sleeping = @retries * backoff + sleeping = backoff_max if backoff_max && sleeping > backoff_max + + sleep sleeping + retry + end + + def perform_async(*args) + defer = Defer.new do + ret = perform(*args) + @defers.pop + ret + end + @defers.push defer + defer + end + + def join + loop do + break if @defers.size == 0 + sleep 0.1 end end def run! @thread = Thread.new do