# coding: utf-8
require 'spec_helper'
require 'socket'

module Flydata
  module Command
    # Specs for Sync#do_generate_table_ddl, reached through `send`
    # (presumably because the method is not public -- confirm). Each context
    # tweaks the "mysql_data_entry_preference" hash of a sample data entry and
    # then checks either the mysqldump command line handed to IO.popen or that
    # the call raises.
    describe Sync do
      # Dump location under /tmp. `let` memoizes per example, so the timestamp
      # stays stable within a single example.
      let(:default_mysqldump_dir) do
        File.join('/tmp', "sync_dump_#{Time.now.to_i}")
      end
      # Sample data entry. A fresh hash is built for every example, so the
      # `before` hooks below can mutate its preference hash freely.
      let(:default_data_entry) do
        {"id"=>93,
         "name"=>"flydata_sync_mysql",
         "data_port_id"=>52,
         "display_name"=>"flydata_sync_mysql",
         "enabled"=>true,
         "heroku_log_type"=>nil,
         "heroku_resource_id"=>nil,
         "log_deletion"=>nil,
         "log_file_delimiter"=>nil,
         "log_file_type"=>nil,
         "log_path"=>nil,
         "redshift_schema_name"=>"",
         "redshift_table_name"=>nil,
         "created_at"=>"2014-01-22T18:58:43Z",
         "updated_at"=>"2014-01-30T02:42:26Z",
         "type"=>"RedshiftMysqlDataEntry",
         "tag_name"=>"flydata.a458c641_dp52.flydata_mysql",
         "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev",
         "data_port_key"=>"a458c641",
         "mysql_data_entry_preference" =>
           { "host"=>"localhost", "port"=>3306, "username"=>"masashi",
             "password"=>"welcome", "database"=>"sync_test", "tables"=>"table1, table2",
             "mysqldump_dir"=>default_mysqldump_dir, "forwarder" => "tcpforwarder",
             "data_servers"=>"localhost:9905" }
        }
      end

      # Best-effort removal of whatever an example left at the dump location.
      # `exist?` is used because the `exists?` aliases were removed in Ruby 3.2.
      # Failures (e.g. a non-empty directory) are deliberately ignored.
      after :each do
        if Dir.exist?(default_mysqldump_dir)
          Dir.delete(default_mysqldump_dir) rescue nil
        end
        if File.exist?(default_mysqldump_dir)
          File.delete(default_mysqldump_dir) rescue nil
        end
      end

      describe '#do_generate_table_ddl' do
        subject { Sync.new }

        # NOTE: `and_call_original` means the real IO.popen still runs, so the
        # mysqldump command is actually spawned by these examples.
        context 'with full options' do
          it 'issues mysqldump command with expected parameters' do
            expect(IO).to receive(:popen).with(
              'mysqldump -d -h localhost -P 3306 -u masashi -pwelcome sync_test table1  table2', 'r').and_call_original
            subject.send(:do_generate_table_ddl, default_data_entry)
          end
        end

        context 'without_host' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('host')
          end
          it "throws an error" do
            expect {
              subject.send(:do_generate_table_ddl, default_data_entry)
            }.to raise_error
          end
        end

        # With no port configured the expected command still carries -P 3306.
        context 'without_port' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('port')
          end
          it "uses the default port" do
            expect(IO).to receive(:popen).with(
              'mysqldump -d -h localhost -P 3306 -u masashi -pwelcome sync_test table1  table2', 'r').and_call_original
            subject.send(:do_generate_table_ddl, default_data_entry)
          end
        end

        context 'with_port_override' do
          before do
            default_data_entry['mysql_data_entry_preference']['port'] = 1234
          end
          it "uses the specified port" do
            expect(IO).to receive(:popen).with(
              'mysqldump -d -h localhost -P 1234 -u masashi -pwelcome sync_test table1  table2', 'r').and_call_original
            subject.send(:do_generate_table_ddl, default_data_entry)
          end
        end

        context 'without_username' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('username')
          end
          it "throws an error" do
            expect {
              subject.send(:do_generate_table_ddl, default_data_entry)
            }.to raise_error
          end
        end

        # The expected command keeps the slot where -p<password> would be,
        # hence the double space between the user name and the database.
        context 'without_password' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('password')
          end
          it "call mysqldump without -p option" do
            expect(IO).to receive(:popen).with(
              'mysqldump -d -h localhost -P 3306 -u masashi  sync_test table1  table2', 'r').and_call_original
            subject.send(:do_generate_table_ddl, default_data_entry)
          end
        end

        context 'without_database' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('database')
          end
          it "throws an error" do
            expect {
              subject.send(:do_generate_table_ddl, default_data_entry)
            }.to raise_error
          end
        end

        context 'without_tables' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('tables')
          end
          it "throws an error" do
            expect {
              subject.send(:do_generate_table_ddl, default_data_entry)
            }.to raise_error
          end
        end
      end
    end
  end

  module FileUtil
    # Specs for SyncFileManager: where it places the dump, dump-position and
    # binlog-position files for a data entry, and round-tripping of the saved
    # dump position.
    describe SyncFileManager do
      # Dump location under /tmp. `let` memoizes per example, so the timestamp
      # stays stable within a single example.
      let(:default_mysqldump_dir) do
        File.join('/tmp', "sync_dump_#{Time.now.to_i}")
      end
      # Sample data entry; rebuilt per example so contexts may mutate it.
      let(:default_data_entry) do
        {"id"=>93,
         "name"=>"flydata_sync_mysql",
         "data_port_id"=>52,
         "display_name"=>"flydata_sync_mysql",
         "enabled"=>true,
         "heroku_log_type"=>nil,
         "heroku_resource_id"=>nil,
         "log_deletion"=>nil,
         "log_file_delimiter"=>nil,
         "log_file_type"=>nil,
         "log_path"=>nil,
         "redshift_schema_name"=>"",
         "redshift_table_name"=>nil,
         "created_at"=>"2014-01-22T18:58:43Z",
         "updated_at"=>"2014-01-30T02:42:26Z",
         "type"=>"RedshiftMysqlDataEntry",
         "tag_name"=>"flydata.a458c641_dp52.flydata_mysql",
         "tag_name_dev"=>"flydata.a458c641_dp52.flydata_mysql.dev",
         "data_port_key"=>"a458c641",
         "mysql_data_entry_preference" =>
           { "host"=>"localhost", "port"=>3306, "username"=>"masashi",
             "password"=>"welcome", "database"=>"sync_test", "tables"=>nil,
             "mysqldump_dir"=>default_mysqldump_dir, "forwarder" => "tcpforwarder",
             "data_servers"=>"localhost:9905" }
        }
      end
      let(:default_sfm) { SyncFileManager.new(default_data_entry) }

      # Arguments passed to save_dump_pos in the examples below.
      let(:status) { 'PARSING' }
      let(:table_name) { 'test_table' }
      let(:last_pos) { 9999 }
      let(:binlog_pos) { {binfile: 'mysqlbin.00001', pos: 111}  }
      let(:state) { 'CREATE_TABLE' }
      let(:substate) { nil }

      # Best-effort removal of whatever an example left at the dump location.
      # `exist?` is used because the `exists?` aliases were removed in Ruby 3.2.
      # Failures (e.g. a non-empty directory) are deliberately ignored.
      after :each do
        if Dir.exist?(default_mysqldump_dir)
          Dir.delete(default_mysqldump_dir) rescue nil
        end
        if File.exist?(default_mysqldump_dir)
          File.delete(default_mysqldump_dir) rescue nil
        end
      end

      describe '#dump_file_path' do
        # Without a configured directory the path falls under $HOME/.flydata/dump.
        context 'when mysqldump_dir param is nil' do
          before do
            default_data_entry['mysql_data_entry_preference'].delete('mysqldump_dir')
          end
          it do
            expect(default_sfm.dump_file_path).to eq(
              File.join(ENV['HOME'], '.flydata', 'dump', 'flydata_sync_mysql.dump'))
          end
        end
        # A regular file sitting where the directory should be is an error.
        context 'when file exists' do
          before { `touch #{default_mysqldump_dir}`}
          it do
            expect{default_sfm.dump_file_path}.to raise_error
          end
        end
        context 'when directory exists' do
          before { `mkdir -p #{default_mysqldump_dir}`}
          it do
            expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).never
            expect(default_sfm.dump_file_path).to eq(
              File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump'))
          end
        end
        # FileUtils.mkdir_p is mocked here, so no directory is really created.
        context 'when directory or file does not exist' do
          it do
            expect(FileUtils).to receive(:mkdir_p).with(default_mysqldump_dir).once
            expect(default_sfm.dump_file_path).to eq(
              File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump'))
          end
        end
        # A leading "~" is expected to be replaced by $HOME.
        context 'when file name includes "~"' do
          let(:default_mysqldump_dir) { "~/tmp/dump/sync_spec_#{Time.now.to_i}" }
          it do
            expected_dir = File.join(ENV['HOME'], default_mysqldump_dir[1..-1])
            expect(FileUtils).to receive(:mkdir_p).with(expected_dir).once
            expect(default_sfm.dump_file_path).to eq(
              File.join(expected_dir, 'flydata_sync_mysql.dump'))
          end
        end
      end

      describe '#dump_pos_path' do
        it { expect(default_sfm.dump_pos_path).to eq(
          File.join(default_mysqldump_dir, 'flydata_sync_mysql.dump.pos')) }
      end

      describe '#save_dump_pos' do
        # With no marshalled table saved, load_dump_pos reports mysql_table: nil.
        context 'without mysql marshal data' do
          it do
            expect{default_sfm.save_dump_pos(
              status, table_name, last_pos, binlog_pos, state, substate)}.not_to raise_error
            expect(default_sfm.load_dump_pos).to eq({
              status: status, table_name: table_name, last_pos: last_pos,
              binlog_pos: binlog_pos, state: state, substate: substate,
              mysql_table: nil
            })
          end
        end
      end

      describe '#load_dump_pos' do
        let(:mysql_table) do
          Flydata::Mysql::MysqlTable.new(
            table_name, { 'id' => { format_type: 'int' }, 'value' => { format_type: 'text' } }
          )
        end

        context 'with mysql marshal data' do
          before do
            default_sfm.save_mysql_table_marshal_dump(mysql_table)
            default_sfm.save_dump_pos(status, table_name, last_pos, binlog_pos, state, substate)
          end
          it do
            ret = default_sfm.load_dump_pos
            # The restored table is checked field by field, so it is removed
            # from the hash before the plain comparison.
            mt = ret.delete(:mysql_table)
            expect(ret).to eq({
              status: status, table_name: table_name, last_pos: last_pos,
              binlog_pos: binlog_pos, state: state, substate: substate,
            })
            expect(mt.table_name).to eq(table_name)
            expect(mt.columns).to eq({
              'id' => { format_type: 'int' },
              'value' => { format_type: 'text' },
            })
          end
        end
      end

      describe '#save_binlog' do
        let(:binfile) { 'mysqlbinlog.000001' }
        let(:pos) { 107 }
        let(:binlog_pos) { {binfile: binfile, pos: pos} }
        # The saved file is expected to hold "<binfile>\t<pos>" with no newline.
        it do
          default_sfm.save_binlog(binlog_pos)
          expect(`cat #{default_sfm.binlog_path}`).to eq("#{binfile}\t#{pos}")
        end
      end

      describe '#binlog_path' do
        it { expect(default_sfm.binlog_path).to eq("#{FLYDATA_HOME}/flydata_sync_mysql.binlog.pos") }
      end
    end
  end

  module Output
    # Specs for ForwarderFactory and the forwarder objects it creates:
    # class selection by key, argument validation, buffering in #emit and
    # server rotation in #pickup_server.
    describe ForwarderFactory do
      let(:forwarder) do
        ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567'])
      end

      # Replace TCPSocket.new with a double so no example opens a real
      # connection. StringIO.open is stubbed as well (it then returns nil);
      # presumably the forwarder uses it when flushing -- confirm.
      before :each do
        conn = double(TCPSocket)
        allow(conn).to receive(:setsockopt)
        allow(conn).to receive(:write)
        allow(conn).to receive(:close)
        allow(TCPSocket).to receive(:new).and_return(conn)
        allow(StringIO).to receive(:open)
      end

      # `be_kind_of` is used instead of `expect(x.kind_of?(K)).to be_true`:
      # it works on both RSpec 2 and 3 (`be_true` was removed in RSpec 3) and
      # reports the actual class on failure.
      describe '.create' do
        context 'with nil forwarder_key' do
          it 'should return TcpForwarder object' do
            forwarder = ForwarderFactory.create(nil, 'test_tag', ['localhost:3456', 'localhost:4567'])
            expect(forwarder).to be_kind_of(TcpForwarder)
          end
        end

        context 'with tcpforwarder forwarder_key' do
          it 'should return TcpForwarder object' do
            forwarder = ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567'])
            expect(forwarder).to be_kind_of(TcpForwarder)
          end
        end

        context 'with sslforwarder forwarder_key' do
          it 'should return SslForwarder object' do
            forwarder = ForwarderFactory.create('sslforwarder', 'test_tag', ['localhost:3456', 'localhost:4567'])
            expect(forwarder).to be_kind_of(SslForwarder)
          end
        end
      end

      describe '#initialize' do
        context 'servers is empty' do
          it do
            expect{ForwarderFactory.create('tcpforwarder', 'test_tag', [])}.to raise_error
          end
        end
        context 'servers is nil' do
          it do
            expect{ForwarderFactory.create('tcpforwarder', 'test_tag', nil)}.to raise_error
          end
        end
      end

      describe '#emit' do
        let(:record) { {table_name: 'test_table_name', log: '{"key":"value"}'} }
        # The limit is 2.5x the msgpack size of one [time, record] pair, so
        # two records fit in the buffer and the third goes over the limit.
        before :each do
          forwarder.set_options({buffer_size_limit: ([Time.now.to_i,record].to_msgpack.bytesize * 2.5)})
        end
        context 'when the buffer size is less than threthold' do
          it do
            expect(forwarder.emit(record)).to be(false)
            expect(forwarder.buffer_record_count).to be(1)
          end
        end
        # The third emit returns true and the record count drops back to 0.
        context 'when the buffer size exceeds threthold' do
          it do
            expect(forwarder.emit(record)).to be(false)
            expect(forwarder.emit(record)).to be(false)
            expect(forwarder.buffer_record_count).to be(2)
            expect(forwarder.emit(record)).to be(true)
            expect(forwarder.buffer_record_count).to be(0)
          end
        end
      end

      describe '#pickup_server' do
        context 'with only one server' do
          let(:servers) { ['localhost:1111'] }
          let(:forwarder) {
            ForwarderFactory.create('tcpforwarder', 'test_tag', servers)
          }
          it 'expect to return same server' do
            expect(forwarder.pickup_server).to eq('localhost:1111')
            expect(forwarder.pickup_server).to eq('localhost:1111')
            expect(forwarder.pickup_server).to eq('localhost:1111')
          end
        end
        # Successive calls walk the list in order and wrap around to the
        # first entry. The description used to be a copy of the single-server
        # one and did not match what is asserted.
        context 'with servers' do
          let(:servers) { ['localhost:1111', 'localhost:2222', 'localhost:3333'] }
          let(:forwarder) {
            ForwarderFactory.create('tcpforwarder', 'test_tag', servers)
          }
          it 'expect to return servers in round-robin order' do
            expect(forwarder.pickup_server).to eq('localhost:1111')
            expect(forwarder.pickup_server).to eq('localhost:2222')
            expect(forwarder.pickup_server).to eq('localhost:3333')
            expect(forwarder.pickup_server).to eq('localhost:1111')
          end
        end
      end
    end
  end

  module Mysql
    # Specs for MysqlDumpGenerator: the mysqldump command line assembled by
    # the constructor and how #dump reacts to the exit status and the
    # resulting dump file. Open3.capture3 is mocked, so mysqldump never runs.
    describe MysqlDumpGenerator do
      let(:stdout) { double(:stdout) }
      let(:stderr) { double(:stderr) }
      let(:status) { double(:status) }
      let(:file_path) { File.join('/tmp', "flydata_sync_spec_mysqldump_#{Time.now.to_i}") }
      let(:default_conf) { {
        'host' => 'localhost', 'port' => 3306, 'username' => 'admin',
        'password' => 'pass', 'database' => 'dev', 'tables' => 'users,groups',
      } }
      let(:default_dump_generator) { MysqlDumpGenerator.new(default_conf) }

      describe '#initialize' do
        context 'with password' do
          subject { default_dump_generator.instance_variable_get(:@dump_cmd) }
          it { should eq('mysqldump -h localhost -P 3306 -uadmin -ppass --skip-lock-tables ' +
               '--single-transaction --flush-logs --hex-blob --master-data=2 dev users groups') }
        end
        # An empty password leaves the -p slot blank (note the double space).
        context 'without password' do
          let(:dump_generator) do
            MysqlDumpGenerator.new(default_conf.merge({'password' => ''}))
          end
          subject { dump_generator.instance_variable_get(:@dump_cmd) }
          it { should eq('mysqldump -h localhost -P 3306 -uadmin  --skip-lock-tables ' +
               '--single-transaction --flush-logs --hex-blob --master-data=2 dev users groups') }
        end
      end

      # `File.exist?` replaces `File.exists?`, which was removed in Ruby 3.2.
      # `be(true)` / `be(false)` replace `be_true` / `be_false`, which exist
      # only in RSpec 2; File.exist? returns a real boolean, so the identity
      # check is equivalent here.
      describe '#dump' do
        # A pre-existing file is expected to be gone after a failed dump.
        context 'when exit status is not 0' do
          before do
            `touch #{file_path}`
            expect(status).to receive(:exitstatus).and_return 1
            expect(Open3).to receive(:capture3).and_return(
              ['(dummy std out)', '(dummy std err)', status]
            )
          end
          it do
            expect{ default_dump_generator.dump(file_path) }.to raise_error
            expect(File.exist?(file_path)).to be(false)
          end
        end
        context 'when exit status is 0 but no file' do
          before do
            expect(status).to receive(:exitstatus).and_return 0
            expect(Open3).to receive(:capture3).and_return(
              ['(dummy std out)', '(dummy std err)', status]
            )
          end
          it do
            expect{ default_dump_generator.dump(file_path) }.to raise_error
            expect(File.exist?(file_path)).to be(false)
          end
        end
        # An empty dump file is an error, but the file itself is left in place.
        context 'when exit status is 0 but file size is 0' do
          before do
            `touch #{file_path}`
            expect(status).to receive(:exitstatus).and_return 0
            expect(Open3).to receive(:capture3).and_return(
              ['(dummy std out)', '(dummy std err)', status]
            )
          end
          it do
            expect{ default_dump_generator.dump(file_path) }.to raise_error
            expect(File.exist?(file_path)).to be(true)
          end
        end
        context 'when exit status is 0' do
          before do
            `echo "something..." > #{file_path}`
            expect(status).to receive(:exitstatus).and_return 0
            expect(Open3).to receive(:capture3).and_return(
              ['(dummy std out)', '(dummy std err)', status]
            )
          end
          it do
            # Bare `be` passes for any truthy value, which is exactly what
            # `be_true` checked; it is available in both RSpec 2 and RSpec 3.
            expect(default_dump_generator.dump(file_path)).to be
            expect(File.exist?(file_path)).to be(true)
          end
        end
        after :each do
          File.delete(file_path) if File.exist?(file_path)
        end
      end
    end

    describe MysqlDumpParser do
      # Scratch dump file under /tmp; the name is timestamped and memoized
      # per example.
      let(:file_path) do
        File.join('/tmp', "flydata_sync_spec_mysqldump_parse_#{Time.now.to_i}")
      end
      # Parser bound to the scratch file above.
      let(:default_parser) do
        MysqlDumpParser.new(file_path)
      end

      # Writes +content+ to the scratch dump file at +file_path+, creating the
      # file or truncating whatever an earlier call wrote.
      def generate_dump_file(content)
        File.write(file_path, content)
      end

      # Remove the scratch dump file after each example, if one was written.
      # `File.exist?` is used because `File.exists?` was removed in Ruby 3.2.
      after do
        File.delete(file_path) if File.exist?(file_path)
      end

      # Constructor contract: a missing dump file raises, an existing (even
      # empty) one yields a MysqlDumpParser instance.
      describe '#initialize' do
        context 'when file does not exist' do
          it { expect { MysqlDumpParser.new(file_path) }.to raise_error }
        end

        context 'when file exists' do
          before do
            generate_dump_file('')
          end

          it do
            parser = MysqlDumpParser.new(file_path)
            expect(parser).to be_an_instance_of(MysqlDumpParser)
          end
        end
      end

      describe '#parse' do
        # Leading portion of a mysqldump output, prepended to the fixture
        # dumps used by the #parse contexts. Its last line is the
        # "CHANGE MASTER" comment carrying the binlog file name and position
        # (mysql-bin.000267 / 120) that the examples expect #parse to return.
        # NOTE(review): ">--" in the "Server version" line looks like a tab as
        # rendered by an editor; it is fixture data and is left untouched.
        DUMP_HEADER = <<EOT
-- MySQL dump 10.13  Distrib 5.6.13, for osx10.9 (x86_64)
--
-- Host: localhost    Database: sync_test
-- ------------------------------------------------------
-- Server version>--5.6.13-log

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

--
-- Position to start replication or point-in-time recovery from
--

-- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000267', MASTER_LOG_POS=120;
EOT

        # Returns the position one past the end of the first occurrence of
        # +string+ in +content+ (the extra 1 presumably skips the newline that
        # follows the match -- confirm against the parser).
        #
        # NOTE(review): String#index yields a character index while #bytesize
        # counts bytes, so the result is a byte offset only when everything
        # before the match is single-byte. Callers that already compensate for
        # this rely on the current arithmetic, so it is kept as is.
        #
        # Raises ArgumentError when +string+ does not occur in +content+,
        # instead of failing with NoMethodError on nil.
        def index_after(content, string)
          start = content.index(string)
          raise ArgumentError, "#{string.inspect} not found in content" if start.nil?

          start + string.bytesize + 1
        end

        # Parser under test, bound to the scratch dump file.
        let(:default_parser) do
          MysqlDumpParser.new(file_path)
        end
        # Binlog coordinates embedded in DUMP_HEADER's CHANGE MASTER line.
        let(:default_binlog_pos) do
          {binfile: 'mysql-bin.000267', pos: 120 }
        end
        # Position right after the CHANGE MASTER line of the header.
        let(:dump_pos_after_binlog_pos) do
          index_after(DUMP_HEADER, 'MASTER_LOG_POS=120;')
        end

        # Doubles standing in for the three callbacks handed to #parse.
        let(:create_table_block) do
          double('create_table_block')
        end
        let(:insert_record_block) do
          double('insert_record_block')
        end
        let(:check_point_block) do
          double('check_point_block')
        end

        # Start every example from an empty dump file; contexts overwrite it.
        before { generate_dump_file('') }

        # A dump with no CHANGE MASTER line: none of the callbacks may fire
        # and #parse returns nil.
        context 'when dump does not contain binlog pos' do
          before do
            generate_dump_file('dummy content')
          end

          it do
            [create_table_block, insert_record_block, check_point_block].each do |callback|
              expect(callback).to receive(:call).never
            end

            result = default_parser.parse(create_table_block, insert_record_block, check_point_block)
            expect(result).to be_nil
          end
        end

        # Header only: a single checkpoint is expected, no table or record
        # callbacks, and #parse returns the header's binlog position.
        context 'when dump contains only binlog pos' do
          before do
            # Same bytes as a heredoc holding just the header: the header
            # followed by one extra newline.
            generate_dump_file("#{DUMP_HEADER}\n")
          end

          it do
            expect(create_table_block).to receive(:call).never
            expect(insert_record_block).to receive(:call).never
            expect(check_point_block).to receive(:call).once

            result = default_parser.parse(create_table_block, insert_record_block, check_point_block)
            expect(result).to eq(default_binlog_pos)
          end
        end

        # Dump with a CREATE TABLE statement but no INSERT statements: the
        # table callback fires once, the record callback never, and three
        # checkpoints are expected.
        context 'when dump contains create table without records' do
          let(:dump_content) { <<EOT
#{DUMP_HEADER}

DROP TABLE IF EXISTS `users_login`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `users_login` (
  `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id',
  `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime NOT NULL COMMENT 'update time',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='';
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `users_login`
--

LOCK TABLES `users_login` WRITE;
/*!40000 ALTER TABLE `users_login` DISABLE KEYS */;
/*!40000 ALTER TABLE `users_login` ENABLE KEYS */;
UNLOCK TABLES;

EOT
          }
          before { generate_dump_file(dump_content) }
          it do
            expect(create_table_block).to receive(:call) { |mysql_table|
                expect(mysql_table.table_name).to eq('users_login')
            }.once
            expect(insert_record_block).to receive(:call).never
            # The checkpoint expectations below are declared in the order the
            # parser is expected to report them.
            # Checkpoint 1: right after the binlog position line, before any
            # table has been seen (mysql_table is nil).
            expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
              expect(mysql_table).to be_nil
              expect(last_pos).to eq(dump_pos_after_binlog_pos)
              expect(binlog_pos).to eq(default_binlog_pos)
              expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE)
              expect(substate).to be_nil
            }
            # Checkpoint 2: after the CREATE TABLE statement, with the state
            # switched to INSERT_RECORD.
            expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
              expect(mysql_table.table_name).to eq('users_login')
              expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'create_time', 'update_time'])
              expect(last_pos).to eq(index_after(dump_content, 'ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=\'\';'))
              expect(binlog_pos).to eq(default_binlog_pos)
              expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD)
              expect(substate).to be_nil
            }
            # Checkpoint 3: after UNLOCK TABLES, with the state back to
            # CREATE_TABLE.
            expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
              expect(mysql_table.table_name).to eq('users_login')
              expect(last_pos).to eq(index_after(dump_content, 'UNLOCK TABLES;'))
              expect(binlog_pos).to eq(default_binlog_pos)
              expect(state).to eq(Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE)
              expect(substate).to be_nil
            }
            # No checkpoint beyond the three above is expected.
            expect(check_point_block).to receive(:call).never

            binlog_pos = default_parser.parse(
              create_table_block, insert_record_block, check_point_block
            )
            expect(binlog_pos).to eq(default_binlog_pos)
          end
        end

        # Dump with one table and two INSERT statements; the values include
        # multi-byte text, an escaped backslash and an escaped single quote.
        context 'when dump contains create table with multi inserts' do
          let(:dump_content) { <<EOT
#{DUMP_HEADER}

DROP TABLE IF EXISTS `users_login`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `users_login` (
  `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id',
  `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count',
  `comment` varchar(255) DEFAULT NULL COMMENT 'comment',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime NOT NULL COMMENT 'update time',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `users_login`
--

LOCK TABLES `users_login` WRITE;
/*!40000 ALTER TABLE `users_login` DISABLE KEYS */;
INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missあいうえおteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'sub\\\\field','1972-08-23 20:16:08','1974-10-10 23:28:11');
INSERT INTO `users_login` VALUES (373,31,'out\\'swearing','1979-10-07 08:10:08','2006-02-22 16:26:04'),(493,8,'schizophrenic','1979-07-06 07:34:07','1970-08-09 01:21:01');
/*!40000 ALTER TABLE `users_login` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;

EOT
          }
          before { generate_dump_file(dump_content) }
          it do
            # create_table_block
            expect(create_table_block).to receive(:call) { |mysql_table|
              expect(mysql_table.table_name).to eq('users_login')
              expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time'])
            }.once
            expect(create_table_block).to receive(:call).never

            # insert_record_block: one call per INSERT statement, each
            # receiving that statement's rows as arrays of strings.
            [
              [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10),
                %w(35 6 missあいうえおteer  1991-10-15\ 19:38:07 1970-10-01\ 22:03:10),
                %w(52 33 sub\\field 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),],
              [ %w(373 31 out'swearing 1979-10-07\ 08:10:08 2006-02-22\ 16:26:04),
                %w(493 8 schizophrenic 1979-07-06\ 07:34:07 1970-08-09\ 01:21:01),]
            ].each do |expected_values|
              expect(insert_record_block).to receive(:call) { |mysql_table, values_set|
                expect(mysql_table.table_name).to eq('users_login')
                expect(values_set).to eq(expected_values)
                nil
              }.once
            end
            expect(insert_record_block).to receive(:call).never

            # check_point_block: three checkpoints are expected; only the last
            # one pins last_pos. The hashes define neither :substate nor
            # :result, so both lookups below evaluate to nil.
            # NOTE(review): index_after adds a character index to byte sizes;
            # the "+ 10" presumably makes up for the five 3-byte characters in
            # the first INSERT statement -- confirm.
            [
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE},
              {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD},
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE,
               last_pos: index_after(dump_content, 'UNLOCK TABLES;') + 10}
            ].each do |expected_params|
              expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
                expect(mysql_table.table_name).to eq('users_login') if mysql_table
                expect(state).to eq(expected_params[:state])
                if expected_params[:last_pos]
                  expect(last_pos).to eq(expected_params[:last_pos])
                end
                expect(binlog_pos).to eq(default_binlog_pos)
                expect(substate).to eq(expected_params[:substate])
                expected_params[:result]
              }.once
            end
            expect(check_point_block).to receive(:call).never

            binlog_pos = default_parser.parse(
              create_table_block, insert_record_block, check_point_block
            )
            expect(binlog_pos).to eq(default_binlog_pos)
          end
        end

        # Dump with one table and a single INSERT statement whose second row
        # holds an SQL NULL, expected to reach the callback as Ruby nil.
        context 'when dump contains create table with records' do
          let(:dump_content) { <<EOT
#{DUMP_HEADER}

DROP TABLE IF EXISTS `users_login`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `users_login` (
  `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id',
  `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count',
  `comment` varchar(255) DEFAULT NULL COMMENT 'comment',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime NOT NULL COMMENT 'update time',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `users_login`
--

LOCK TABLES `users_login` WRITE;
/*!40000 ALTER TABLE `users_login` DISABLE KEYS */;
INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,NULL,'1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11');
/*!40000 ALTER TABLE `users_login` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;

EOT
          }
          before { generate_dump_file(dump_content) }
          it do
            # create_table_block
            expect(create_table_block).to receive(:call) { |mysql_table|
              expect(mysql_table.table_name).to eq('users_login')
              expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time'])
            }.once
            expect(create_table_block).to receive(:call).never

            # insert_record_block: a single call carrying all three rows.
            [
              [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10),
                ['35', '6', nil, '1991-10-15 19:38:07', '1970-10-01 22:03:10'],
                %w(52 33 subfield 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),],
            ].each do |expected_values|
              expect(insert_record_block).to receive(:call) { |mysql_table, values_set|
                expect(mysql_table.table_name).to eq('users_login')
                expect(values_set).to eq(expected_values)
                nil
              }.once
            end
            expect(insert_record_block).to receive(:call).never

            # check_point_block: three checkpoints are expected; only the last
            # one pins last_pos (right after UNLOCK TABLES). The hashes define
            # neither :substate nor :result, so both lookups below are nil.
            [
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE},
              {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD},
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE,
               last_pos: index_after(dump_content, 'UNLOCK TABLES;')}
            ].each do |expected_params|
              expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
                expect(mysql_table.table_name).to eq('users_login') if mysql_table
                expect(state).to eq(expected_params[:state])
                if expected_params[:last_pos]
                  expect(last_pos).to eq(expected_params[:last_pos])
                end
                expect(binlog_pos).to eq(default_binlog_pos)
                expect(substate).to eq(expected_params[:substate])
                expected_params[:result]
              }.once
            end
            expect(check_point_block).to receive(:call).never

            binlog_pos = default_parser.parse(
              create_table_block, insert_record_block, check_point_block
            )
            expect(binlog_pos).to eq(default_binlog_pos)
          end
        end

        # Resume scenario: a first parse runs over a dump that contains only the
        # table definition and captures checkpoint data; a second parser built
        # from that checkpoint then parses the complete dump.
        context 'with resume after creating table' do
          # Dump prefix: DUMP_HEADER plus the DROP/CREATE statements for
          # `users_login`, with no data section.
          let(:dump_content_head) { <<EOT
#{DUMP_HEADER}

DROP TABLE IF EXISTS `users_login`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `users_login` (
  `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id',
  `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count',
  `comment` varchar(255) DEFAULT NULL COMMENT 'comment',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime NOT NULL COMMENT 'update time',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;

EOT
          }
          # Complete dump: the prefix above followed by the data section, which
          # holds a single INSERT statement with three rows.
          let(:dump_content_all) { <<EOT
#{dump_content_head}
--
-- Dumping data for table `users_login`
--

LOCK TABLES `users_login` WRITE;
/*!40000 ALTER TABLE `users_login` DISABLE KEYS */;
INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11');
/*!40000 ALTER TABLE `users_login` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
EOT
          }
          # First pass: parse the prefix-only dump and remember the checkpoint
          # data in @option so the example can build a resuming parser from it.
          before do
            generate_dump_file(dump_content_head)
            default_parser.parse(
              # create-table callback: nothing to do in this pass
              Proc.new{},
              # insert-record callback: the prefix has no INSERT statements
              Proc.new{
                raise "Should not be called"
              },
              # checkpoint callback: keeps the data of the most recent call that
              # carried a table object (calls without a table are ignored)
              Proc.new{ |mysql_table, last_pos, binlog_pos, state, substate|
                if mysql_table
                  @mysql_table = mysql_table
                  @option = { status: Flydata::Command::Sync::STATUS_PARSING,
                              table_name: mysql_table.table_name,
                              last_pos: last_pos,
                              binlog_pos: binlog_pos,
                              state: state,
                              substate: substate,
                              mysql_table: mysql_table }
                end
              }
            )
            # Sanity check of the captured checkpoint.
            # NOTE(review): index_after is defined outside this section; it
            # presumably returns the offset just past the given text - confirm.
            expect(@option).to eq({
              status: Flydata::Command::Sync::STATUS_PARSING,
              table_name: 'users_login',
              last_pos: index_after(dump_content_head, 'ENGINE=InnoDB DEFAULT CHARSET=utf8;'),
              binlog_pos: default_binlog_pos,
              state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD,
              substate: nil,
              mysql_table: @mysql_table
            })
          end
          # Second pass: overwrite the dump file with the complete dump and parse
          # it with a parser constructed from the captured checkpoint.
          it do
            generate_dump_file(dump_content_all)
            parser_for_resume = MysqlDumpParser.new(file_path, @option)

            # create_table_block
            expect(create_table_block).to receive(:call).never

            # insert_record_block
            # One expectation per entry; each entry is the full set of rows
            # expected in a single call (here: one call with three rows).
            [
              [ %w(15 46 moodier 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10),
                %w(35 6 missteer  1991-10-15\ 19:38:07 1970-10-01\ 22:03:10),
                %w(52 33 subfield 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),],
            ].each do |expected_values|
              expect(insert_record_block).to receive(:call) { |mysql_table, values|
                expect(mysql_table.table_name).to eq('users_login')
                expect(values).to eq(expected_values)
                nil
              }.once
            end
            # NOTE(review): presumably guards against calls beyond the ones
            # expected above; relies on rspec-mocks expectation ordering.
            expect(insert_record_block).to receive(:call).never

            # check_point_block
            # A single checkpoint is expected, positioned after 'UNLOCK TABLES;'.
            # The hash has no :substate or :result key, so substate is expected
            # to be nil and the stubbed block returns nil.
            [
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE,
               last_pos: index_after(dump_content_all, 'UNLOCK TABLES;')}
            ].each do |expected_params|
              expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
                expect(mysql_table.table_name).to eq('users_login') if mysql_table
                expect(state).to eq(expected_params[:state])
                if expected_params[:last_pos]
                  expect(last_pos).to eq(expected_params[:last_pos])
                end
                expect(binlog_pos).to eq(default_binlog_pos)
                expect(substate).to eq(expected_params[:substate])
                expected_params[:result]
              }.once
            end
            expect(check_point_block).to receive(:call).never

            # parse is expected to return the binlog position
            binlog_pos = parser_for_resume.parse(
              create_table_block, insert_record_block, check_point_block
            )
            expect(binlog_pos).to eq(default_binlog_pos)
          end
        end

        # Resume scenario: a first parse captures the checkpoint reported right
        # after the first INSERT statement; a second parser built from that
        # checkpoint parses the same dump and must only see the second INSERT.
        context 'with resume during insert records' do
          # Dump with the `users_login` table definition and two INSERT
          # statements (three rows, then two rows).
          let(:dump_content) { <<EOT
#{DUMP_HEADER}

DROP TABLE IF EXISTS `users_login`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `users_login` (
  `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id',
  `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count',
  `comment` varchar(255) DEFAULT NULL COMMENT 'comment',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime NOT NULL COMMENT 'update time',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `users_login`
--

LOCK TABLES `users_login` WRITE;
/*!40000 ALTER TABLE `users_login` DISABLE KEYS */;
INSERT INTO `users_login` VALUES (15,46,'moodier','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'missteer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield','1972-08-23 20:16:08','1974-10-10 23:28:11');
INSERT INTO `users_login` VALUES (194,11,'pandemonium','2008-01-22 22:15:10','1991-04-04 17:30:05'),(230,7,'cloudburst','2010-12-28 11:46:11','1971-06-22 13:08:01');
/*!40000 ALTER TABLE `users_login` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
EOT
          }
          # First pass: parse the dump and remember, in @option, the checkpoint
          # whose position is just past the end of the first INSERT statement.
          before do
            generate_dump_file(dump_content)

            # Insert callback for the first pass: expected to be called twice,
            # with one expectation returning true and the other false.
            # NOTE(review): the meaning of the return value is decided by
            # MysqlDumpParser, which is not visible here - confirm.
            insert_record_block_for_resume = double('insert_record_block_for_resume')
            expect(insert_record_block_for_resume).to receive(:call) { true }.once
            expect(insert_record_block_for_resume).to receive(:call) { false }.once

            default_parser.parse(
              # create-table callback: nothing to do in this pass
              Proc.new{},
              insert_record_block_for_resume,
              # checkpoint callback: only the checkpoint located right after the
              # last row of the first INSERT statement is recorded
              Proc.new{ |mysql_table, last_pos, binlog_pos, state, substate|
                if last_pos == index_after(dump_content, "'1972-08-23 20:16:08','1974-10-10 23:28:11');")
                  @mysql_table = mysql_table
                  @option = { status: Flydata::Command::Sync::STATUS_PARSING,
                              table_name: mysql_table.table_name,
                              last_pos: last_pos,
                              binlog_pos: binlog_pos,
                              state: state,
                              substate: substate,
                              mysql_table: mysql_table }
                end
              }
            )
            # Sanity check of the captured checkpoint.
            expect(@option).to eq({
              status: Flydata::Command::Sync::STATUS_PARSING,
              table_name: 'users_login',
              last_pos: index_after(dump_content, "'1972-08-23 20:16:08','1974-10-10 23:28:11');"),
              binlog_pos: default_binlog_pos,
              state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD,
              substate: nil,
              mysql_table: @mysql_table
            })
          end
          # Second pass: parse the same dump with a parser constructed from the
          # captured checkpoint.
          it do
            generate_dump_file(dump_content)
            parser_for_resume = MysqlDumpParser.new(file_path, @option)

            # create_table_block
            expect(create_table_block).to receive(:call).never

            # insert_record_block
            # Only the rows of the second INSERT statement are expected, in a
            # single call.
            [
              [ %w(194 11 pandemonium 2008-01-22\ 22:15:10 1991-04-04\ 17:30:05),
                %w(230 7 cloudburst 2010-12-28\ 11:46:11 1971-06-22\ 13:08:01),],
            ].each do |expected_values|
              expect(insert_record_block).to receive(:call) { |mysql_table, values_set|
                expect(mysql_table.table_name).to eq('users_login')
                expect(values_set).to eq(expected_values)
                nil
              }.once
            end
            # NOTE(review): presumably guards against calls beyond the ones
            # expected above; relies on rspec-mocks expectation ordering.
            expect(insert_record_block).to receive(:call).never

            # check_point_block
            # A single checkpoint is expected, positioned after 'UNLOCK TABLES;'.
            # The hash has no :substate or :result key, so substate is expected
            # to be nil and the stubbed block returns nil.
            [
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE,
               last_pos: index_after(dump_content, 'UNLOCK TABLES;')}
            ].each do |expected_params|
              expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
                expect(mysql_table.table_name).to eq('users_login') if mysql_table
                expect(state).to eq(expected_params[:state])
                if expected_params[:last_pos]
                  expect(last_pos).to eq(expected_params[:last_pos])
                end
                expect(binlog_pos).to eq(default_binlog_pos)
                expect(substate).to eq(expected_params[:substate])
                expected_params[:result]
              }.once
            end
            expect(check_point_block).to receive(:call).never

            # parse is expected to return the binlog position
            binlog_pos = parser_for_resume.parse(
              create_table_block, insert_record_block, check_point_block
            )
            expect(binlog_pos).to eq(default_binlog_pos)
          end
        end

        # Full parse of a dump whose string values contain characters that are
        # significant to the INSERT syntax: ')' and ',' inside quotes, escaped
        # single quotes, an escaped newline and a literal tab.
        context 'when dump contains escape characters' do
          # The heredoc below is an interpolating one, so Ruby processes the
          # escapes before the text reaches the dump file: `\\'` is written as
          # backslash + quote, `\\n` as backslash + 'n', and `\t` as a real tab.
          let(:dump_content) { <<EOT
#{DUMP_HEADER}

DROP TABLE IF EXISTS `users_login`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `users_login` (
  `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'users id',
  `login_count` int(10) unsigned DEFAULT '0' COMMENT 'login count',
  `comment` varchar(255) DEFAULT NULL COMMENT 'comment',
  `create_time` datetime NOT NULL COMMENT 'create time',
  `update_time` datetime NOT NULL COMMENT 'update time',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `users_login`
--

LOCK TABLES `users_login` WRITE;
/*!40000 ALTER TABLE `users_login` DISABLE KEYS */;
INSERT INTO `users_login` VALUES (15,46,'moodier)','2001-10-02 08:18:08','1986-06-11 22:50:10'),(35,6,'miss,teer','1991-10-15 19:38:07','1970-10-01 22:03:10'),(52,33,'subfield\\'','1972-08-23 20:16:08','1974-10-10 23:28:11');
INSERT INTO `users_login` VALUES (373,31,'outs\\nwearing','1979-10-07 08:10:08','2006-02-22 16:26:04'),(493,8,'schiz\tophrenic','1979-07-06 07:34:07\\'),','1970-08-09,01:21:01,\\')');
/*!40000 ALTER TABLE `users_login` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;

EOT
          }
          before { generate_dump_file(dump_content) }
          it do
            # create_table_block
            # Called exactly once, with the table name and column names taken
            # from the CREATE TABLE statement.
            expect(create_table_block).to receive(:call) { |mysql_table|
              expect(mysql_table.table_name).to eq('users_login')
              expect(mysql_table.columns.keys).to eq(['user_id', 'login_count', 'comment', 'create_time', 'update_time'])
            }.once
            expect(create_table_block).to receive(:call).never

            # insert_record_block
            # One expectation per INSERT statement; each entry lists the rows of
            # that statement with escapes resolved (quote, newline, tab) and
            # with quoted ')' and ',' kept as part of the value.
            [
              [ %w(15 46 moodier\) 2001-10-02\ 08:18:08 1986-06-11\ 22:50:10),
                %w(35 6 miss,teer  1991-10-15\ 19:38:07 1970-10-01\ 22:03:10),
                %w(52 33 subfield' 1972-08-23\ 20:16:08 1974-10-10\ 23:28:11),],
              [ ['373','31',"outs\nwearing",'1979-10-07 08:10:08','2006-02-22 16:26:04'],
                ['493','8',"schiz\tophrenic",'1979-07-06 07:34:07\'),','1970-08-09,01:21:01,\')'] ]
            ].each do |expected_values|
              expect(insert_record_block).to receive(:call) { |mysql_table, values_set|
                expect(mysql_table.table_name).to eq('users_login')
                expect(values_set).to eq(expected_values)
                nil
              }.once
            end
            # NOTE(review): presumably guards against calls beyond the ones
            # expected above; relies on rspec-mocks expectation ordering.
            expect(insert_record_block).to receive(:call).never

            # check_point_block
            # One expectation per entry. Only the last entry pins last_pos. No
            # entry has a :substate or :result key, so substate is expected to
            # be nil and the stubbed block returns nil.
            [
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE},
              {state: Flydata::Mysql::MysqlDumpParser::State::INSERT_RECORD},
              {state: Flydata::Mysql::MysqlDumpParser::State::CREATE_TABLE,
               last_pos: index_after(dump_content, 'UNLOCK TABLES;')}
            ].each do |expected_params|
              expect(check_point_block).to receive(:call) { |mysql_table, last_pos, binlog_pos, state, substate|
                expect(mysql_table.table_name).to eq('users_login') if mysql_table
                expect(state).to eq(expected_params[:state])
                if expected_params[:last_pos]
                  expect(last_pos).to eq(expected_params[:last_pos])
                end
                expect(binlog_pos).to eq(default_binlog_pos)
                expect(substate).to eq(expected_params[:substate])
                expected_params[:result]
              }.once
            end
            expect(check_point_block).to receive(:call).never

            # parse is expected to return the binlog position
            binlog_pos = default_parser.parse(
              create_table_block, insert_record_block, check_point_block
            )
            expect(binlog_pos).to eq(default_binlog_pos)
          end
        end
      end
    end
    # Table-driven examples for InsertParser#parse: every case feeds one INSERT
    # statement to the parser through a StringIO and compares the parsed rows.
    describe MysqlDumpParser::InsertParser do
      describe '#parse' do
        let(:target_line) { '' }
        let(:parser) do
          MysqlDumpParser::InsertParser.new(StringIO.new(target_line))
        end
        subject { parser.parse }

        # Columns: context description, example description,
        #          INSERT statement given to the parser, expected parsed rows.
        [
          ['when single record',
           'should return one element array',
           "INSERT INTO `test_table` VALUES (1,'hogehoge','2014-04-15 13:49:14');",
           [['1','hogehoge','2014-04-15 13:49:14']]],

          ['when 2 records',
           'should return two element array',
           "INSERT INTO `test_table` VALUES (1,'foo','2014-04-15 13:49:14'),(2,'var','2014-04-15 14:21:05');",
           [['1','foo','2014-04-15 13:49:14'],['2','var','2014-04-15 14:21:05']]],

          ['when data includes carriage return',
           'should escape return carriage',
           "INSERT INTO `test_table` VALUES (1,'abcd\\refgh','2014-04-15 13:49:14');",
           [['1',"abcd\refgh",'2014-04-15 13:49:14']]],

          ['when data includes new line',
           'should escape new line',
           "INSERT INTO `test_table` VALUES (1,'abcd\\nefgh','2014-04-15 13:49:14');",
           [['1',"abcd\nefgh",'2014-04-15 13:49:14']]],

          ['when data includes escaped single quotation',
           'should escape single quotation',
           "INSERT INTO `test_table` VALUES (1,'abcd\\',\\'efgh','2014-04-15 13:49:14');",
           [['1',"abcd','efgh",'2014-04-15 13:49:14']]],

          ['when data includes back slash',
           'should escape back slash',
           "INSERT INTO `test_table` VALUES (1,'abcd\\\\efgh','2014-04-15 13:49:14');",
           [['1',"abcd\\efgh",'2014-04-15 13:49:14']]],

          ['when data includes mixed escaped characters',
           'should escape all',
           "INSERT INTO `test_table` VALUES (1,'ab\\rcd\\\\e\\nf\\',\\'gh','2014-04-15 13:49:14');",
           [['1',"ab\rcd\\e\nf','gh",'2014-04-15 13:49:14']]],

          ['when data includes mixed escaped characters in a row',
           'should escape all',
           "INSERT INTO `test_table` VALUES (1,'ab\\\\ncd\\\\\\nefgh','2014-04-15 13:49:14');",
           [['1',"ab\\ncd\\\nefgh",'2014-04-15 13:49:14']]],

          ['when data end with back slash',
           'should escape back slash',
           "INSERT INTO `test_table` VALUES (1,'D:\\\\download\\\\','2014-04-15 13:49:14');",
           [['1',"D:\\download\\",'2014-04-15 13:49:14']]],
        ].each do |context_name, example_name, insert_statement, expected_rows|
          context context_name do
            let(:target_line) { insert_statement }

            it example_name do
              expect(subject).to eq(expected_rows)
            end
          end
        end
      end
    end
  end

end