lib/agent/channel.rb in agent-0.1.0 vs lib/agent/channel.rb in agent-0.9.0
- old
+ new
@@ -1,74 +1,121 @@
-# Channels combine communication—the exchange of a value—with synchronization—guaranteeing
-# that two calculations (goroutines) are in a known state.
-# - http://golang.org/doc/effective_go.html#channels
+require "agent/uuid"
+require "agent/push"
+require "agent/pop"
+require "agent/queues"
+require "agent/errors"
module Agent
+ def self.channel!(*args)
+ Channel.new(*args)
+ end
+
class Channel
- attr_reader :name, :transport, :chan
+ attr_reader :name, :direction, :type, :max
- def initialize(opts = {})
- @state = :active
- @name = opts[:name]
- @max = opts[:size] || 1
- @type = opts[:type]
- @direction = opts[:direction] || :bidirectional
- @transport = opts[:transport] || Agent::Transport::Queue
+ def initialize(*args)
+ opts = args.last.is_a?(Hash) ? args.pop : {}
+ @type = args.shift
+ @max = args.shift || 0
+ @closed = false
+ @name = opts[:name] || UUID.generate
+ @direction = opts[:direction] || :bidirectional
+ @close_mutex = Mutex.new
+ @queue = Queues.register(@name, @type, @max)
+ end
- raise NoName if @name.nil?
- raise Untyped if @type.nil?
-
- @chan = @transport.new(@name, @max)
+ def queue
+ q = @queue
+ raise Errors::ChannelClosed unless q
+ q
end
+
+ # Serialization methods
+
def marshal_load(ary)
- @state, @name, @type, @direction, @transport = *ary
- @chan = @transport.new(@name)
+ @closed, @name, @max, @type, @direction = *ary
+ @queue = Queues[@name]
+ @closed = @queue.nil? || @queue.closed?
self
end
def marshal_dump
- [@state, @name, @type, @direction, @transport]
+ [@closed, @name, @max, @type, @direction]
end
- def send(msg)
- check_direction(:send)
- check_type(msg)
- @chan.send(Marshal.dump(msg))
+ # Sending methods
+
+ def send(object, options={})
+ check_direction(:send)
+ queue.push(object, options)
end
alias :push :send
alias :<< :send
- def receive
- check_direction(:receive)
+ def push?; queue.push?; end
+ alias :send? :push?
- msg = Marshal.load(@chan.receive)
- check_type(msg)
- msg
+ # Receiving methods
+
+ def receive(options={})
+ check_direction(:receive)
+ queue.pop(options)
end
alias :pop :receive
- def closed?; @state == :closed; end
+ def pop?; queue.pop?; end
+ alias :receive? :pop?
+
+
+ # Closing methods
+
def close
- @chan.close
- @state = :closed
+ @close_mutex.synchronize do
+ raise Errors::ChannelClosed if @closed
+ @closed = true
+ @queue.close
+ @queue = nil
+ Queues.delete(@name)
+ end
end
+ def closed?; @closed; end
+ def open?; !@closed; end
- private
+ def remove_operations(operations)
+ # ugly, but it overcomes the race condition without synchronization
+ # since instance variable access is atomic.
+ q = @queue
+ q.remove_operations(operations) if q
+ end
- def check_type(msg)
- raise InvalidType if !msg.is_a? @type
- end
+ def as_send_only
+ as_direction_only(:send)
+ end
- def check_direction(direction)
- return if @direction == :bidirectional
- raise InvalidDirection if @direction != direction
+ def as_receive_only
+ as_direction_only(:receive)
+ end
+
+
+ private
+
+ def as_direction_only(direction)
+ @close_mutex.synchronize do
+ raise Errors::ChannelClosed if @closed
+ channel!(@type, @max, :name => @name, :direction => direction)
end
+ end
- class InvalidDirection < Exception; end
- class NoName < Exception; end
- class Untyped < Exception; end
- class InvalidType < Exception; end
+ def check_type(object)
+ raise Errors::InvalidType unless object.is_a?(@type)
+ end
+
+ def check_direction(direction)
+ return if @direction == :bidirectional
+ raise Errors::InvalidDirection if @direction != direction
+ end
+
end
end