require 'pg'
require 'socket'
require 'delegate'
require 'flydata-core/postgresql/config'
require 'flydata-core/postgresql/query_helper'

module FlydataCore
  module Postgresql
    # Small client around PG::Connection that connects lazily using the
    # non-blocking connect API (with a per-step timeout) and runs queries,
    # optionally post-processing result values through per-column overriders.
    class PGClient
      # Seconds to wait for the socket to become readable/writable during
      # each step of the asynchronous connection handshake.
      PG_CONNECT_TIMEOUT = 10.0

      # @param dbconf connection settings; converted to PG options by
      #   FlydataCore::Postgresql::Config.opts_for_pg
      def initialize(dbconf)
        @dbconf = FlydataCore::Postgresql::Config.opts_for_pg(dbconf)
      end

      attr_reader :dbconf

      # Opens the connection if there is none yet; otherwise does nothing.
      def establish_connection
        @conn ||= create_connection
      end

      # Runs +query+ on the (lazily established) connection.
      #
      # If +query+ responds to +placeholder_start_num+ and it is set, the
      # query text is treated as a format string and a "$n,$n+1,..." list of
      # +placeholder_size+ placeholders is substituted into it.
      # If +params+ is nil or empty and +query+ responds to +binding_params+,
      # those are used as the bind parameters instead.
      #
      # @param query [String, PGQuery] SQL text, optionally carrying options
      # @param params [Array, nil] bind parameters
      # @return the result of the underlying connection's +query+ call, or an
      #   EnumerableDelegator wrapping it when +query+ has value overriders
      def query(query, params = [])
        establish_connection

        if query.respond_to?(:placeholder_start_num) && query.placeholder_start_num
          placeholders = placeholder_string(query.placeholder_size, query.placeholder_start_num)
          q = query % [placeholders]
        else
          q = query
        end

        if (params.nil? || params.empty?) && query.respond_to?(:binding_params)
          params = query.binding_params || []
        end

        result = @conn.query(q, params)
        if query.respond_to?(:value_overriders) && query.value_overriders
          EnumerableDelegator.new(result, HashValueOverrider, query.value_overriders)
        else
          result
        end
      end

      # Closes the connection, if any. The reference is cleared even when
      # +finish+ raises, so a later call can establish a fresh connection.
      def close
        return unless @conn

        begin
          @conn.finish
        ensure
          @conn = nil
        end
      end

      private

      # Resolves the host, starts a non-blocking connection and waits for it
      # to complete. Any connection object that did not reach CONNECTION_OK
      # is finished before the error propagates, so no socket is leaked.
      #
      # @return [PG::Connection] an established connection
      # @raise [PG::Error] when the connection cannot be established
      # @raise [SocketError] for name resolution failures other than the one
      #   translated below
      def create_connection
        conn = nil
        connected = false

        retry_on(Errno::EBADF, 3) do
          # A previous attempt may have left a half-open connection behind.
          discard_connection(conn)
          conn = nil

          hostaddr = IPSocket.getaddress(dbconf[:host])
          dbconf[:hostaddr] = hostaddr
          conn = PG::Connection.connect_start(dbconf)
          raise PG::Error, "Unable to create a new connection." unless conn
          if conn.status == PG::CONNECTION_BAD
            raise PG::Error, "Connection failed: %s" % [conn.error_message]
          end

          wait_for_connection(conn)
        end

        unless conn.status == PG::CONNECTION_OK
          raise PG::Error, "Connect failed: %s" % [conn.error_message.to_s.lines.uniq.join(" ")]
        end

        connected = true
        conn
      rescue Errno::EBADF => e
        raise PG::Error, "Failed to connect redshift due to Errno::EBADF. #{e.to_s}"
      rescue SocketError => e
        if e.to_s == 'getaddrinfo: nodename nor servname provided, or not known'
          raise PG::Error, "Connection failed: FATAL: unknown host(#{dbconf[:host]})."
        end
        raise
      ensure
        # Runs after the error messages above were built from +conn+.
        discard_connection(conn) unless connected
      end

      # Drives the asynchronous connection handshake of +conn+ until polling
      # reports OK or FAILED, waiting on the socket between polls.
      #
      # @raise [PG::Error] when the socket is not ready within
      #   PG_CONNECT_TIMEOUT seconds
      def wait_for_connection(conn)
        socket = conn.socket_io
        poll_status = PG::PGRES_POLLING_WRITING

        until poll_status == PG::PGRES_POLLING_OK || poll_status == PG::PGRES_POLLING_FAILED
          case poll_status
          when PG::PGRES_POLLING_READING
            unless IO.select([socket], nil, nil, PG_CONNECT_TIMEOUT)
              raise PG::Error, "Asynchronous connection timed out!(READING)"
            end
          when PG::PGRES_POLLING_WRITING
            unless IO.select(nil, [socket], nil, PG_CONNECT_TIMEOUT)
              raise PG::Error, "Asynchronous connection timed out!(WRITING)"
            end
          end
          poll_status = conn.connect_poll
        end
      end

      # Best-effort cleanup of a connection that is not going to be used.
      # Errors are swallowed on purpose so they never mask the failure that
      # triggered the cleanup.
      def discard_connection(conn)
        conn.finish if conn && !conn.finished?
      rescue PG::Error
        nil
      end

      # Retry the given block if +exception+ happens.
      # The block is attempted at most +try_count+ times; the sleep between
      # attempts starts at +interval+ seconds and doubles each time. The last
      # failure is re-raised.
      def retry_on(exception = StandardError, try_count = 3, interval = 1.0)
        count = 0
        begin
          count += 1
          yield
        rescue exception
          raise unless count < try_count

          sleep interval
          interval *= 2
          retry
        end
      end

      # Builds a comma separated list of +num_items+ positional placeholders
      # starting at +start_num+, e.g. (3, 2) => "$2,$3,$4".
      def placeholder_string(num_items, start_num)
        Array.new(num_items) { |i| "$#{i + start_num}" }.join(",")
      end

      # Enumerable that wraps every item yielded by +delegate+ in a new
      # +item_delegator_class+ instance (constructed with the item followed
      # by +args+).
      class EnumerableDelegator
        include Enumerable

        def initialize(delegate, item_delegator_class, *args)
          @delegate = delegate
          @item_delegator_class = item_delegator_class
          @args = args
        end

        # Yields each wrapped item; returns an Enumerator without a block.
        def each
          return enum_for(:each) unless block_given?

          @delegate.each do |item|
            yield @item_delegator_class.new(item, *@args)
          end
        end
      end

      # Delegator around a Hash row that passes values through a per-key
      # callable (+overriders[key]+) when one is registered for that key.
      class HashValueOverrider < SimpleDelegator
        # @param delegate [Hash] the row being wrapped
        # @param overriders [Hash] key => callable taking the raw value
        def initialize(delegate, overriders)
          super(delegate)
          @overriders = overriders
        end

        def [](key)
          override(key, __getobj__[key])
        end

        # Like Hash#fetch, but a value found in the row is overridden.
        # Defaults (argument or block) are returned untouched.
        def fetch(key, *default, &block)
          row = __getobj__
          return row.fetch(key, *default, &block) unless row.key?(key)

          override(key, row[key])
        end

        # TODO: remaining methods returning hash value(s) (e.g. the ones
        # derived from Enumerable such as map/select) are still delegated
        # to the raw row and must be overridden when needed.

        # Like Hash#first: nil for an empty row, a [key, value] pair without
        # arguments, or an array of pairs for +first(n)+.
        def first(*args)
          unless args.empty?
            return __getobj__.first(*args).map { |k, v| [k, override(k, v)] }
          end

          pair = __getobj__.first
          return nil if pair.nil?

          key, val = pair
          [key, override(key, val)]
        end

        def values
          __getobj__.map { |k, v| override(k, v) }
        end

        def values_at(*keys)
          keys.map { |k| self[k] }
        end

        # Yields [key, overridden value] pairs; returns an Enumerator
        # without a block.
        def each_pair
          return enum_for(:each_pair) unless block_given?

          __getobj__.each_pair { |k, v| yield [k, override(k, v)] }
          self
        end
        alias each each_pair

        # Plain Hash copy of the row with overridden values.
        def to_h(&block)
          overridden = {}
          __getobj__.each_pair { |k, v| overridden[k] = override(k, v) }
          block ? overridden.to_h(&block) : overridden
        end

        # Report the wrapped object's class hierarchy so the wrapper can be
        # used where a Hash is checked for.
        def kind_of?(klass)
          __getobj__.kind_of?(klass)
        end
        alias is_a? kind_of?

        private

        def override(key, val)
          @overriders.key?(key) ? @overriders[key].call(val) : val
        end
      end
    end

    # SQL text (delegating to the String) that additionally carries the
    # options PGClient#query looks for.
    class PGQuery < SimpleDelegator
      # @param query_text [String] SQL, optionally a format string with one
      #   slot for the generated placeholder list
      # @param opts [Hash] :value_overriders, :placeholder_start_num,
      #   :placeholder_size, :binding_params (all optional)
      def initialize(query_text, opts = {})
        super(query_text)
        @value_overriders = opts[:value_overriders]
        @placeholder_start_num = opts[:placeholder_start_num]
        @placeholder_size = opts[:placeholder_size]
        @binding_params = opts[:binding_params]
      end

      attr_reader :value_overriders, :placeholder_start_num, :placeholder_size, :binding_params
    end
  end
end