lib/agent/channel.rb in agent-0.9.1 vs lib/agent/channel.rb in agent-0.10.0
- old
+ new
@@ -8,30 +8,27 @@
def self.channel!(*args)
Channel.new(*args)
end
class Channel
- attr_reader :name, :direction, :type, :max
+ ::Agent::Push::SKIP_MARSHAL_TYPES << ::Agent::Channel
+ attr_reader :name, :direction, :type, :max, :queue
+
def initialize(*args)
- opts = args.last.is_a?(Hash) ? args.pop : {}
- @type = args.shift
- @max = args.shift || 0
- @closed = false
- @name = opts[:name] || UUID.generate
- @direction = opts[:direction] || :bidirectional
- @close_mutex = Mutex.new
- @queue = Queues.register(@name, @type, @max)
+ opts = args.last.is_a?(Hash) ? args.pop : {}
+ @type = args.shift
+ @max = args.shift || 0
+ @closed = false
+ @name = opts[:name] || UUID.generate
+ @direction = opts[:direction] || :bidirectional
+ @skip_marshal = opts[:skip_marshal]
+ @close_mutex = Mutex.new
+ @queue = Queues.register(@name, @type, @max)
end
- def queue
- q = @queue
- raise Errors::ChannelClosed unless q
- q
- end
-
# Serialization methods
def marshal_load(ary)
@closed, @name, @max, @type, @direction = *ary
@queue = Queues[@name]
@@ -46,11 +43,13 @@
# Sending methods
def send(object, options={})
check_direction(:send)
- queue.push(object, options)
+ q = queue
+ raise Errors::ChannelClosed unless q
+ q.push(object, {:skip_marshal => @skip_marshal}.merge(options))
end
alias :push :send
alias :<< :send
def push?; queue.push?; end
@@ -59,10 +58,12 @@
# Receiving methods
def receive(options={})
check_direction(:receive)
- queue.pop(options)
+ q = queue
+ return [nil, false] unless q
+ q.pop(options)
end
alias :pop :receive
def pop?; queue.pop?; end
alias :receive? :pop?