Sha256: 61a4fc4d574dfa8ca8dd3109aa7b3b48400a45260367c66dfc568b88a3cc6abe

Contents?: true

Size: 1.87 KB

Versions: 1

Compression:

Stored size: 1.87 KB

Contents

# Private: Wrapper around an IO object to read/write Marshaled objects.
#
# Objects are written with Marshal.dump and read back with Marshal.load.
# Writes may be deferred by queueing them in an outbox, which is flushed
# before any direct write, when it grows past max_outbox, and on close.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects, so the
# wrapped IO should only ever be connected to a trusted peer.
class MarshalStream
  include Enumerable

  # Default number of queued objects the outbox may hold before
  # write_to_outbox forces a flush.
  DEFAULT_MAX_OUTBOX = 10

  # Raised when the stream cannot be decoded into an object.
  class StreamError < StandardError; end

  attr_reader :io
  attr_reader :max_outbox

  # Public: Wrap an IO-like object.
  #
  # io         - The object handed to Marshal.load / Marshal.dump.
  # max_outbox - Outbox size above which write_to_outbox flushes
  #              (default: DEFAULT_MAX_OUTBOX).
  def initialize(io, max_outbox: DEFAULT_MAX_OUTBOX)
    @io = io
    @max_outbox = max_outbox
    @inbox = []
    @outbox = []
  end

  # Public: Flush any queued objects and close the underlying IO.
  #
  # The IO is closed even when flushing raises, so a failing queued write
  # cannot leak the handle; the flush error still propagates.
  #
  # Returns the result of io.close.
  def close
    close_result = nil

    begin
      flush_outbox
    ensure
      close_result = io.close
    end

    close_result
  end

  # Public: Whether the underlying IO has been closed.
  def closed?
    io.closed?
  end

  # Public: Yield every object remaining in the stream.
  #
  # eof? is checked before each read, so an exhausted stream yields nothing.
  #
  # Returns an Enumerator when no block is given.
  def each
    return to_enum unless block_given?

    read { |obj| yield obj } until eof
  end

  # Public: True when nothing is buffered in the inbox and the IO is at EOF.
  def eof?
    inbox.empty? && io.eof?
  end

  alias_method :eof, :eof?

  # Public: No-op hook called after a buffered write.
  #
  # Returns self.
  def flush_buffer
    self
  end

  # Public: Write every queued object to the stream, then empty the outbox.
  #
  # Queued Procs are called at flush time and their result is written; any
  # other queued object is written as-is.
  #
  # Returns self.
  def flush_outbox
    outbox.each do |entry|
      # Parentheses are required: `entry.is_a? Proc ? a : b` would parse as
      # `entry.is_a?(Proc ? a : b)` and never test the class at all.
      write_to_stream(entry.is_a?(Proc) ? entry.call : entry)
    end
    outbox.clear

    self
  end

  # Public: Read from the stream.
  #
  # With a block, yields every object buffered in the inbox followed by one
  # object loaded from the IO, and returns nil. Without a block, returns the
  # next single object (see read_one).
  def read
    if block_given?
      read_from_inbox { |obj| yield obj }
      read_from_stream { |obj| yield obj }

      nil
    else
      read_one
    end
  end

  # Public: Load one object from the IO and yield it.
  #
  # The rescue clauses also cover the yield, so a StandardError raised by
  # the block itself is re-raised as StreamError too.
  #
  # Raises IOError or SystemCallError unchanged (this includes EOFError).
  # Raises StreamError for any other StandardError.
  def read_from_stream
    yield Marshal.load(io)
  rescue IOError, SystemCallError
    raise
  rescue => e
    # Message spelling is kept as-is in case callers match on it.
    raise StreamError, "Unreadble stream: #{e}"
  end

  # Public: Return the next object, preferring anything already buffered in
  # the inbox over reading from the IO.
  #
  # A marshaled nil is a valid value and is returned like any other object
  # rather than being treated as "nothing read yet".
  def read_one
    return inbox.shift unless inbox.empty?

    loaded = nil
    # read_from_stream yields exactly once or raises, so one call suffices.
    read_from_stream { |obj| loaded = obj }
    loaded
  end

  # Public: Expose the wrapped IO.
  def to_io
    io
  end

  # Public: Write the given objects to the stream immediately, after any
  # queued outbox entries.
  #
  # Returns self (via flush_buffer).
  def write(*objects)
    write_to_buffer(*objects)
    flush_buffer
  end

  alias_method :<<, :write

  # Public: Flush the outbox, then write each given object to the stream.
  #
  # Returns self.
  def write_to_buffer(*objects)
    flush_outbox
    objects.each { |object| write_to_stream object }

    self
  end

  # Public: Queue an object, or a block producing one, for a later write.
  #
  # When a block is given it takes precedence over object. The outbox is
  # flushed as soon as it holds more than max_outbox entries.
  #
  # Returns self.
  def write_to_outbox(object = nil, &block)
    outbox << (block || object)

    flush_outbox if outbox.size > max_outbox

    self
  end

  # Public: Marshal a single object straight to the IO, bypassing the outbox.
  #
  # Returns self.
  def write_to_stream(object)
    Marshal.dump(object, io)

    self
  end

  private

  attr_reader :inbox
  attr_reader :outbox

  # Internal: Yield every buffered inbox object, then empty the inbox.
  def read_from_inbox
    return if inbox.empty?

    inbox.each { |obj| yield obj }
    inbox.clear
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
aggro-0.0.4 lib/aggro/marshal_stream.rb