Sha256: 3b699cebf7e78d3816ed1dbb3b4be50764c7a3350cf726592da2896e7aed2ce6

Contents?: true

Size: 1.98 KB

Versions: 4

Compression:

Stored size: 1.98 KB

Contents

require_relative 'flydata_plugin_ext/base'

require 'flydata/fluent-plugins/flydata_plugin_ext/preference'
require 'flydata/fluent-plugins/flydata_plugin_ext/flydata_sync_query_based'
require 'flydata/source_postgresql/plugin_support/source_position_file'
require 'flydata/source_postgresql/plugin_support/context'
require 'flydata/source_postgresql/table_meta'
require 'flydata/source_postgresql/query_based_sync/client'
require 'flydata-core/postgresql/config'

module Fluent

class PostgresqlQueryBasedFlydataInput < Input
  include PostgresqlQueryBasedSyncPreference
  FlydataSyncQueryBased.include_modules(self)

  Plugin.register_input('postgresql_query_based_sync_flydata', self)

  SOURCE_POSITION_FILE_CLASS = Flydata::SourcePostgresql::PluginSupport::SourcePositionFile

  def configure(conf)
    super
    @dbconf = FlydataCore::Postgresql::Config.opts_for_pg(@data_entry['postgresql_data_entry_preference'])
    $log.info "postgresql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{@tables_append_only}\""

    @table_meta = Flydata::SourcePostgresql::TableMeta.new(
      @dbconf, @tables, @schema)

    @context = Flydata::SourcePostgresql::PluginSupport::Context.new(
      database: @database, tables: @tables,
      tag: @tag, sync_fm: @sync_fm, omit_events: @omit_events,
      table_revs: @table_revs, dbconf: @dbconf,
      cur_src_pos_file: @source_position_file,
      table_src_pos_files: @table_src_pos_files,
      table_meta: @table_meta,
      params: {
        fetch_interval: @fetch_interval,
        retry_interval: @retry_interval,
        emit_chunk_limit: @emit_chunk_limit,
      },
    )

    @client = Flydata::SourcePostgresql::QueryBasedSync::Client.new(@context)
  end

  def start
   super
   @thread = Thread.new(&method(:run))
  end

  def run
    @client.start
  end

  def shutdown
    if @thread and @thread.alive?
      @client.stop_request
      @thread.join
    end
    super
  end
end

end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
flydata-0.7.2.1 lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb
flydata-0.7.2 lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb
flydata-0.7.1 lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb
flydata-0.7.0 lib/flydata/fluent-plugins/in_postgresql_query_based_flydata.rb