lib/embulk/parser/mysqldump_tab.rb in embulk-parser-mysqldump_tab-0.1.0 vs lib/embulk/parser/mysqldump_tab.rb in embulk-parser-mysqldump_tab-0.1.1
- old
+ new
@@ -11,44 +11,33 @@
Plugin.register_parser("mysqldump_tab", self)
+ # Builds the parser task and the output column list from the plugin config,
+ # then yields both to the given block.
+ # The task carries the dumped DecoderTask under "decoder_task", a
+ # name => type hash under :attributes and the Column list under :columns.
def self.transaction(config, &control)
# configuration code:
-
parser_task = config.load_config(Java::LineDecoder::DecoderTask)
task = {
"decoder_task" => DataSource.from_java(parser_task.dump)
- # "option1" => config.param("option1", :integer), # integer, required
- # "option2" => config.param("option2", :string, default: "myvalue"), # string, optional
- # "option3" => config.param("option3", :string, default: nil), # string, optional
}
- # https://github.com/treasure-data/embulk-input-jira/blob/master/lib/embulk/input/jira.rb#L22
+ # see https://github.com/treasure-data/embulk-input-jira/blob/master/lib/embulk/input/jira.rb#L22
+ # Each entry of the :columns array becomes a Column (index left nil) and its
+ # type, converted to a Symbol, is also recorded by column name in attributes.
attributes = {}
columns = config.param(:columns, :array).map do |column|
name = column["name"]
type = column["type"].to_sym
attributes[name] = type
Column.new(nil, name, type, column["format"])
end
task[:attributes] = attributes
+ task[:columns] = columns
- # parser option
- # task[:option1] = config['option1']
- # task[:option1] = config.param(:option1, :integer, default: 5)
-
yield(task, columns)
end
+ # Reads the "decoder_task" hash from the task and loads it back into a
+ # Java::LineDecoder::DecoderTask, kept in @decoder_task.
def init
# initialization code:
- # @option1 = task["option1"]
- # @option2 = task["option2"]
- # @option3 = task["option3"]
-
@decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask)
end
def run(file_input)
decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task)
@@ -74,32 +63,80 @@
end
page_builder.finish
end
+ private
+ # Splits one line into fields on FIELDS_TERMINATED_BY and returns the
+ # result of make_record for those fields.
+ # A separator preceded by a backslash is treated as field content: it is
+ # swapped for DUMMY_STRING before the split and turned back into the
+ # separator afterwards (the backslash itself is not restored).
def parse_line(line)
# Escape "escaped TAB" temporarily
line = line.gsub(/\\#{FIELDS_TERMINATED_BY}/, DUMMY_STRING)
# Split with separator (TAB)
cols = line.split(FIELDS_TERMINATED_BY)
cols.map! { |item| item.gsub(/#{DUMMY_STRING}/, FIELDS_TERMINATED_BY) }
- len = task[:attributes].length
- cols = adjust_column(cols, len)
- return cols
+ # Last expression, so the record built by make_record is the return value.
+ cols = make_record(cols)
end
+ # Returns true when line ends with FIELDS_ESCAPED_BY, false otherwise.
+ # NOTE(review): `$` matches at the end of any line inside the string, not
+ # only at the very end of the string; confirm line never contains an
+ # embedded newline here.
def in_column?(line)
/#{Regexp.escape(FIELDS_ESCAPED_BY)}$/.match(line) ? true : false # escaped new line
end
- # Adjust array length
- def adjust_column(arr, len)
- arr = arr.slice(0, len) # Truncate if more than len
- arr.fill(0, len) { |i| arr[i] } # If it is less than len, fill it with nil
+ # see https://github.com/takumakanari/embulk-parser-json/blob/master/lib/embulk/parser/jsonpath.rb#L43
+ # Builds one record: the i-th element of arr is cast with the i-th entry of
+ # @task[:columns], and the array of cast values is returned.
+ # Elements of arr beyond the number of columns are ignored; positions that
+ # arr does not have reach cast_value as nil.
+ def make_record(arr)
+ columns = @task[:columns]
+ record = columns.map.with_index do |col, i|
+ val = cast_value(arr[i], col)
+ end
end
- end
+ # Converts one raw field into the Ruby value for its column type.
+ #
+ # @param val [String, nil] raw field; nil when the line has no field at this position
+ # @param col [#[]] column entry that answers "type" and "format"
+ # @return [Object, nil] the cast value; a nil field stays nil for every type
+ # @raise [RuntimeError] when the column type is not one of the names handled below
+ def cast_value(val, col)
+   # to_s lets the type be given either as a String or as a Symbol.
+   type = col["type"].to_s
+   fmt = col["format"]
+   case type
+   when "string", "json"
+     val
+   when "long"
+     # nil.to_i is 0, which would turn a missing field into a real zero.
+     val.to_i unless val.nil?
+   when "double"
+     # Same reasoning as "long": nil.to_f would become 0.0.
+     val.to_f unless val.nil?
+   when "boolean"
+     if kind_of_boolean?(val)
+       val
+     elsif val.nil? || (val.respond_to?(:empty?) && val.empty?)
+       # Guarded with respond_to? because Numeric has no empty?.
+       nil
+     elsif val.kind_of?(String)
+       ["yes", "true", "1"].include?(val.downcase)
+     elsif val.kind_of?(Numeric)
+       !val.zero?
+     else
+       !!val
+     end
+   when "timestamp"
+     if val.nil? || val.empty?
+       nil
+     else
+       begin
+         # NOTE(review): a nil fmt makes strptime raise TypeError, which is
+         # not rescued here; confirm every timestamp column has a format.
+         Time.strptime(val, fmt)
+       rescue ArgumentError
+         # A value that does not match fmt yields nil instead of raising.
+         nil
+       end
+     end
+   else
+     raise "Unsupported type #{type}"
+   end
+ end
+
+ # Tells whether val is already true or false (an instance of TrueClass or
+ # FalseClass), as opposed to a value that still has to be interpreted.
+ def kind_of_boolean?(val)
+   case val
+   when TrueClass, FalseClass then true
+   else false
+   end
+ end
+
+ # Error type for field values that cannot be parsed.
+ # NOTE(review): nothing in the code shown here raises it.
+ class DataParseError < StandardError; end
+
+ end
end
end