Sha256: 79f87cf571f43159dbc9b19d26505d1b8fe24344eb3b9d4c4c4f5e12cc98411c
Contents?: true
Size: 1.7 KB
Versions: 1
Compression:
Stored size: 1.7 KB
Contents
require 'json'
require 'time' # Time.strptime lives in the 'time' stdlib, not core Time

module Embulk
  module Parser
    # Embulk parser plugin for JSON Lines input: every decoded line is parsed
    # as one JSON value and mapped onto the configured "schema" columns by name.
    class JsonlParserPlugin < ParserPlugin
      Plugin.register_parser("jsonl", self)

      # Strings (compared case-insensitively) that a "boolean" column reads as true.
      TRUE_VALUES = %w[yes true 1].freeze

      # Builds the task hash and the output column list from the plugin config.
      #
      # @param config the parser config (an Embulk DataSource, presumably); must
      #   provide a "schema" array whose entries carry "name", "type" and, for
      #   timestamp columns, "time_format"
      # @yieldparam task [Hash] serialized line-decoder settings plus the schema
      # @yieldparam columns [Array<Column>] one Column per schema entry, in order
      def self.transaction(config, &control)
        parser_task = config.load_config(Java::LineDecoder::DecoderTask)
        task = {
          "decoder_task" => DataSource.from_java(parser_task.dump),
          "schema" => config.param("schema", :array)
        }
        columns = task["schema"].each_with_index.map do |c, i|
          Column.new(i, c["name"], c["type"].to_sym)
        end
        yield(task, columns)
      end

      # Restores the Java line-decoder settings serialized by .transaction.
      def init
        @decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask)
      end

      # Reads every input file line by line, parses each line as JSON and adds
      # one record per line to the page builder, then finishes the page.
      #
      # Lines that fail to parse or convert are skipped (best-effort), but each
      # skip is reported on stderr instead of being dropped silently.
      #
      # @param file_input the Ruby FileInput wrapper supplied by Embulk
      def run(file_input)
        # LineDecoder is a Java class and needs the wrapped Java FileInput.
        java_file_input = file_input.instance_eval { @java_file_input }
        decoder = Java::LineDecoder.new(java_file_input, @decoder_task)
        schema = @task["schema"]
        while decoder.nextFile
          while (line = decoder.poll)
            begin
              record = JSON.parse(line)
              @page_builder.add(make_record(schema, record))
            rescue => e
              warn "embulk-parser-jsonl: skipped line (#{e.class}: #{e.message}): #{line}"
            end
          end
        end
        page_builder.finish
      end

      private

      # Converts one parsed JSON line into column values ordered like +schema+.
      #
      # @param schema [Array<Hash>] column definitions ("name", "type", optional "time_format")
      # @param record the parsed JSON value; expected to be a JSON object (Hash)
      # @return [Array] one converted value per schema column
      # @raise [RuntimeError] if a column declares an unsupported type
      def make_record(schema, record)
        schema.map { |column| convert_value(column, record[column["name"]]) }
      end

      # Converts a single raw JSON value according to the column's "type".
      # Missing/null fields are first replaced by "", so they become
      # "" / 0 / 0.0 / false / nil for string / long / double / boolean /
      # timestamp columns respectively.
      def convert_value(column, raw)
        # Not `raw || ""`: a JSON false must stay false, only nil is replaced.
        v = raw.nil? ? "" : raw
        case column["type"]
        when "string"
          case v
          when String then v
          when Hash, Array then v.to_json # keep nested values as JSON text
          else v.to_s                     # numbers / booleans
          end
        when "long"
          v.to_i
        when "double"
          v.to_f
        when "boolean"
          # to_s lets real JSON booleans/numbers (true, 1) match as well as strings.
          TRUE_VALUES.include?(v.to_s.downcase)
        when "timestamp"
          text = v.to_s # also accepts numeric values, e.g. epoch seconds with "%s"
          text.empty? ? nil : Time.strptime(text, column["time_format"])
        else
          raise "Unsupported type #{column['type']}"
        end
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
embulk-parser-jsonl-0.0.1 | lib/embulk/parser/jsonl.rb |