require 'thread'
require 'flydata-core/logger'

module Flydata
  module QueryBasedSync
    # Polling client: repeatedly asks a resource requester for responses for
    # every table in the context, passes each response to a response handler
    # and then records the current snapshot in the context's position files.
    #
    # The requester and handler classes are looked up through
    # self.class::RESOURCE_REQUESTER_CLASS and
    # self.class::RESPONSE_HANDLER_CLASS; neither constant is defined in this
    # class body, so the concrete class is expected to provide them.
    class Client
      include FlydataCore::Logger

      DEFAULT_FETCH_INTERVAL = 60 # seconds (1 minute)
      DEFAULT_RETRY_INTERVAL = 30 # seconds

      # context - object exposing #params, #tables, #table_meta,
      #           #cur_src_pos_file and #cur_sent_pos_file (all used below).
      #
      # context.params may be nil. When present it is read for:
      #   :fetch_interval - seconds to sleep between two fetch passes
      #   :retry_interval - seconds to sleep after a failed pass
      # A missing (nil) value falls back to the matching DEFAULT_* constant so
      # that run_loop never hands nil to sleep.
      def initialize(context)
        @context = context
        @resource_requester = self.class::RESOURCE_REQUESTER_CLASS.new(context)
        @response_handler = self.class::RESPONSE_HANDLER_CLASS.new(context)

        params = context.params
        @fetch_interval = (params && params[:fetch_interval]) || DEFAULT_FETCH_INTERVAL
        @retry_interval = (params && params[:retry_interval]) || DEFAULT_RETRY_INTERVAL
      end

      attr_reader :context
      attr_reader :resource_requester
      attr_reader :response_handler

      # Runs the fetch loop on the calling thread; returns once a stop has
      # been requested.
      #
      # Raises RuntimeError if the client was already started or if a stop
      # was requested before the call.
      def start
        raise "Already started - thread:#{@running_thread}" if @running_thread
        # The stop flag lives in @finish (see stop_request); go through the
        # predicate so this guard actually observes it.
        raise "Already stop requested" if stop_requested?
        @running_thread = Thread.current
        run_loop
      end

      # Flags the loop to finish and, if the loop thread is still alive,
      # wakes it so a pending sleep is cut short.
      def stop_request
        @finish = true
        if @running_thread && @running_thread.alive?
          @running_thread.run
        end
      end

      # True once stop_request has been called.
      def stop_requested?
        !!@finish
      end

      # Logs a debug line describing the response and delegates it to the
      # response handler.
      def handle_response(response)
        log_debug("Handling response",
                  table_name: response.table_name,
                  count: response.records.count,
                  new_snapshot: response.new_source_pos,
                  base_snapshot: context.table_meta.current_snapshot)
        @response_handler.handle(response)
      end

      private

      # Calls run_once until a stop is requested, sleeping @fetch_interval
      # between passes. Any StandardError is logged and the loop is restarted
      # after @retry_interval; the retry is intentionally unbounded because
      # the loop only ends on a stop request.
      def run_loop
        begin
          until stop_requested?
            reset_log_context
            run_once
            break if stop_requested?
            sleep @fetch_interval
          end
        rescue => e
          handle_error(e, "Unexpected error occured")
          # No point in waiting when a stop is already pending: the retried
          # loop exits immediately.
          sleep @retry_interval unless stop_requested?
          retry
        end
      end

      # Logs the error with a backtrace. A FlydataCore::RetryableError is
      # unwrapped to its original exception and logged as a warning; anything
      # else is logged as an error.
      def handle_error(error, message)
        log_method = if error.kind_of?(FlydataCore::RetryableError)
                       error = error.original_exception
                       :log_warn
                     else
                       :log_error
                     end
        self.send(log_method, message, {error: error}, {backtrace: true})
      end

      # One fetch pass over every table in the context.
      #
      # Each `return` below leaves run_once as a whole, so a pass interrupted
      # by a stop request does not update the position files.
      def run_once
        resource_requester.start do |req| # open connection
          context.tables.each do |table_name|
            add_log_context_items(table_name: table_name)
            return if stop_requested?

            req.each_response(table_name) do |response|
              return if stop_requested?
              handle_response(response)
              unless response.empty?
                log_info("Emitted records",
                         count: response.record_count,
                         src_pos: response.new_source_pos)
              end
              return if stop_requested?
            end
            delete_log_context_item(:table_name)
          end
        end

        # Set the current snapshot to master(resume) binlog.pos file
        context.cur_src_pos_file.save(context.table_meta.current_snapshot)
        # Also set the same value to the sent.binlog.pos file as the file is
        # required by the agent commands.
        context.cur_sent_pos_file.save(context.table_meta.current_snapshot)
        log_info("Updated source position -", resume_pos: context.table_meta.current_snapshot)
      end
    end
  end
end