Sha256: 0afbdbbe91608843f9c7130f3ac4f608d4479e936cba692177ce084002c614e5

Contents?: true

Size: 1.95 KB

Versions: 8

Compression:

Stored size: 1.95 KB

Contents

module Schemas
  class Descriptor
    include RedisStore
    INFORMATION_SCHEMA_QUERY = <<-SQL
      SELECT
        table_schema,
        table_name,
        column_name,
        udt_name,
        character_maximum_length
      FROM information_schema.columns
      WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'public')
      ORDER BY table_schema, table_name, ordinal_position;
    SQL

    SCHEMA_REFRESH_INTERVAL = APP_CONFIG['schema_refresh_interval'] || 1.day

    def initialize(role)
      @role = role
      @cache = []

      Rails.logger.info("Start schema refresher thread for #{@role}")
      @refresher_thread = Thread.new{ schema_refresher }
    end

    def schemas
      retrieve.map { |row| row['table_schema'] }.uniq
    end

    def table_columns(schema)
      retrieve.select { |row| row['table_schema'] == schema }.group_by { |row| row['table_name'] }
    end

    def columns
      retrieve.map do |column|
        HashWithIndifferentAccess.new(
          column: column['column_name'],
          table: column['table_name'],
          schema: column['table_schema'],
          type: column['udt_name']
        )
      end
    end

    def key
      @key ||= "#{@role}_schema_descriptor"
    end

    private

    def retrieve
      if !@cache.present?
        @cache = redis_retrieve
      end
      return @cache
    end

    def schema_refresher
      loop do
        refresh_schema
        sleep(SCHEMA_REFRESH_INTERVAL)
      end
    end

    def refresh_schema
      Rails.logger.info('Schemas::Descriptor.refresh_schema')
      result = nil
      Pester.schema_refresh.retry do
        result = exec_schema_query
      end

      if result
        redis_store!(result.to_a)
        @cache = redis_retrieve
      end
    end

    def exec_schema_query
      connection = RedshiftConnectionPool.instance.get(@role)
      connection.reconnect_on_failure do
        connection.pg_connection.exec(INFORMATION_SCHEMA_QUERY)
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
aleph_analytics-0.1.0 lib/schemas/descriptor.rb
aleph_analytics-0.0.6 lib/schemas/descriptor.rb
aleph_analytics-0.0.5 lib/schemas/descriptor.rb
aleph_analytics-0.0.4 lib/schemas/descriptor.rb
aleph_analytics-0.0.3 lib/schemas/descriptor.rb
aleph_analytics-0.0.2 lib/schemas/descriptor.rb
aleph_analytics-0.0.1.alpha lib/schemas/descriptor.rb
aleph_analytics-0.0.0.alpha lib/schemas/descriptor.rb