Sha256: f596dc0f2d6b3619472b2abdca04bbee51a8995a531f3bda2aa0a396d21242b2

Contents?: true

Size: 1.41 KB

Versions: 2

Compression:

Stored size: 1.41 KB

Contents

require 'jsonpath'

module Embulk
  module Filter

    # Embulk filter plugin registered under the name "expand_json_array".
    #
    # For every input record, the JSONPath given by the "root" option is
    # evaluated against the value of the column named by "json_column_name".
    # The matches are flattened and one output record is emitted per resulting
    # element: the input record with that element appended as an extra column
    # (declared as :string, named by "expanded_column_name").
    # A record whose JSONPath evaluation yields no elements emits nothing.
    class ExpandJsonArray < FilterPlugin
      Plugin.register_filter("expand_json_array", self)

      # Reads the configuration, resolves the JSON source column and declares
      # the output schema.
      #
      # @param config plugin configuration; "json_column_name", "root" and
      #   "expanded_column_name" are read as string parameters
      # @param in_schema input columns; searched by name for the JSON column
      # @yield [task, out_columns] the task hash and the input columns followed
      #   by the new expanded column
      # @raise [ConfigError] when no input column is named "json_column_name"
      def self.transaction(config, in_schema, &control)
        task = {
          "json_column_name" => config.param("json_column_name", :string),
          "root" => config.param("root", :string),
          "expanded_column_name" => config.param("expanded_column_name", :string)
        }

        json_column_name = task["json_column_name"]
        target_column = in_schema.find { |c| c.name == json_column_name }

        # Fail fast: without this check a misspelled column name would only
        # surface later, in #add, as a NoMethodError on nil.
        unless target_column
          raise ConfigError,
                "json_column_name: column '#{json_column_name}' was not found in the input schema"
        end

        task['parse_target_column'] = target_column

        columns = [
          Column.new(nil, task["expanded_column_name"], :string),
        ]

        out_columns = in_schema + columns

        yield(task, out_columns)
      end

      # Copies the task settings into instance state and compiles the
      # JSONPath expression once so it can be reused for every record.
      def init
        @json_column_name = task["json_column_name"]
        @root = task["root"]
        @expanded_column_name = task["expanded_column_name"]

        # Read with the 'index' key in #add; presumably the column stored by
        # .transaction arrives here as a hash - TODO confirm.
        @parse_target_column = task['parse_target_column']

        @json_path = JsonPath.new(@root)
      end

      # Nothing is released here.
      def close
      end

      # Expands each record of the page into zero or more output records.
      #
      # @param page an enumerable of records; each record is indexed by column
      #   position and concatenated with an array
      def add(page)
        # The column position does not change between records.
        json_index = @parse_target_column['index']

        page.each do |record|
          expanded_columns = @json_path.on(record[json_index]).flatten

          expanded_columns.each do |ec|
            page_builder.add(record + [ec])
          end
        end
      end

      # Signals the end of input to the page builder.
      def finish
        page_builder.finish
      end
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
embulk-filter-expand_json_array-0.1.1 lib/embulk/filter/expand_json_array.rb
embulk-filter-expand_json_array-0.1.0 lib/embulk/filter/expand_json_array.rb