lib/vertica/connection.rb in vertica-0.10.5 vs lib/vertica/connection.rb in vertica-0.11.0

- old
+ new

@@ -60,15 +60,15 @@ def closed? !opened? end def busy? - !ready_for_query? + @mutex.locked? end def ready_for_query? - @current_job.nil? + !busy? end def write_message(message) raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes) puts "=> #{message.inspect}" if @debug @@ -144,20 +144,20 @@ @backend_key = message.key when Vertica::Messages::ParameterStatus @parameters[message.name] = message.value when Vertica::Messages::ReadyForQuery @transaction_status = message.transaction_status - @current_job = nil + @mutex.unlock else raise Vertica::Error::MessageError, "Unhandled message: #{message.inspect}" end end def query(sql, options = {}, &block) job = Vertica::Query.new(self, sql, { :row_style => @row_style }.merge(options)) job.row_handler = block if block_given? - run_with_job_lock(job) + run_with_mutex(job) end def copy(sql, source = nil, &block) job = Vertica::Query.new(self, sql, :row_style => @row_style) if block_given? @@ -165,25 +165,27 @@ elsif source && File.exists?(source.to_s) job.copy_handler = lambda { |data| file_copy_handler(source, data) } elsif source.respond_to?(:read) && source.respond_to?(:eof?) job.copy_handler = lambda { |data| io_copy_handler(source, data) } end - run_with_job_lock(job) + run_with_mutex(job) end def inspect safe_options = @options.reject{ |name, _| name == :password } "#<Vertica::Connection:#{object_id} @parameters=#{@parameters.inspect} @backend_pid=#{@backend_pid}, @backend_key=#{@backend_key}, @transaction_status=#{@transaction_status}, @socket=#{@socket}, @options=#{safe_options.inspect}, @row_style=#{@row_style}>" end protected - def run_with_job_lock(job) + def run_with_mutex(job) boot_connection if closed? - raise Vertica::Error::SynchronizeError.new(@current_job, job) if busy? - @current_job = job - job.run + if @mutex.try_lock + job.run + else + raise Vertica::Error::SynchronizeError.new(job) + end end COPY_FROM_IO_BLOCK_SIZE = 1024 * 4096 def file_copy_handler(input_file, output) @@ -257,10 +259,10 @@ @session_id = nil @backend_pid = nil @backend_key = nil @transaction_status = nil @socket = nil - @current_job = '<initialization>' + @mutex = Mutex.new.lock end end require 'vertica/query' require 'vertica/column'