Sha256: b82bedd53821e33208f58722332b6468e4a5379ec49b6caf72e8e321f528dc1f
Contents?: true
Size: 1.88 KB
Versions: 1
Compression:
Stored size: 1.88 KB
Contents
require "uri"

module Embulk
  module Parser
    # Embulk parser plugin that treats each input line as an
    # application/x-www-form-urlencoded query string (e.g. "a=1&b=2") and
    # emits one row per decodable line, picking values by schema column name.
    class QueryString < ParserPlugin
      Plugin.register_parser("query_string", self)

      # Matches an optional leading quote, the body, and an optional trailing
      # quote (either quote character, not necessarily paired). /m lets `.`
      # cross embedded newlines so the match never fails and returns nil.
      QUOTE_PATTERN = /\A(?:["'])?(.*?)(?:["'])?\z/m

      # Builds the task hash and the column list from +config+ and yields them
      # to Embulk's control block.
      #
      # Config keys read here: "strip_quote" (bool, default true),
      # "strip_whitespace" (bool, default true), "schema" (array of hashes with
      # "name" and "type", default []), plus the line-decoder options.
      def self.transaction(config, &control)
        decoder_task = config.load_config(Java::LineDecoder::DecoderTask)

        task = {
          decoder: DataSource.from_java(decoder_task.dump),
          strip_quote: config.param("strip_quote", :bool, default: true),
          strip_whitespace: config.param("strip_whitespace", :bool, default: true),
        }

        columns = config.param(:schema, :array, default: []).map do |column|
          Column.new(nil, column["name"], column["type"].to_sym)
        end

        yield(task, columns)
      end

      # Caches the parse options and reloads the line-decoder task from the
      # serialized task hash built in ::transaction.
      def init
        @options = {
          strip_quote: task[:strip_quote],
          strip_whitespace: task[:strip_whitespace],
        }
        @decoder = task.param(:decoder, :hash).load_task(Java::LineDecoder::DecoderTask)
      end

      # Reads every line of every file in +file_input+, adds one row per
      # decodable line, then finishes the page builder.
      def run(file_input)
        # NOTE(review): reaches into the Ruby FileInput wrapper's internal
        # @java_file_input ivar to hand the Java object to LineDecoder.
        java_input = file_input.instance_variable_get(:@java_file_input)
        decoder = Java::LineDecoder.new(java_input, @decoder)

        while decoder.nextFile
          while (line = decoder.poll)
            process_line(line)
          end
        end

        page_builder.finish
      end

      # Decodes one line into a Hash of key => value strings.
      #
      # @param line [String] a single input line; it is not modified and may be frozen
      # @param options [Hash] :strip_whitespace strips surrounding whitespace first;
      #   :strip_quote drops one optional leading and one optional trailing quote (" or ')
      # @return [Hash{String=>String}, nil] decoded pairs (a later duplicate key wins),
      #   or nil when URI.decode_www_form raises ArgumentError for the line
      def self.parse(line, options = {})
        # Non-destructive on purpose: chomp!/strip! would mutate the caller's
        # string and raise FrozenError on frozen input.
        text = line.chomp
        text = text.strip if options[:strip_whitespace]
        text = text[QUOTE_PATTERN, 1] if options[:strip_quote]

        begin
          URI.decode_www_form(text).to_h
        rescue ArgumentError
          nil
        end
      end

      private

      # Parses +line+ and, if it decodes, adds one row whose values follow the
      # schema's column order; keys missing from the line become nil.
      # NOTE(review): values are always Strings here — no conversion to the
      # column's declared type is done; confirm non-string schema types work.
      def process_line(line)
        record = self.class.parse(line, @options)
        return unless record

        values = schema.map { |column| record[column.name] }
        page_builder.add(values)
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
embulk-parser-query_string-0.0.2 | lib/embulk/parser/query_string.rb |