require 'flydata/source/parse_dump_and_send'
require 'flydata/source_oracle/oracle_component'
require 'flydata-core/table_def/oracle_table_def'

module Flydata
  module SourceOracle
    # Oracle-specific subclass of Source::ParseDumpAndSend. It supplies the
    # Oracle value converters and delegates dump parsing to DumpParser.
    class ParseDumpAndSend < Source::ParseDumpAndSend
      include OracleComponent

      # Returns the value converter table defined by OracleTableDef.
      def value_converters
        FlydataCore::TableDef::OracleTableDef::VALUE_CONVERTERS
      end

      # Parses a dump stream and reports its contents through the callbacks.
      #
      # Dump format: the dump file is msgpack. The data of each table starts
      # with a source table hash, followed by one array per row, e.g.
      #
      # {"table_name"=>"users", "columns"=>{"id"=>{"column_name"=>"id", "format_type"=>"bigint"}, "name"=>{"column_name"=>"name", "format_type"=>"character varying"}, "another_id"=>{"column_name"=>"another_id", "format_type"=>"integer"}}}
      # ["2", "hay", "1"]
      # ["3", "hoe", "2"]
      #
      # All five arguments are handed to DumpParser.new unchanged; the return
      # value is whatever DumpParser#parse_all returns.
      def parse_dump(dump_pos_info, dmpio, create_table_block, insert_record_block,
                     check_point_block)
        DumpParser.new(dump_pos_info, dmpio, create_table_block,
                       insert_record_block, check_point_block).parse_all
      end
    end

    # Reads msgpack objects from a dump IO. Hash objects are treated as table
    # headers, everything else as a data row. Rows are buffered and handed to
    # the insert callback in batches; the check point callback is invoked as
    # parsing progresses.
    class DumpParser
      # Buffered rows are flushed once the dump offset has advanced more than
      # this many bytes past the offset recorded for the first buffered row.
      MAX_ROW_BYTES = 1024 * 1024

      # dump_pos_info       - Hash; :source_pos is mandatory, :last_pos and
      #                       :source_table are read when resuming.
      # dmpio               - IO positioned on the msgpack dump.
      # create_table_block  - called with the new source table for each table
      #                       header.
      # insert_record_block - called with (current table, buffered rows).
      # check_point_block   - called with (current table, last pos, last pos,
      #                       source pos, state).
      #
      # Raises ArgumentError when dump_pos_info has no :source_pos.
      def initialize(dump_pos_info, dmpio, create_table_block, insert_record_block,
                     check_point_block)
        @source_pos = dump_pos_info[:source_pos]
        raise ArgumentError, "source position is required" unless @source_pos

        @dmpio = dmpio
        @create_table_block = create_table_block
        @insert_record_block = insert_record_block
        @check_point_block = check_point_block

        @current_table = nil
        @last_pos = 0
        @row_head_pos = nil
        @rows = []

        resume(dump_pos_info)
      end

      # Unpacks every object from the dump IO, dispatches it via #parse and
      # finally calls #close.
      def parse_all
        unpacker = MessagePack::Unpacker.new(@dmpio)
        unpacker.each do |obj|
          # The IO position minus whatever the unpacker still holds in its
          # buffer is recorded as the current dump offset.
          @last_pos = @dmpio.pos - unpacker.buffer.size
          parse(obj)
        end
        close
      end

      # Routes one unpacked object: a Hash is a table header, anything else is
      # a data row.
      def parse(obj)
        if obj.is_a?(Hash)
          handle_table_info(obj)
        else
          handle_data_row(obj)
        end
      end

      # Flushes rows still buffered at the end of the dump.
      #
      # Core logic expects a check point callback with CREATE_TABLE at the end
      # of the data of each table. #handle_table_info issues it for every
      # table except the last one, so it is issued here for the last table.
      #
      # NOTE(review): nothing happens when the buffer is already empty (for
      # instance when the size threshold flushed on the very last row), so no
      # final CREATE_TABLE check point is sent in that case - confirm this is
      # intended.
      def close
        return if @rows.empty?

        call_insert_record_block
        call_check_point_block(Parser::State::CREATE_TABLE)
      end

      private

      # Restores parser state from a previous run. Does nothing when
      # :last_pos is absent or converts to -1 (no resume point).
      def resume(dump_pos_info)
        saved_pos = dump_pos_info[:last_pos]
        return unless saved_pos

        resume_pos = saved_pos.to_i
        return if resume_pos == -1

        @last_pos = resume_pos
        @dmpio.pos = resume_pos
        @current_table = dump_pos_info[:source_table]
      end

      # Handles a table header such as
      # {"table_name"=>"users", "columns"=>{"id"=>{"column_name"=>"id", "format_type"=>"bigint"}, "name"=>{"column_name"=>"name", "format_type"=>"character varying"}, "another_id"=>{"column_name"=>"another_id", "format_type"=>"integer"}}}
      def handle_table_info(table_info)
        # Finish the previous table first: flush its rows (if any) and send
        # the CREATE_TABLE check point.
        call_insert_record_block
        call_check_point_block(Parser::State::CREATE_TABLE)

        # Keep only column_name and format_type of each column, under symbol
        # keys.
        columns = table_info["columns"].each_with_object({}) do |(name, attrs), result|
          result[name] = {
            column_name: attrs["column_name"],
            format_type: attrs["format_type"]
          }
        end

        @current_table = Parser::SourceTable.new(table_info["table_name"], columns)
        @create_table_block.call(@current_table)
        call_check_point_block(Parser::State::INSERT_RECORD)
      end

      # Buffers one row and flushes the buffer once it spans more than
      # MAX_ROW_BYTES of the dump.
      def handle_data_row(row)
        # Remember the offset at which the current batch started.
        @row_head_pos ||= @last_pos
        @rows << row
        call_insert_record_block if @last_pos - @row_head_pos > MAX_ROW_BYTES
      end

      # Invokes the check point callback for the given state. @last_pos is
      # passed for both position arguments.
      def call_check_point_block(state)
        @check_point_block.call(@current_table, @last_pos, @last_pos, @source_pos, state)
      end

      # Hands the buffered rows to the insert callback and, when the callback
      # returns a truthy value, sends an INSERT_RECORD check point. The buffer
      # is replaced by a new array afterwards either way.
      def call_insert_record_block
        return if @rows.empty?

        inserted = @insert_record_block.call(@current_table, @rows)
        call_check_point_block(Parser::State::INSERT_RECORD) if inserted

        @rows = []
        @row_head_pos = nil
      end
    end
  end
end