Sha256: f8b56c35a23c5789106548eb4fabb572da6e53b541237f62212503cc371c7e27
Contents?: true
Size: 1.58 KB
Versions: 1
Compression:
Stored size: 1.58 KB
Contents
class Worker class Ctx end class Defer class ValueError < StandardError end def initialize(&block) @value = Queue.new Thread.new do @value.push block.call end end def value @value.pop end def value! if @value.length == 1 value else raise ValueError end end end def initialize(opts={}, &block) @in = Queue.new @out = Queue.new @block = block @ctx = Worker::Ctx.new @defers = Queue.new @retries = 0 @opts = opts run! end def perform(*args) @in.push args ret = @out.pop if ret.is_a? Exception raise ret else ret end rescue Exception => ex backoff = @opts.dig(:backoff) || 0.1 backoff_max = @opts.dig(:backoff_max) retries_max = @opts.dig(:retry) || 0 if @retries == retries_max @retries = 0 return if @opts.dig(:raise) == false raise ex end @retries += 1 sleeping = @retries * backoff sleeping = backoff_max if backoff_max && sleeping > backoff_max sleep sleeping retry end def perform_async(*args) defer = Defer.new do ret = perform(*args) @defers.pop ret end @defers.push defer defer end def join loop do break if @defers.size == 0 sleep 0.1 end end def run! @thread = Thread.new do loop do ret = @ctx.instance_exec *@in.pop, &@block @out.push ret rescue Exception => ex @out.push ex end end self end def stop! @thread&.kill self end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
worker-0.6.0 | lib/worker.rb |