Sha256: 1976a589c3ce91e1d2da7c0dd58f036d04151355666527c785d6bc859422a879

Contents?: true

Size: 1.43 KB

Versions: 1

Compression:

Stored size: 1.43 KB

Contents

require 'csv'

module Embulk
  module Filter
    # Embulk filter that expands one column holding a CSV line into several
    # output records: one record per field parsed from that CSV line.
    #
    # Configuration:
    #   csv_column_name: name of the input column whose value is parsed as CSV
    #   column_names:    input columns to emit, in output order (default: []).
    #                    Only when csv_column_name is listed here does the
    #                    expanded field appear in the output; other listed
    #                    columns are copied unchanged into every expanded record.
    #
    # Every output column is declared with type :string.
    class ExpandCsvArray < FilterPlugin
      Plugin.register_filter("expand_csv_array", self)

      # Reads the configuration, resolves the referenced input columns and
      # yields the task together with the output column list.
      #
      # @param config [DataSource] plugin configuration
      # @param in_schema [Schema] schema of the incoming records
      # @yield [task, out_columns] the task hash and the output columns
      # @raise [ConfigError] if csv_column_name or any entry of column_names
      #   does not name a column of in_schema
      def self.transaction(config, in_schema, &control)
        # configuration code:
        task = {
          "csv_column_name" => config.param("csv_column_name", :string),
          "column_names" => config.param("column_names", :array, default: [])
        }

        task["target"] = lookup_column(in_schema, task["csv_column_name"])
        # Resolve in column_names order (NOT in_schema order) so that each
        # emitted value lines up with the output column of the same name.
        task["columns"] = task["column_names"].map { |name| lookup_column(in_schema, name) }

        out_columns = task["column_names"].map.with_index do |name, idx|
          Column.new(idx, name, :string)
        end

        yield(task, out_columns)
      end

      # Returns the column of +schema+ whose name is +name+.
      #
      # @raise [ConfigError] if no such column exists
      def self.lookup_column(schema, name)
        schema.find { |c| c.name == name } ||
          raise(ConfigError.new("Column '#{name}' is not found in the input schema"))
      end
      private_class_method :lookup_column

      def init
        # initialization code:
        @csv_column_name = task["csv_column_name"]
        @columns = task["columns"]
        @target = task["target"]
        # Resolve positions once instead of for every expanded record.
        @target_index = @target["index"]
        @source_indexes = @columns.map { |c| c["index"] }
      end

      def close
      end

      # Emits one output record per CSV field found in the target column.
      # The field replaces the target column's value; every other selected
      # column is copied from the source record. A nil or empty CSV value
      # parses to no fields, so such a record produces no output.
      def add(page)
        # filtering code:
        page.each do |record|
          fields = record[@target_index].to_s.parse_csv.to_a
          fields.each do |val|
            data = @source_indexes.map { |i| i == @target_index ? val : record[i] }
            page_builder.add(data)
          end
        end
      end

      def finish
        page_builder.finish
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-filter-expand_csv_array-0.1.4 lib/embulk/filter/expand_csv_array.rb