lib/rinda/tuplespace.rb in rubysl-rinda-1.0.0 vs lib/rinda/tuplespace.rb in rubysl-rinda-2.0.0

- old
+ new

@@ -1,9 +1,11 @@ require 'monitor' require 'thread' require 'drb/drb' require 'rinda/rinda' +require 'enumerator' +require 'forwardable' module Rinda ## # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace) @@ -69,18 +71,18 @@ return true unless @expires return @expires < Time.now end ## - # Reset the expiry time according to +sec_or_renewer+. + # Reset the expiry time according to +sec_or_renewer+. # # +nil+:: it is set to expire in the far future. # +false+:: it has expired. # Numeric:: it will expire in that many seconds. # # Otherwise the argument refers to some kind of renewer object - # which will reset its expiry time. + # which will reset its expiry time. def renew(sec_or_renewer) sec, @renewer = get_renewer(sec_or_renewer) @expires = make_expires(sec) end @@ -164,11 +166,11 @@ # details on how a Template matches a Tuple. def match(tuple) @tuple.match(tuple) end - + alias === match def make_tuple(ary) # :nodoc: Rinda::Template.new(ary) end @@ -220,15 +222,15 @@ # # == Example # # ts = Rinda::TupleSpace.new # observer = ts.notify 'write', [nil] - # + # # Thread.start do # observer.each { |t| p t } # end - # + # # 3.times { |i| ts.write [i] } # # Outputs: # # ['write', [0]] @@ -272,11 +274,11 @@ def each # :yields: event, tuple while !@done it = pop yield(it) end - rescue + rescue ensure cancel end end @@ -284,92 +286,136 @@ ## # TupleBag is an unordered collection of tuples. It is the basis # of Tuplespace. class TupleBag + class TupleBin + extend Forwardable + def_delegators '@bin', :find_all, :delete_if, :each, :empty? + def initialize + @bin = [] + end + + def add(tuple) + @bin.push(tuple) + end + + def delete(tuple) + idx = @bin.rindex(tuple) + @bin.delete_at(idx) if idx + end + + def find + @bin.reverse_each do |x| + return x if yield(x) + end + nil + end + end + def initialize # :nodoc: @hash = {} + @enum = enum_for(:each_entry) end ## # +true+ if the TupleBag to see if it has any expired entries. def has_expires? - @hash.each do |k, v| - v.each do |tuple| - return true if tuple.expires - end + @enum.find do |tuple| + tuple.expires end - false end ## - # Add +ary+ to the TupleBag. + # Add +tuple+ to the TupleBag. - def push(ary) - size = ary.size - @hash[size] ||= [] - @hash[size].push(ary) + def push(tuple) + key = bin_key(tuple) + @hash[key] ||= TupleBin.new + @hash[key].add(tuple) end ## - # Removes +ary+ from the TupleBag. + # Removes +tuple+ from the TupleBag. - def delete(ary) - size = ary.size - @hash.fetch(size, []).delete(ary) + def delete(tuple) + key = bin_key(tuple) + bin = @hash[key] + return nil unless bin + bin.delete(tuple) + @hash.delete(key) if bin.empty? + tuple end ## # Finds all live tuples that match +template+. - def find_all(template) - @hash.fetch(template.size, []).find_all do |tuple| + bin_for_find(template).find_all do |tuple| tuple.alive? && template.match(tuple) end end ## # Finds a live tuple that matches +template+. def find(template) - @hash.fetch(template.size, []).find do |tuple| + bin_for_find(template).find do |tuple| tuple.alive? && template.match(tuple) end end ## # Finds all tuples in the TupleBag which when treated as templates, match # +tuple+ and are alive. def find_all_template(tuple) - @hash.fetch(tuple.size, []).find_all do |template| + @enum.find_all do |template| template.alive? && template.match(tuple) end end ## # Delete tuples which dead tuples from the TupleBag, returning the deleted # tuples. def delete_unless_alive deleted = [] - @hash.keys.each do |size| - ary = [] - @hash[size].each do |tuple| + @hash.each do |key, bin| + bin.delete_if do |tuple| if tuple.alive? - ary.push(tuple) + false else deleted.push(tuple) + true end end - @hash[size] = ary end deleted end + private + def each_entry(&blk) + @hash.each do |k, v| + v.each(&blk) + end + end + + def bin_key(tuple) + head = tuple[0] + if head.class == Symbol + return head + else + false + end + end + + def bin_for_find(template) + key = bin_key(template) + key ? @hash.fetch(key, []) : @enum + end end ## # The Tuplespace manages access to the tuples it contains, # ensuring mutual exclusion requirements are met. @@ -401,21 +447,21 @@ ## # Adds +tuple+ def write(tuple, sec=nil) - entry = TupleEntry.new(tuple, sec) - start_keeper + entry = create_entry(tuple, sec) synchronize do if entry.expired? @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end notify_event('write', entry.value) notify_event('delete', entry.value) else @bag.push(entry) + start_keeper if entry.expires @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end @take_waiter.find_all_template(entry).each do |template| template.signal @@ -437,11 +483,10 @@ # Moves +tuple+ to +port+. def move(port, tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? - start_keeper synchronize do entry = @bag.find(template) if entry port.push(entry.value) if port @bag.delete(entry) @@ -450,10 +495,11 @@ end raise RequestExpiredError if template.expired? begin @take_waiter.push(template) + start_keeper if template.expires while true raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? entry = @bag.find(template) if entry @@ -474,18 +520,18 @@ # Reads +tuple+, but does not remove it. def read(tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? - start_keeper synchronize do entry = @bag.find(template) return entry.value if entry raise RequestExpiredError if template.expired? begin @read_waiter.push(template) + start_keeper if template.expires template.wait raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? return template.found ensure @@ -527,10 +573,14 @@ template end private + def create_entry(tuple, sec) + TupleEntry.new(tuple, sec) + end + ## # Removes dead tuples. def keep_clean synchronize do @@ -564,12 +614,15 @@ # Creates a thread that scans the tuplespace for expired tuples. def start_keeper return if @keeper && @keeper.alive? @keeper = Thread.new do - while need_keeper? - keep_clean + while true sleep(@period) + synchronize do + break unless need_keeper? + keep_clean + end end end end ##