Sha256: 79f87cf571f43159dbc9b19d26505d1b8fe24344eb3b9d4c4c4f5e12cc98411c

Contents?: true

Size: 1.7 KB

Versions: 1

Compression:

Stored size: 1.7 KB

Contents

require 'json'
require 'time'

module Embulk
  module Parser

    class JsonlParserPlugin < ParserPlugin
      Plugin.register_parser("jsonl", self)

      # Builds the task hash and the output columns from the plugin
      # configuration, then hands both to the given block.
      #
      # The task carries two entries: "decoder_task" (the line decoder
      # settings loaded from +config+ and converted with DataSource.from_java)
      # and "schema" (the "schema" array parameter of +config+). One Column is
      # created per schema entry, indexed by its position, using the entry's
      # "name" and its "type" converted to a Symbol.
      #
      # config  - configuration object; must respond to load_config and param.
      # control - block called with (task, columns).
      #
      # Returns whatever the block returns.
      def self.transaction(config, &control)
        decoder_config = config.load_config(Java::LineDecoder::DecoderTask)
        task = {
          "decoder_task" => DataSource.from_java(decoder_config.dump),
          "schema" => config.param("schema", :array)
        }
        columns = task["schema"].map.with_index do |column, index|
          Column.new(index, column["name"], column["type"].to_sym)
        end
        yield(task, columns)
      end

      # Restores the line decoder settings from the task's "decoder_task"
      # entry and keeps them in @decoder_task for later use by #run.
      def init
        decoder_config = task.param("decoder_task", :hash)
        @decoder_task = decoder_config.load_task(Java::LineDecoder::DecoderTask)
      end

      # Reads every line of every input file, parses it as a JSON document and
      # appends the resulting record to the page builder.
      #
      # Processing is best-effort: a line that cannot be parsed or converted
      # (any StandardError raised while parsing, building or adding the
      # record) is skipped, and a one-line notice naming the error is written
      # with Kernel#warn so dropped records are no longer invisible.
      #
      # file_input - input wrapper; its @java_file_input instance variable is
      #              passed to the Java line decoder.
      #
      # Returns the result of page_builder.finish.
      def run(file_input)
        # The wrapper's Java object is read directly from its instance
        # variable, exactly as the decoder constructor expects it.
        decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task)
        schema = @task["schema"]

        while decoder.nextFile
          while (line = decoder.poll)
            begin
              hash = JSON.parse(line)
              @page_builder.add(make_record(schema, hash))
            rescue => e
              # Keep going: one bad line must not abort the whole input.
              warn "embulk-parser-jsonl: skipped line (#{e.class}: #{e.message})"
            end
          end
        end
        page_builder.finish
      end

      private

      # Converts one parsed JSON object into an array of column values, in the
      # same order as +schema+.
      #
      # A field that is missing or null is treated as "" before conversion,
      # so it yields "" (string), 0 (long), 0.0 (double), false (boolean) or
      # nil (timestamp).
      #
      # Boolean and timestamp values are converted through their text form,
      # so native JSON values (true, false, 1, an epoch number) work as well
      # as strings; previously they raised NoMethodError.
      #
      # schema - array of hashes with "name" and "type" keys; timestamp
      #          columns also need "time_format" (a Time.strptime format).
      # e      - hash of field name => value for a single record.
      #
      # Returns an Array with one converted value per schema entry.
      # Raises RuntimeError when a column has an unsupported type.
      def make_record(schema, e)
        schema.map do |c|
          val = e[c["name"]]
          v = val.nil? ? "" : val
          case c["type"]
          when "string"
            v
          when "long"
            v.to_i
          when "double"
            v.to_f
          when "boolean"
            # to_s first: JSON true/false/1 are not Strings and have no #downcase.
            ["yes", "true", "1"].include?(v.to_s.downcase)
          when "timestamp"
            # to_s first: numeric values (e.g. epoch seconds) have no #empty?.
            text = v.to_s
            text.empty? ? nil : Time.strptime(text, c["time_format"])
          else
            raise "Unsupported type #{c['type']}"
          end
        end
      end
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-parser-jsonl-0.0.1 lib/embulk/parser/jsonl.rb