require_relative 'flydata_plugin_ext/base' require 'flydata/fluent-plugins/flydata_plugin_ext/preference' require 'flydata/fluent-plugins/flydata_plugin_ext/flydata_sync_query_based' require 'flydata/source_postgresql/plugin_support/source_position_file' require 'flydata/source_postgresql/plugin_support/context' require 'flydata/source_postgresql/table_meta' require 'flydata/source_postgresql/query_based_sync/client' require 'flydata-core/postgresql/config' module Fluent class PostgresqlQueryBasedFlydataInput < Input include PostgresqlQueryBasedSyncPreference FlydataSyncQueryBased.include_modules(self) Plugin.register_input('postgresql_query_based_sync_flydata', self) SOURCE_POSITION_FILE_CLASS = Flydata::SourcePostgresql::PluginSupport::SourcePositionFile def configure(conf) super @dbconf = FlydataCore::Postgresql::Config.opts_for_pg(@data_entry['postgresql_data_entry_preference']) $log.info "postgresql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{@tables_append_only}\"" opts = @dbconf.merge(pk_override:@data_entry['postgresql_data_entry_preference']['pk_override']) @table_meta = Flydata::SourcePostgresql::TableMeta.new( opts, @tables, @schema) @context = Flydata::SourcePostgresql::PluginSupport::Context.new( database: @database, tables: @tables, tag: @tag, sync_fm: @sync_fm, omit_events: @omit_events, table_revs: @table_revs, dbconf: @dbconf, cur_src_pos_file: @source_position_file, cur_sent_pos_file: @sent_position_file, table_src_pos_files: @table_src_pos_files, table_meta: @table_meta, params: { fetch_interval: @fetch_interval, retry_interval: @retry_interval, emit_chunk_limit: @emit_chunk_limit, }, ) @client = Flydata::SourcePostgresql::QueryBasedSync::Client.new(@context) end def start super @thread = Thread.new(&method(:run)) end def run @client.start end def shutdown if @thread and @thread.alive? @client.stop_request @thread.join end super end end end