require 'flydata/sync_file_manager'
require 'flydata-core/fluent/config_helper'
require 'flydata/fluent-plugins/flydata_plugin_ext/flush_support'
require 'flydata/fluent-plugins/flydata_plugin_ext/transaction_support'

module Fluent
  # Mixin for sync input plugins. Including it registers the shared
  # configuration parameters on the host class and supplies a `configure`
  # implementation that prepares position files and the list of watched tables.
  #
  # The host class is expected to define BINLOG_POSITION_FILE_CLASS (it is
  # looked up through `self.class`) and to respond to `load_custom_conf`;
  # neither is defined in this module.
  module FlydataSync
    # Hook run when the module is included: mixes FlushSupport in, prepends
    # TransactionSupport, and declares the config parameters on +base+.
    #
    # @param base [Class] the class including this module
    def self.included(base)
      base.class_eval do
        include FlushSupport
        prepend TransactionSupport

        config_param :tables, :string
        config_param :tables_append_only, :string

        # binlog plugin
        config_param :tag, :string
        config_param :position_file, :string, default: 'position.log'
      end
    end

    # Configures the plugin: verifies the position file exists, builds the
    # sent-position file object, makes sure the table positions directory
    # exists, and computes @tables / @omit_events from the configured lists.
    #
    # @param conf the configuration object handed to the plugin (forwarded
    #   to `super` implicitly)
    # @raise [RuntimeError] when the position file does not exist
    def configure(conf)
      super

      @binlog_position_file = self.class::BINLOG_POSITION_FILE_CLASS.new(@position_file)
      unless @binlog_position_file.exists?
        raise "No position file(#{@binlog_position_file.path}). Initial synchronization is required before starting."
      end

      # Passing nil for data_entry as this class does not use methods which require data_entry
      @sync_fm = Flydata::SyncFileManager.new(nil)
      sent_position_file_path = @sync_fm.sent_source_pos_path(@position_file)
      @sent_position_file = self.class::BINLOG_POSITION_FILE_CLASS.new(sent_position_file_path)

      # Create positions dir.
      # File.exist? is used because the File.exists? alias was removed in Ruby 3.2.
      positions_path = @sync_fm.table_positions_dir_path
      Dir.mkdir(positions_path) unless File.exist?(positions_path)

      load_custom_conf

      # Table lists may be separated by commas (with optional surrounding
      # whitespace) or by plain whitespace.
      delimiter = /(?:\s*,\s*|\s+)/
      @tables = @tables.split(delimiter)
      @omit_events = {}
      @tables_append_only.split(delimiter).each do |table|
        # Append-only tables are watched too, with delete/truncate events omitted.
        @tables << table unless @tables.include?(table)
        @omit_events[table] = [:delete, :truncate_table]
      end

      # Remove tables that do not have pos files
      new_tables = @sync_fm.get_new_table_list(@tables, "pos")
      @tables -= new_tables
      $log.info "Not watching these tables: #{new_tables.join(", ")}"
    end
  end
end