Sha256: 3f4ee297ec5bdd0f0055fd05b3fdedf4cb43026053117e970690966fc9311961

Contents?: true

Size: 2 KB

Versions: 804

Compression:

Stored size: 2 KB

Contents

module TSV

  def pthrough(num_threads = 10, new_key_field = nil, new_fields = nil, uniq = false, zipped = false, &block)
    q = RbbtThreadQueue.new num_threads

    q.init(true, &block)

    begin
      res = through(new_key_field, new_fields, uniq, zipped) do |*p|
        q.process p
      end
      q.join
    ensure
      q.clean
    end

  end

  def ppthrough_callback(&block)
    @ppthrough_callback = block
  end

  def ppthrough(num_procs = 7, new_key_field = nil, new_fields = nil, uniq = false, zipped = false, &block)

    q = RbbtProcessQueue.new num_procs

    q.callback &@ppthrough_callback
    @ppthrough_callback = nil

    q.init do |k,v|
      block.call k,v
    end

    begin
      res = through(new_key_field, new_fields, uniq, zipped) do |*p|
        q.process q
      end
      q.join
    ensure
      q.clean
    end

    res
  end

  def ppthrough(num_procs = 7, new_key_field = nil, new_fields = nil, uniq = false, zipped = false, &block)

    q = RbbtProcessQueue.new num_procs

    q.callback &@ppthrough_callback
    @ppthrough_callback = nil

    _pat_size = 20
    _pat = "A" << _pat_size.to_s

    num_fields = fields.length
    pattern = case type
              when :single, :flat
                _pat * 2
              when :list, :double
                _pat * (num_fields + 1)
              end

    q.init do |str|
      _parts = str.unpack(pattern)

      case type
      when :single
        k, v = _parts
      when :list
        k, *v = _parts
      when :flat
        k, v = _parts
        v = v.split "|"
      when :double
        k, *v = _parts
        v = v.collect{|l| l.split "|" }
      end

      block.call k,v
    end

    begin
      res = through(new_key_field, new_fields, uniq, zipped) do |k,v|
        case type
        when :flat
          v = v * "|" 
        when :double
          v = v.collect{|l| l * "|" } if type == :double
        end

        str = [k,v].flatten.pack(pattern)
        q.process str
      end
      q.join
    ensure
      q.clean
    end

    res
  end
end

Version data entries

804 entries across 804 versions & 1 rubygems

Version Path
rbbt-util-5.37.6 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.4 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.3 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.1 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.36.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.35.4 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.35.3 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.35.2 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.35.1 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.27 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.26 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.25 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.24 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.23 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.22 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.21 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.20 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.18 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.34.17 lib/rbbt/tsv/parallel/through.rb