Sha256: 3f4ee297ec5bdd0f0055fd05b3fdedf4cb43026053117e970690966fc9311961

Contents?: true

Size: 2 KB

Versions: 804

Compression:

Stored size: 2 KB

Contents

module TSV

  def pthrough(num_threads = 10, new_key_field = nil, new_fields = nil, uniq = false, zipped = false, &block)
    q = RbbtThreadQueue.new num_threads

    q.init(true, &block)

    begin
      res = through(new_key_field, new_fields, uniq, zipped) do |*p|
        q.process p
      end
      q.join
    ensure
      q.clean
    end

  end

  def ppthrough_callback(&block)
    @ppthrough_callback = block
  end

  def ppthrough(num_procs = 7, new_key_field = nil, new_fields = nil, uniq = false, zipped = false, &block)

    q = RbbtProcessQueue.new num_procs

    q.callback &@ppthrough_callback
    @ppthrough_callback = nil

    q.init do |k,v|
      block.call k,v
    end

    begin
      res = through(new_key_field, new_fields, uniq, zipped) do |*p|
        q.process q
      end
      q.join
    ensure
      q.clean
    end

    res
  end

  def ppthrough(num_procs = 7, new_key_field = nil, new_fields = nil, uniq = false, zipped = false, &block)

    q = RbbtProcessQueue.new num_procs

    q.callback &@ppthrough_callback
    @ppthrough_callback = nil

    _pat_size = 20
    _pat = "A" << _pat_size.to_s

    num_fields = fields.length
    pattern = case type
              when :single, :flat
                _pat * 2
              when :list, :double
                _pat * (num_fields + 1)
              end

    q.init do |str|
      _parts = str.unpack(pattern)

      case type
      when :single
        k, v = _parts
      when :list
        k, *v = _parts
      when :flat
        k, v = _parts
        v = v.split "|"
      when :double
        k, *v = _parts
        v = v.collect{|l| l.split "|" }
      end

      block.call k,v
    end

    begin
      res = through(new_key_field, new_fields, uniq, zipped) do |k,v|
        case type
        when :flat
          v = v * "|" 
        when :double
          v = v.collect{|l| l * "|" } if type == :double
        end

        str = [k,v].flatten.pack(pattern)
        q.process str
      end
      q.join
    ensure
      q.clean
    end

    res
  end
end

Version data entries

804 entries across 804 versions & 1 rubygems

Version Path
rbbt-util-5.43.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.42.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.41.1 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.41.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.40.5 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.40.4 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.40.3 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.40.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.39.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.38.1 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.38.0 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.16 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.15 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.14 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.13 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.12 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.11 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.10 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.9 lib/rbbt/tsv/parallel/through.rb
rbbt-util-5.37.8 lib/rbbt/tsv/parallel/through.rb