lib/tresse.rb in tresse-0.1.0 vs lib/tresse.rb in tresse-1.0.0

- old
+ new

@@ -2,31 +2,27 @@ require 'thread' module Tresse - VERSION = '0.1.0' + VERSION = '1.0.0' class << self - attr_accessor :max_work_threads - def init - @max_work_threads = 7 @work_queue = Queue.new - @thread_queue = Queue.new + @work_threads = 8.times.collect { |i| make_work_thread } - @on_error = lambda do |where, err| - puts "-" * 80 - p where - p err - puts err.backtrace - puts "-" * 80 - end - - run + @on_error = + lambda do |where, err| + puts "-" * 80 + p where + p err + puts err.backtrace + puts "-" * 80 + end end def enqueue(batch) @work_queue << batch @@ -37,48 +33,50 @@ def on_error(&block) @on_error = block end - protected + def max_work_thread_count - def run + @work_threads.size + end - @max_work_threads.times { |i| @thread_queue << i } + def max_work_thread_count=(i) - Thread.new do - loop do - begin + i0 = @work_threads.size - i = @thread_queue.pop - batch = @work_queue.pop + @work_threads << make_work_thread while @work_threads.size < i + @work_threads.pop while @work_threads.size > i - hand_to_worker_thread(i, batch) - - rescue => err - - @on_error.call(:in_loop, err) - end - end - end + i end - def hand_to_worker_thread(i, batch) + protected + def make_work_thread + Thread.new do - begin - Thread.current[:tress] = true - Thread.current[:i] = i + t = Thread.current + t[:tresse] = true - batch.process + loop do + begin - @thread_queue << i unless i >= @max_work_threads + batch = @work_queue.pop - rescue => err + unless @work_threads.include?(t) + @work_queue << batch + break + end - @on_error.call(:in_worker_thread, err) + batch.process + + rescue => err + + @on_error.call(:in_worker_thread, err) + end end end end end # @@ -86,125 +84,156 @@ class Batch attr_reader :group - attr_reader :each_index, :value + attr_reader :map_index + attr_accessor :value def initialize(group, block_or_group) @group = group @bog = block_or_group - @each_index = -1 + @map_index = -1 @value = nil end def process - @each_index += 1 - @group.send(:hand, self) + @map_index += 1 + @group.send(:receive, self) end - protected + def source - def generate - args = [ group ] + [ nil ] * 7 args = args[0, @bog.method(:call).arity] @value = @bog.call(*args) end + + def map(type, block) + + args = [ @value, self ] + args = args[0, block.method(:call).arity.abs] + + r = block.call(*args) + + @value = r if type == :map + end end class Group attr_accessor :name - attr_reader :batches + #attr_reader :batches - def initialize(name) + def initialize(name=nil) @name = name @batches = [] - @eaches = [ nil ] + @launched = false + @maps = [ nil ] - @final = nil - @final_batches = [] - @final_queue = Queue.new + @reduce = nil + @reduce_batches = [] + @reduction_queue = Queue.new end # - # appending methods + # sourcing methods - def append(o=nil, &block) + def source(o=nil, &block) batch = Tresse::Batch.new(self, o ? o : block) @batches << batch - Tresse.enqueue(batch) + + self end # - # step methods + # mapping def each(&block) - @eaches << block + do_map(:each, block) + end - self + def map(&block) + + do_map(:map, block) end # - # final methods + # reducing - def inject(target, &block) + def reduce(target, &block) - @final = [ target, block ] + do_reduce(target, block) + end + alias inject reduce - @final_queue.pop + def flatten + + do_reduce([], lambda { |a, e| a.concat(e) }) end - alias reduce inject + alias values flatten - def collect(&block) + protected - @final = block + def do_map(type, block) - @final_queue.pop + @maps << [ type, block ] + + launch + + self end - alias map collect - protected + def do_reduce(target, block) - def hand(batch) + @reduce = [ target, block ] - if batch.each_index == 0 - batch.send(:generate) + launch + + @reduction_queue.pop + end + + def launch + + return if @launched == true + @launched = true + + @batches.each { |b| Tresse.enqueue(b) } + end + + def receive(batch) + + if batch.map_index == 0 + batch.source Tresse.enqueue(batch) - elsif e = @eaches[batch.each_index] - args = [ batch.value, batch ] - args = args[0, e.method(:call).arity.abs] - e.call(*args) + elsif m = @maps[batch.map_index] + batch.map(*m) Tresse.enqueue(batch) else - queue_for_final(batch) + queue_for_reduction(batch) end end - def queue_for_final(batch) + def queue_for_reduction(batch) - @final_batches << batch + @reduce_batches << batch - return if @final_batches.size < @batches.size + return if @reduce_batches.size < @batches.size + return unless @reduce es = @batches.collect(&:value) + target, block = @reduce - @final_queue << - if @final.is_a?(Array) - es.inject(@final[0], &@final[1]) - else - es.collect(&@final) - end + @reduction_queue << es.inject(target, &block) end end end