lib/agent/channel.rb in agent-0.9.1 vs lib/agent/channel.rb in agent-0.10.0

- old
+ new

@@ -8,30 +8,27 @@ def self.channel!(*args) Channel.new(*args) end class Channel - attr_reader :name, :direction, :type, :max + ::Agent::Push::SKIP_MARSHAL_TYPES << ::Agent::Channel + attr_reader :name, :direction, :type, :max, :queue + def initialize(*args) - opts = args.last.is_a?(Hash) ? args.pop : {} - @type = args.shift - @max = args.shift || 0 - @closed = false - @name = opts[:name] || UUID.generate - @direction = opts[:direction] || :bidirectional - @close_mutex = Mutex.new - @queue = Queues.register(@name, @type, @max) + opts = args.last.is_a?(Hash) ? args.pop : {} + @type = args.shift + @max = args.shift || 0 + @closed = false + @name = opts[:name] || UUID.generate + @direction = opts[:direction] || :bidirectional + @skip_marshal = opts[:skip_marshal] + @close_mutex = Mutex.new + @queue = Queues.register(@name, @type, @max) end - def queue - q = @queue - raise Errors::ChannelClosed unless q - q - end - # Serialization methods def marshal_load(ary) @closed, @name, @max, @type, @direction = *ary @queue = Queues[@name] @@ -46,11 +43,13 @@ # Sending methods def send(object, options={}) check_direction(:send) - queue.push(object, options) + q = queue + raise Errors::ChannelClosed unless q + q.push(object, {:skip_marshal => @skip_marshal}.merge(options)) end alias :push :send alias :<< :send def push?; queue.push?; end @@ -59,10 +58,12 @@ # Receiving methods def receive(options={}) check_direction(:receive) - queue.pop(options) + q = queue + return [nil, false] unless q + q.pop(options) end alias :pop :receive def pop?; queue.pop?; end alias :receive? :pop?