require 'flydata/sync_file_manager'
require 'flydata/source'
require 'flydata-core/fluent/config_helper'
require 'flydata/fluent-plugins/flydata_plugin_ext/flush_support'
require 'flydata/fluent-plugins/flydata_plugin_ext/transaction_support'

module Fluent
  # Mixin for Fluentd plugins that sync tables from a data source.
  #
  # Including this module declares the shared config parameters, mixes in
  # FlushSupport and prepends TransactionSupport on the including class.
  #
  # The including class is expected to provide:
  # * a +SOURCE_POSITION_FILE_CLASS+ constant (instantiated with a file path),
  # * +load_custom_conf+, which is called during #configure.
  module FlydataSync
    # Hook run when the module is included; registers the shared config
    # parameters and support modules on +base+.
    def self.included(base)
      base.class_eval do
        include FlushSupport
        prepend TransactionSupport

        config_param :data_entry_name, :string, default: nil # data entry name
        config_param :data_entry_type, :string, default: nil # data entry type
        config_param :tables, :string
        config_param :tables_append_only, :string
        config_param :pk_override, :hash, default: {}
        config_param :tag, :string
        config_param :position_file, :string, default: 'position.binlog.pos'
      end
    end

    # Sets up position files, the sync file manager, the list of watched
    # tables and their revisions.
    #
    # Raises RuntimeError when the source position file does not exist.
    def configure(conf)
      super

      @source_position_file = self.class::SOURCE_POSITION_FILE_CLASS.new(@position_file)
      unless @source_position_file.exists?
        raise "No position file(#{@source_position_file.path}). Initial synchronization is required before starting."
      end

      load_custom_conf # preference module needs to be included

      @data_entry = build_data_entry
      @source = Flydata::Source.create(@data_entry)
      @sync_fm = Flydata::SyncFileManager.new(@data_entry, @source)

      sent_position_file_path = @sync_fm.sent_source_pos_path(@position_file)
      @sent_position_file = self.class::SOURCE_POSITION_FILE_CLASS.new(sent_position_file_path)

      # Create positions dir
      positions_path = @sync_fm.table_positions_dir_path
      # File.exist? is used because the File.exists? alias was removed in Ruby 3.2.
      Dir.mkdir(positions_path) unless File.exist?(positions_path)

      # Table lists are given as strings separated by commas and/or whitespace.
      @tables = @tables.to_s.split(/(?:\s*,\s*|\s+)/)
      @omit_events = {}
      @tables_append_only = @tables_append_only.to_s.split(/(?:\s*,\s*|\s+)/)
      # Append-only tables are watched like any other table, but their
      # delete and truncate events are marked to be omitted.
      @tables_append_only.each do |table|
        @tables << table unless @tables.include?(table)
        @omit_events[table] = [:delete, :truncate_table]
      end

      # Remove tables that do not have pos files
      new_tables = @sync_fm.get_new_table_list(@tables, "pos")
      @tables -= new_tables
      $log.info "Not watching these tables: #{new_tables.join(", ")}"

      # Set table revisions
      @table_revs = @tables.each_with_object({}) do |table_name, revs|
        revs[table_name] = @sync_fm.table_rev(table_name)
      end

      # The rescue modifiers keep logging from failing when a position cannot
      # be read: resume_pos is then rendered as "IOError" (the modifier's
      # fallback value is the IOError class) and sent_pos as an empty string.
      $log.info("Source position - resume_pos:'#{@source_position_file.read rescue IOError}' " +
                "sent_pos:'#{@sent_position_file.read rescue nil}'")
    end

    # Builds the data entry hash from the configured name and type, merged
    # with @data_entry_preferences when that is set.
    #
    # base_object - Hash to start from (a nil value is treated as {}).
    #               It is modified in place.
    #
    # Returns the resulting Hash.
    def build_data_entry(base_object = {})
      de = base_object || {}
      de['name'] = @data_entry_name
      de['type'] = @data_entry_type
      de.merge!(@data_entry_preferences || {})
      de
    end

    # Calls super, then closes the sync file manager if #configure created one.
    #
    # Defined inside FlydataSync (not directly under Fluent) so that it is
    # actually mixed into the classes that include this module.
    def shutdown
      super
      @sync_fm.close if @sync_fm
    end
  end
end