require 'flydata/query_based_sync/resource_requester.rb'
require 'flydata/source_postgresql/query_based_sync/response.rb'
require 'flydata/source_postgresql/query_based_sync/diff_query_generator'
require 'flydata-core/postgresql/pg_client'

module Flydata
  module SourcePostgresql
    module QueryBasedSync
      # PostgreSQL flavour of the query-based-sync resource requester.
      #
      # For one table at a time it decides which snapshot range to query,
      # builds the query with DiffQueryGenerator, runs it through the resource
      # client and wraps the raw result via RESPONSE_CLASS.
      #
      # NOTE(review): `context`, `log_error`, `log_debug` and
      # `@resource_client` are not defined in this file; presumably they come
      # from Flydata::QueryBasedSync::ResourceRequester -- confirm there.
      class ResourceRequester < Flydata::QueryBasedSync::ResourceRequester
        # Class whose `create_responses` turns a raw query result into responses.
        RESPONSE_CLASS = Flydata::SourcePostgresql::QueryBasedSync::Response

        # Passes all arguments through to the parent initializer, then prepares
        # the list of table names whose missing metadata has already been
        # reported (so the error is logged once per table, not on every fetch).
        def initialize(*args)
          super
          @tables_missing_meta = []
        end

        # Override
        #
        # Builds the PostgreSQL client from the database configuration held
        # by the given context.
        def create_resource_client(context)
          dbconf = context.dbconf
          FlydataCore::Postgresql::PGClient.new(dbconf)
        end

        # Override
        #
        # Runs a single query for +table_name+ and returns the resulting
        # responses as an Array.
        #
        # Returns nil (without querying) when the table has no metadata or
        # when the lower and upper snapshot of the range are equal.
        #
        # table_name     - name of the table; converted with `to_sym` for the
        #                  context look-ups.
        # latest_src_pos - object responding to `snapshot`; supplies the upper
        #                  bound unless a stored position overrides it.
        def fetch_responses_once(table_name, latest_src_pos)
          table_key = table_name.to_sym
          meta = context.table_meta[table_key]

          if meta.nil? || meta.empty?
            report_missing_metadata_once(table_name)
            return
          end

          # Both positions are read up front, before the range is decided.
          table_pos = context.table_src_pos_files[table_key].pos
          master_pos = context.cur_src_pos_file.pos
          from_sid, to_sid, pk_values =
            resolve_query_window(table_pos, master_pos, latest_src_pos)

          if from_sid == to_sid
            log_debug("Skip fetching records (No changes)")
            return
          end

          log_debug("Start query", from_sid: from_sid, to_sid: to_sid, pk_values: pk_values)

          # Create and execute the query, then wrap the raw result.
          raw_result = @resource_client.query(generate_query(meta, from_sid, to_sid, pk_values))
          Array(build_responses(table_name, raw_result,
                                from_sid: from_sid, to_sid: to_sid, pk_values: pk_values))
        end

        private

        # Logs the "no metadata" error for +table_name+ the first time it is
        # seen and remembers the name so later calls stay silent.
        def report_missing_metadata_once(table_name)
          return if @tables_missing_meta.include?(table_name)

          @tables_missing_meta << table_name
          log_error "No metadata is available for table '#{table_name}'. Make sure the table exists and is visible to the PostgreSQL user."
        end

        # Decides the query range; returns [from_sid, to_sid, pk_values].
        #
        # - no table position stored   -> start from the master position's
        #                                 snapshot, up to the latest snapshot
        # - table position has pk_values -> continue from the last transaction:
        #                                 reuse its snapshot / to_snapshot pair
        #                                 together with those pk_values
        # - otherwise                  -> from the table position's snapshot,
        #                                 up to the latest snapshot
        def resolve_query_window(table_pos, master_pos, latest_src_pos)
          latest_sid = latest_src_pos.snapshot
          return [master_pos.snapshot, latest_sid, nil] if table_pos.nil?

          if table_pos.pk_values
            [table_pos.snapshot, table_pos.to_snapshot, table_pos.pk_values]
          else
            [table_pos.snapshot, latest_sid, nil]
          end
        end

        # Builds the query for the given table metadata and snapshot range.
        #
        # When +pk_values+ is given, the first value of each of its hashes is
        # handed to the generator as +last_pks+; otherwise +last_pks+ is nil.
        def generate_query(t_meta, from_sid, to_sid, pk_values)
          last_pks = pk_values.map { |pk| pk.values.first } if pk_values

          generator = DiffQueryGenerator.new(
            t_meta[:table_name], t_meta[:table_schema],
            to_sid: to_sid, from_sid: from_sid,
            columns: t_meta[:columns],
            pk_columns: t_meta[:primary_keys],
            last_pks: last_pks,
            limit: t_meta[:max_num_rows_per_query]
          )
          generator.build_query
        end

        # Delegates to RESPONSE_CLASS.create_responses, passing the current
        # context along with the table name, raw result and query conditions.
        def build_responses(table_name, raw_result, query_cond = {})
          RESPONSE_CLASS.create_responses(context, table_name, raw_result, query_cond)
        end
      end
    end
  end
end