Sha256: d7f83826813b7cf3e86f34ad9791c0a323dcacf13dfaea967dbc79dde3b5af27

Contents?: true

Size: 1.5 KB

Versions: 2

Compression:

Stored size: 1.5 KB

Contents

require 'arc-furnace/sink'
require 'msgpack'

module ArcFurnace
  # A sink that writes every field seen across all rows to a CSV file.
  #
  # The set of columns is not known until every row has been observed, so rows
  # are first spooled to a temporary file (MessagePack-encoded) while the
  # maximum number of values per field is tracked. On #finalize the header row
  # is written, followed by the spooled rows replayed from the temporary file.
  #
  # A field whose value is an array is expanded into as many same-named
  # columns as the largest array seen for that field.
  class AllFieldsCSVSink < Sink
    private_attr_reader :csv, :fields, :tmp_file, :packer, :field_mappings

    # @param filename [String] path of the CSV file to write
    # @param encoding [String] encoding passed to CSV.open for the output file
    def initialize(filename:, encoding: 'UTF-8')
      @tmp_file = Tempfile.new('intermediate_results', encoding: 'binary')
      @packer = MessagePack::Packer.new(tmp_file)
      @csv = CSV.open(filename, 'wb', encoding: encoding, headers: true)
      # Maps field name => maximum number of values seen for that field.
      @fields = {}
    end

    # No-op: the columns are discovered from the rows themselves, so the
    # fields argument is ignored.
    def prepare(fields = nil)
    end

    # Writes the header row and all spooled rows to the CSV, then releases
    # both the CSV handle and the intermediate temporary file.
    def finalize
      packer.flush
      tmp_file.rewind

      write_header_row!

      MessagePack::Unpacker.new(tmp_file).each { |hash| write_row(hash) }
    ensure
      # Release both resources even if replaying the rows fails; the
      # temporary file is closed and unlinked rather than left for GC.
      csv.close
      tmp_file.close!
    end

    # Spools a single row and updates the per-field column counts.
    #
    # @param hash [Hash] field name => value (or array of values)
    def row(hash)
      # Symbol keys come back from the MessagePack round-trip as Strings (the
      # msgpack default), so normalise them up front; otherwise the names
      # recorded in `fields` would not match the keys of the replayed rows.
      normalized = stringify_symbol_keys(hash)
      update_field_counts(normalized)
      packer.write(normalized)
    end

    private

    # Emits each field name once per column allotted to it.
    def write_header_row!
      csv << fields.flat_map { |key, count| Array.new(count, key) }
    end

    # Emits one CSV row, truncating or nil-padding every field's values to
    # the column count recorded for that field.
    def write_row(hash)
      row = fields.flat_map do |key, count|
        values = Array.wrap(hash[key]).first(count)
        values + Array.new(count - values.length)
      end
      csv << row
    end

    # Raises the recorded column count of each field to the number of values
    # this row carries for it, when that number is larger.
    def update_field_counts(hash)
      hash.each do |key, values|
        value_count = Array.wrap(values).size
        fields[key] = value_count if value_count > fields.fetch(key, 0)
      end
    end

    # Returns a copy of the hash with Symbol keys converted to Strings; all
    # other keys are left untouched.
    def stringify_symbol_keys(hash)
      hash.each_with_object({}) do |(key, value), result|
        result[key.is_a?(Symbol) ? key.to_s : key] = value
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
arc-furnace-0.1.3 lib/arc-furnace/all_fields_csv_sink.rb
arc-furnace-0.1.0 lib/arc-furnace/all_fields_csv_sink.rb