require 'fluent_plugins_spec_helper'
require 'flydata/source_postgresql/table_meta'

module Flydata::SourcePostgresql
  # Specs for TableMeta#reload. FlydataCore::Postgresql::PGClient.new is
  # stubbed to return a test double, so every query result below is canned.
  describe TableMeta do
    # NOTE(review): 3306 is MySQL's default port; it is only fixture data
    # here, but confirm it is intentional for a PostgreSQL source.
    let(:dbconf) do
      {
        host: 'test-host',
        port: 3306,
        username: 'test-user',
        password: 'test-pswd',
        database: 'test-db',
        schema: 'test-schema'
      }
    end
    let(:database) { 'test-db' }
    let(:schema) { 'test-schema' }
    # NOTE(review): these names differ from the table names used in
    # raw_columns ('table_a', 'table_b') -- confirm this is intended.
    let(:tables) { %w(a_table b_table c_table) }
    let(:conn) { double('conn') }
    let(:subject_object) { described_class.new(dbconf, tables, schema) }

    # Builds one row of the canned column-metadata result set as a
    # string-keyed hash.
    def build_column(table_name, column_name, column_default, is_nullable, is_primary,
                     data_type, character_octet_length, numeric_precision, numeric_scale)
      {
        'table_name' => table_name,
        'column_name' => column_name,
        'column_default' => column_default,
        'is_nullable' => is_nullable,
        'is_primary' => is_primary,
        'data_type' => data_type,
        'character_octet_length' => character_octet_length,
        'numeric_precision' => numeric_precision,
        'numeric_scale' => numeric_scale
      }
    end

    let(:raw_columns) do
      [
        build_column('table_a', 'id', nil, 'NO', 't', 'integer', nil, 32, 0),
        build_column('table_a', 'val', nil, 'YES', nil, 'character varying', 24, nil, nil),
        # primary key (name, prod_id)
        build_column('table_b', 'name', nil, 'NO', 't', 'character varying', 36, nil, nil),
        build_column('table_b', 'prod_id', nil, 'NO', 't', 'integer', nil, 32, 0),
        # index key
        build_column('table_b', 'serial_no', nil, 'YES', 'f', 'integer', nil, 32, 0),
        build_column('table_b', 'count', nil, 'YES', nil, 'integer', nil, 32, 0)
      ]
    end
    let(:current_snapshots) { [{ 'txid_current_snapshot' => '1010:1010:' }] }

    before do
      allow(conn).to receive(:close)
      allow(FlydataCore::Postgresql::PGClient).to receive(:new).and_return(conn)
    end

    describe '#reload' do
      subject { subject_object.reload(input_cli) }

      # Assertions shared by the contexts that feed raw_columns through reload.
      shared_examples 'table_a meta' do
        it 'returns the meta data of table_a' do
          table_a = subject[:table_a]

          expect(table_a[:table_name]).to eq('table_a')
          expect(table_a[:primary_keys]).to eq(%w(id))
          expect(table_a[:pk_positions]).to eq(%w(1))
          expect(table_a[:max_num_rows_per_query]).to be_a(Integer)
          expect(table_a[:columns]).to be_a(Array)
          expect(table_a[:table_def]).to be_a(FlydataCore::TableDef::Base)
        end
      end

      context 'when params is nil' do
        let(:input_cli) { nil }

        before do
          # Two consecutive query calls: column metadata first, snapshot second.
          expect(conn).to receive(:query).and_return(raw_columns, current_snapshots)
          # With no client passed in, the connection double must be closed.
          expect(conn).to receive(:close)
        end

        it_behaves_like 'table_a meta'
      end

      context 'when params is not nil' do
        let(:input_cli) { conn }

        before do
          expect(conn).to receive(:query).and_return(raw_columns, current_snapshots)
          # A client supplied by the caller must not be closed by reload.
          expect(conn).not_to receive(:close)
        end

        it_behaves_like 'table_a meta'
      end

      context 'when schema is empty' do
        let(:input_cli) { nil }
        let(:schema) { nil }

        before do
          # Remove the schema under both key styles.
          dbconf.delete(:schema)
          dbconf.delete('schema')
          allow(conn).to receive(:close)
        end

        it 'uses a current schema on query' do
          expect(conn).to receive(:query) do |query|
            # The dot is escaped so only the literal "c.table_schema" matches.
            expect(query).to match(/AND c\.table_schema IN \(select current_schema\)/)
            raw_columns
          end
          expect(conn).to receive(:query)
            .with('SELECT txid_current_snapshot();')
            .and_return(current_snapshots)

          subject
        end
      end
    end
  end
end