lib/agent/channel.rb in agent-0.1.0 vs lib/agent/channel.rb in agent-0.9.0

- old
+ new

@@ -1,74 +1,121 @@ -# Channels combine communication—the exchange of a value—with synchronization—guaranteeing -# that two calculations (goroutines) are in a known state. -# - http://golang.org/doc/effective_go.html#channels +require "agent/uuid" +require "agent/push" +require "agent/pop" +require "agent/queues" +require "agent/errors" module Agent + def self.channel!(*args) + Channel.new(*args) + end + class Channel - attr_reader :name, :transport, :chan + attr_reader :name, :direction, :type, :max - def initialize(opts = {}) - @state = :active - @name = opts[:name] - @max = opts[:size] || 1 - @type = opts[:type] - @direction = opts[:direction] || :bidirectional - @transport = opts[:transport] || Agent::Transport::Queue + def initialize(*args) + opts = args.last.is_a?(Hash) ? args.pop : {} + @type = args.shift + @max = args.shift || 0 + @closed = false + @name = opts[:name] || UUID.generate + @direction = opts[:direction] || :bidirectional + @close_mutex = Mutex.new + @queue = Queues.register(@name, @type, @max) + end - raise NoName if @name.nil? - raise Untyped if @type.nil? - - @chan = @transport.new(@name, @max) + def queue + q = @queue + raise Errors::ChannelClosed unless q + q end + + # Serialization methods + def marshal_load(ary) - @state, @name, @type, @direction, @transport = *ary - @chan = @transport.new(@name) + @closed, @name, @max, @type, @direction = *ary + @queue = Queues[@name] + @closed = @queue.nil? || @queue.closed? self end def marshal_dump - [@state, @name, @type, @direction, @transport] + [@closed, @name, @max, @type, @direction] end - def send(msg) - check_direction(:send) - check_type(msg) - @chan.send(Marshal.dump(msg)) + # Sending methods + + def send(object, options={}) + check_direction(:send) + queue.push(object, options) end alias :push :send alias :<< :send - def receive - check_direction(:receive) + def push?; queue.push?; end + alias :send? :push? - msg = Marshal.load(@chan.receive) - check_type(msg) - msg + # Receiving methods + + def receive(options={}) + check_direction(:receive) + queue.pop(options) end alias :pop :receive - def closed?; @state == :closed; end + def pop?; queue.pop?; end + alias :receive? :pop? + + + # Closing methods + def close - @chan.close - @state = :closed + @close_mutex.synchronize do + raise Errors::ChannelClosed if @closed + @closed = true + @queue.close + @queue = nil + Queues.delete(@name) + end end + def closed?; @closed; end + def open?; !@closed; end - private + def remove_operations(operations) + # ugly, but it overcomes the race condition without synchronization + # since instance variable access is atomic. + q = @queue + q.remove_operations(operations) if q + end - def check_type(msg) - raise InvalidType if !msg.is_a? @type - end + def as_send_only + as_direction_only(:send) + end - def check_direction(direction) - return if @direction == :bidirectional - raise InvalidDirection if @direction != direction + def as_receive_only + as_direction_only(:receive) + end + + + private + + def as_direction_only(direction) + @close_mutex.synchronize do + raise Errors::ChannelClosed if @closed + channel!(@type, @max, :name => @name, :direction => direction) end + end - class InvalidDirection < Exception; end - class NoName < Exception; end - class Untyped < Exception; end - class InvalidType < Exception; end + def check_type(object) + raise Errors::InvalidType unless object.is_a?(@type) + end + + def check_direction(direction) + return if @direction == :bidirectional + raise Errors::InvalidDirection if @direction != direction + end + end end