lib/embulk/parser/mysqldump_tab.rb in embulk-parser-mysqldump_tab-0.1.0 vs lib/embulk/parser/mysqldump_tab.rb in embulk-parser-mysqldump_tab-0.1.1

- old
+ new

@@ -11,44 +11,33 @@ Plugin.register_parser("mysqldump_tab", self) def self.transaction(config, &control) # configuration code: - parser_task = config.load_config(Java::LineDecoder::DecoderTask) task = { "decoder_task" => DataSource.from_java(parser_task.dump) - # "option1" => config.param("option1", :integer), # integer, required - # "option2" => config.param("option2", :string, default: "myvalue"), # string, optional - # "option3" => config.param("option3", :string, default: nil), # string, optional } - # https://github.com/treasure-data/embulk-input-jira/blob/master/lib/embulk/input/jira.rb#L22 + # see https://github.com/treasure-data/embulk-input-jira/blob/master/lib/embulk/input/jira.rb#L22 attributes = {} columns = config.param(:columns, :array).map do |column| name = column["name"] type = column["type"].to_sym attributes[name] = type Column.new(nil, name, type, column["format"]) end task[:attributes] = attributes + task[:columns] = columns - # parser option - # task[:option1] = config['option1'] - # task[:option1] = config.param(:option1, :integer, default: 5) - yield(task, columns) end def init # initialization code: - # @option1 = task["option1"] - # @option2 = task["option2"] - # @option3 = task["option3"] - @decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask) end def run(file_input) decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task) @@ -74,32 +63,80 @@ end page_builder.finish end + private def parse_line(line) # Escape "escaped TAB" temporarily line = line.gsub(/\\#{FIELDS_TERMINATED_BY}/, DUMMY_STRING) # Split with separator (TAB) cols = line.split(FIELDS_TERMINATED_BY) cols.map! { |item| item.gsub(/#{DUMMY_STRING}/, FIELDS_TERMINATED_BY) } - len = task[:attributes].length - cols = adjust_column(cols, len) - return cols + cols = make_record(cols) end def in_column?(line) /#{Regexp.escape(FIELDS_ESCAPED_BY)}$/.match(line) ? true : false # escaped new line end - # Adjust array length - def adjust_column(arr, len) - arr = arr.slice(0, len) # Truncate if more than len - arr.fill(0, len) { |i| arr[i] } # If it is less than len, fill it with nil + # see https://github.com/takumakanari/embulk-parser-json/blob/master/lib/embulk/parser/jsonpath.rb#L43 + def make_record(arr) + columns = @task[:columns] + record = columns.map.with_index do |col, i| + val = cast_value(arr[i], col) + end end - end + def cast_value(val, col) + type = col["type"] + fmt = col["format"] + case type + when "string" + val + when "long" + val.to_i + when "double" + val.to_f + when "json" + val + when "boolean" + if kind_of_boolean?(val) + val + elsif val.nil? || val.empty? + nil + elsif val.kind_of?(String) + ["yes", "true", "1"].include?(val.downcase) + elsif val.kind_of?(Numeric) + !val.zero? + else + !!val + end + when "timestamp" + if val.nil? || val.empty? + nil + else + begin + Time.strptime(val, fmt) + rescue ArgumentError => e + #raise DataParseError.new e + nil + end + end + else + raise "Unsupported type #{type}" + end + end + + def kind_of_boolean?(val) + val.kind_of?(TrueClass) || val.kind_of?(FalseClass) + end + + class DataParseError < StandardError + end + + end end end