lib/rinda/tuplespace.rb in rubysl-rinda-1.0.0 vs lib/rinda/tuplespace.rb in rubysl-rinda-2.0.0
- old
+ new
@@ -1,9 +1,11 @@
require 'monitor'
require 'thread'
require 'drb/drb'
require 'rinda/rinda'
+require 'enumerator'
+require 'forwardable'
module Rinda
##
# A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
@@ -69,18 +71,18 @@
return true unless @expires
return @expires < Time.now
end
##
- # Reset the expiry time according to +sec_or_renewer+.
+ # Reset the expiry time according to +sec_or_renewer+.
#
# +nil+:: it is set to expire in the far future.
# +false+:: it has expired.
# Numeric:: it will expire in that many seconds.
#
# Otherwise the argument refers to some kind of renewer object
- # which will reset its expiry time.
+ # which will reset its expiry time.
def renew(sec_or_renewer)
sec, @renewer = get_renewer(sec_or_renewer)
@expires = make_expires(sec)
end
@@ -164,11 +166,11 @@
# details on how a Template matches a Tuple.
def match(tuple)
@tuple.match(tuple)
end
-
+
alias === match
def make_tuple(ary) # :nodoc:
Rinda::Template.new(ary)
end
@@ -220,15 +222,15 @@
#
# == Example
#
# ts = Rinda::TupleSpace.new
# observer = ts.notify 'write', [nil]
- #
+ #
# Thread.start do
# observer.each { |t| p t }
# end
- #
+ #
# 3.times { |i| ts.write [i] }
#
# Outputs:
#
# ['write', [0]]
@@ -272,11 +274,11 @@
def each # :yields: event, tuple
while !@done
it = pop
yield(it)
end
- rescue
+ rescue
ensure
cancel
end
end
@@ -284,92 +286,136 @@
##
# TupleBag is an unordered collection of tuples. It is the basis
# of Tuplespace.
class TupleBag
+ class TupleBin
+ extend Forwardable
+ def_delegators '@bin', :find_all, :delete_if, :each, :empty?
+ def initialize
+ @bin = []
+ end
+
+ def add(tuple)
+ @bin.push(tuple)
+ end
+
+ def delete(tuple)
+ idx = @bin.rindex(tuple)
+ @bin.delete_at(idx) if idx
+ end
+
+ def find
+ @bin.reverse_each do |x|
+ return x if yield(x)
+ end
+ nil
+ end
+ end
+
def initialize # :nodoc:
@hash = {}
+ @enum = enum_for(:each_entry)
end
##
# +true+ if the TupleBag to see if it has any expired entries.
def has_expires?
- @hash.each do |k, v|
- v.each do |tuple|
- return true if tuple.expires
- end
+ @enum.find do |tuple|
+ tuple.expires
end
- false
end
##
- # Add +ary+ to the TupleBag.
+ # Add +tuple+ to the TupleBag.
- def push(ary)
- size = ary.size
- @hash[size] ||= []
- @hash[size].push(ary)
+ def push(tuple)
+ key = bin_key(tuple)
+ @hash[key] ||= TupleBin.new
+ @hash[key].add(tuple)
end
##
- # Removes +ary+ from the TupleBag.
+ # Removes +tuple+ from the TupleBag.
- def delete(ary)
- size = ary.size
- @hash.fetch(size, []).delete(ary)
+ def delete(tuple)
+ key = bin_key(tuple)
+ bin = @hash[key]
+ return nil unless bin
+ bin.delete(tuple)
+ @hash.delete(key) if bin.empty?
+ tuple
end
##
# Finds all live tuples that match +template+.
-
def find_all(template)
- @hash.fetch(template.size, []).find_all do |tuple|
+ bin_for_find(template).find_all do |tuple|
tuple.alive? && template.match(tuple)
end
end
##
# Finds a live tuple that matches +template+.
def find(template)
- @hash.fetch(template.size, []).find do |tuple|
+ bin_for_find(template).find do |tuple|
tuple.alive? && template.match(tuple)
end
end
##
# Finds all tuples in the TupleBag which when treated as templates, match
# +tuple+ and are alive.
def find_all_template(tuple)
- @hash.fetch(tuple.size, []).find_all do |template|
+ @enum.find_all do |template|
template.alive? && template.match(tuple)
end
end
##
# Delete tuples which dead tuples from the TupleBag, returning the deleted
# tuples.
def delete_unless_alive
deleted = []
- @hash.keys.each do |size|
- ary = []
- @hash[size].each do |tuple|
+ @hash.each do |key, bin|
+ bin.delete_if do |tuple|
if tuple.alive?
- ary.push(tuple)
+ false
else
deleted.push(tuple)
+ true
end
end
- @hash[size] = ary
end
deleted
end
+ private
+ def each_entry(&blk)
+ @hash.each do |k, v|
+ v.each(&blk)
+ end
+ end
+
+ def bin_key(tuple)
+ head = tuple[0]
+ if head.class == Symbol
+ return head
+ else
+ false
+ end
+ end
+
+ def bin_for_find(template)
+ key = bin_key(template)
+ key ? @hash.fetch(key, []) : @enum
+ end
end
##
# The Tuplespace manages access to the tuples it contains,
# ensuring mutual exclusion requirements are met.
@@ -401,21 +447,21 @@
##
# Adds +tuple+
def write(tuple, sec=nil)
- entry = TupleEntry.new(tuple, sec)
- start_keeper
+ entry = create_entry(tuple, sec)
synchronize do
if entry.expired?
@read_waiter.find_all_template(entry).each do |template|
template.read(tuple)
end
notify_event('write', entry.value)
notify_event('delete', entry.value)
else
@bag.push(entry)
+ start_keeper if entry.expires
@read_waiter.find_all_template(entry).each do |template|
template.read(tuple)
end
@take_waiter.find_all_template(entry).each do |template|
template.signal
@@ -437,11 +483,10 @@
# Moves +tuple+ to +port+.
def move(port, tuple, sec=nil)
template = WaitTemplateEntry.new(self, tuple, sec)
yield(template) if block_given?
- start_keeper
synchronize do
entry = @bag.find(template)
if entry
port.push(entry.value) if port
@bag.delete(entry)
@@ -450,10 +495,11 @@
end
raise RequestExpiredError if template.expired?
begin
@take_waiter.push(template)
+ start_keeper if template.expires
while true
raise RequestCanceledError if template.canceled?
raise RequestExpiredError if template.expired?
entry = @bag.find(template)
if entry
@@ -474,18 +520,18 @@
# Reads +tuple+, but does not remove it.
def read(tuple, sec=nil)
template = WaitTemplateEntry.new(self, tuple, sec)
yield(template) if block_given?
- start_keeper
synchronize do
entry = @bag.find(template)
return entry.value if entry
raise RequestExpiredError if template.expired?
begin
@read_waiter.push(template)
+ start_keeper if template.expires
template.wait
raise RequestCanceledError if template.canceled?
raise RequestExpiredError if template.expired?
return template.found
ensure
@@ -527,10 +573,14 @@
template
end
private
+ def create_entry(tuple, sec)
+ TupleEntry.new(tuple, sec)
+ end
+
##
# Removes dead tuples.
def keep_clean
synchronize do
@@ -564,12 +614,15 @@
# Creates a thread that scans the tuplespace for expired tuples.
def start_keeper
return if @keeper && @keeper.alive?
@keeper = Thread.new do
- while need_keeper?
- keep_clean
+ while true
sleep(@period)
+ synchronize do
+ break unless need_keeper?
+ keep_clean
+ end
end
end
end
##