Sha256: b82bedd53821e33208f58722332b6468e4a5379ec49b6caf72e8e321f528dc1f

Contents?: true

Size: 1.88 KB

Versions: 1

Compression:

Stored size: 1.88 KB

Contents

require "uri"

module Embulk
  module Parser

    class QueryString < ParserPlugin
      Plugin.register_parser("query_string", self)

      # Builds the parser task and the output column list from the plugin
      # configuration, then yields both to the caller's block.
      #
      # The task carries the dumped line-decoder settings plus the
      # "strip_quote" and "strip_whitespace" flags (both default to true).
      # Each entry of the optional "schema" array contributes one Column,
      # built from its "name" and "type" keys.
      #
      # @param config the plugin configuration object
      # @yield [task, columns] the task hash and the array of Column objects
      def self.transaction(config, &control)
        line_decoder_task = config.load_config(Java::LineDecoder::DecoderTask)

        task = {}
        task[:decoder] = DataSource.from_java(line_decoder_task.dump)
        task[:strip_quote] = config.param("strip_quote", :bool, default: true)
        task[:strip_whitespace] = config.param("strip_whitespace", :bool, default: true)

        columns = config.param(:schema, :array, default: []).map do |definition|
          Column.new(nil, definition["name"], definition["type"].to_sym)
        end

        yield(task, columns)
      end

      # Prepares per-instance state from the task: copies the two strip flags
      # into @options (later passed to .parse) and loads the :decoder entry
      # as a Java::LineDecoder::DecoderTask into @decoder (later used by #run).
      def init
        @options = [:strip_quote, :strip_whitespace].each_with_object({}) do |key, opts|
          opts[key] = task[key]
        end

        decoder_source = task.param(:decoder, :hash)
        @decoder = decoder_source.load_task(Java::LineDecoder::DecoderTask)
      end

      # Reads every line of every file from the input through a
      # Java::LineDecoder, passes each line to #process_line, and finishes
      # the page builder once all files are consumed.
      #
      # @param file_input the file input wrapper; its @java_file_input
      #   instance variable is handed to the line decoder
      def run(file_input)
        java_input = file_input.instance_variable_get(:@java_file_input)
        line_decoder = Java::LineDecoder.new(java_input, @decoder)

        while line_decoder.nextFile
          # poll returns a falsy value once the current file has no more lines
          while (text = line_decoder.poll)
            process_line(text)
          end
        end

        page_builder.finish
      end

      # Parses one line of URL-encoded form data ("k1=v1&k2=v2") into a Hash.
      #
      # The input string is never modified, so frozen strings are accepted
      # and the caller's copy keeps its original content.
      #
      # @param line [String, nil] raw line; one trailing line terminator is
      #   removed before parsing
      # @param options [Hash] recognised keys:
      #   :strip_whitespace - strip leading/trailing whitespace;
      #   :strip_quote - drop one leading and one trailing quote character
      #   (' or "), each independently of the other
      # @return [Hash, nil] the decoded key/value pairs, or nil when line is
      #   nil or URI.decode_www_form rejects the input with ArgumentError
      def self.parse(line, options = {})
        return nil if line.nil?

        # Non-bang variants build new strings instead of editing the caller's.
        text = line.chomp
        text = text.strip if options[:strip_whitespace]
        if options[:strip_quote]
          # \A and \z anchor to the whole string, so this always yields a
          # String even when the text contains embedded newlines.
          text = text.sub(/\A["']/, "").sub(/["']\z/, "")
        end

        begin
          URI.decode_www_form(text).to_h
        rescue ArgumentError
          # Unparsable input is reported as nil so callers can skip the line.
          nil
        end
      end

      private

      # Parses a single line and, when parsing succeeds, appends one row to
      # the page builder. The row holds the parsed value for each schema
      # column, looked up by column name; lines that fail to parse are skipped.
      #
      # @param line [String] one decoded input line
      def process_line(line)
        parsed = self.class.parse(line, @options)

        if parsed
          row = schema.map { |col| parsed[col.name] }
          page_builder.add(row)
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
embulk-parser-query_string-0.0.2 lib/embulk/parser/query_string.rb