lib/tresse.rb in tresse-0.1.0 vs lib/tresse.rb in tresse-1.0.0
- old
+ new
@@ -2,31 +2,27 @@
require 'thread'
module Tresse
- VERSION = '0.1.0'
+ VERSION = '1.0.0'
class << self
- attr_accessor :max_work_threads
-
def init
- @max_work_threads = 7
@work_queue = Queue.new
- @thread_queue = Queue.new
+ @work_threads = 8.times.collect { |i| make_work_thread }
- @on_error = lambda do |where, err|
- puts "-" * 80
- p where
- p err
- puts err.backtrace
- puts "-" * 80
- end
-
- run
+ @on_error =
+ lambda do |where, err|
+ puts "-" * 80
+ p where
+ p err
+ puts err.backtrace
+ puts "-" * 80
+ end
end
def enqueue(batch)
@work_queue << batch
@@ -37,48 +33,50 @@
def on_error(&block)
@on_error = block
end
- protected
+ def max_work_thread_count
- def run
+ @work_threads.size
+ end
- @max_work_threads.times { |i| @thread_queue << i }
+ def max_work_thread_count=(i)
- Thread.new do
- loop do
- begin
+ i0 = @work_threads.size
- i = @thread_queue.pop
- batch = @work_queue.pop
+ @work_threads << make_work_thread while @work_threads.size < i
+ @work_threads.pop while @work_threads.size > i
- hand_to_worker_thread(i, batch)
-
- rescue => err
-
- @on_error.call(:in_loop, err)
- end
- end
- end
+ i
end
- def hand_to_worker_thread(i, batch)
+ protected
+ def make_work_thread
+
Thread.new do
- begin
- Thread.current[:tress] = true
- Thread.current[:i] = i
+ t = Thread.current
+ t[:tresse] = true
- batch.process
+ loop do
+ begin
- @thread_queue << i unless i >= @max_work_threads
+ batch = @work_queue.pop
- rescue => err
+ unless @work_threads.include?(t)
+ @work_queue << batch
+ break
+ end
- @on_error.call(:in_worker_thread, err)
+ batch.process
+
+ rescue => err
+
+ @on_error.call(:in_worker_thread, err)
+ end
end
end
end
end
#
@@ -86,125 +84,156 @@
class Batch
attr_reader :group
- attr_reader :each_index, :value
+ attr_reader :map_index
+ attr_accessor :value
def initialize(group, block_or_group)
@group = group
@bog = block_or_group
- @each_index = -1
+ @map_index = -1
@value = nil
end
def process
- @each_index += 1
- @group.send(:hand, self)
+ @map_index += 1
+ @group.send(:receive, self)
end
- protected
+ def source
- def generate
-
args = [ group ] + [ nil ] * 7
args = args[0, @bog.method(:call).arity]
@value = @bog.call(*args)
end
+
+ def map(type, block)
+
+ args = [ @value, self ]
+ args = args[0, block.method(:call).arity.abs]
+
+ r = block.call(*args)
+
+ @value = r if type == :map
+ end
end
class Group
attr_accessor :name
- attr_reader :batches
+ #attr_reader :batches
- def initialize(name)
+ def initialize(name=nil)
@name = name
@batches = []
- @eaches = [ nil ]
+ @launched = false
+ @maps = [ nil ]
- @final = nil
- @final_batches = []
- @final_queue = Queue.new
+ @reduce = nil
+ @reduce_batches = []
+ @reduction_queue = Queue.new
end
#
- # appending methods
+ # sourcing methods
- def append(o=nil, &block)
+ def source(o=nil, &block)
batch = Tresse::Batch.new(self, o ? o : block)
@batches << batch
- Tresse.enqueue(batch)
+
+ self
end
#
- # step methods
+ # mapping
def each(&block)
- @eaches << block
+ do_map(:each, block)
+ end
- self
+ def map(&block)
+
+ do_map(:map, block)
end
#
- # final methods
+ # reducing
- def inject(target, &block)
+ def reduce(target, &block)
- @final = [ target, block ]
+ do_reduce(target, block)
+ end
+ alias inject reduce
- @final_queue.pop
+ def flatten
+
+ do_reduce([], lambda { |a, e| a.concat(e) })
end
- alias reduce inject
+ alias values flatten
- def collect(&block)
+ protected
- @final = block
+ def do_map(type, block)
- @final_queue.pop
+ @maps << [ type, block ]
+
+ launch
+
+ self
end
- alias map collect
- protected
+ def do_reduce(target, block)
- def hand(batch)
+ @reduce = [ target, block ]
- if batch.each_index == 0
- batch.send(:generate)
+ launch
+
+ @reduction_queue.pop
+ end
+
+ def launch
+
+ return if @launched == true
+ @launched = true
+
+ @batches.each { |b| Tresse.enqueue(b) }
+ end
+
+ def receive(batch)
+
+ if batch.map_index == 0
+ batch.source
Tresse.enqueue(batch)
- elsif e = @eaches[batch.each_index]
- args = [ batch.value, batch ]
- args = args[0, e.method(:call).arity.abs]
- e.call(*args)
+ elsif m = @maps[batch.map_index]
+ batch.map(*m)
Tresse.enqueue(batch)
else
- queue_for_final(batch)
+ queue_for_reduction(batch)
end
end
- def queue_for_final(batch)
+ def queue_for_reduction(batch)
- @final_batches << batch
+ @reduce_batches << batch
- return if @final_batches.size < @batches.size
+ return if @reduce_batches.size < @batches.size
+ return unless @reduce
es = @batches.collect(&:value)
+ target, block = @reduce
- @final_queue <<
- if @final.is_a?(Array)
- es.inject(@final[0], &@final[1])
- else
- es.collect(&@final)
- end
+ @reduction_queue << es.inject(target, &block)
end
end
end