lib/vertica/connection.rb in vertica-0.10.5 vs lib/vertica/connection.rb in vertica-0.11.0
- old
+ new
@@ -60,15 +60,15 @@
def closed?
!opened?
end
def busy?
- !ready_for_query?
+ @mutex.locked?
end
def ready_for_query?
- @current_job.nil?
+ !busy?
end
def write_message(message)
raise ArgumentError, "invalid message: (#{message.inspect})" unless message.respond_to?(:to_bytes)
puts "=> #{message.inspect}" if @debug
@@ -144,20 +144,20 @@
@backend_key = message.key
when Vertica::Messages::ParameterStatus
@parameters[message.name] = message.value
when Vertica::Messages::ReadyForQuery
@transaction_status = message.transaction_status
- @current_job = nil
+ @mutex.unlock
else
raise Vertica::Error::MessageError, "Unhandled message: #{message.inspect}"
end
end
def query(sql, options = {}, &block)
job = Vertica::Query.new(self, sql, { :row_style => @row_style }.merge(options))
job.row_handler = block if block_given?
- run_with_job_lock(job)
+ run_with_mutex(job)
end
def copy(sql, source = nil, &block)
job = Vertica::Query.new(self, sql, :row_style => @row_style)
if block_given?
@@ -165,25 +165,27 @@
elsif source && File.exists?(source.to_s)
job.copy_handler = lambda { |data| file_copy_handler(source, data) }
elsif source.respond_to?(:read) && source.respond_to?(:eof?)
job.copy_handler = lambda { |data| io_copy_handler(source, data) }
end
- run_with_job_lock(job)
+ run_with_mutex(job)
end
def inspect
safe_options = @options.reject{ |name, _| name == :password }
"#<Vertica::Connection:#{object_id} @parameters=#{@parameters.inspect} @backend_pid=#{@backend_pid}, @backend_key=#{@backend_key}, @transaction_status=#{@transaction_status}, @socket=#{@socket}, @options=#{safe_options.inspect}, @row_style=#{@row_style}>"
end
protected
- def run_with_job_lock(job)
+ def run_with_mutex(job)
boot_connection if closed?
- raise Vertica::Error::SynchronizeError.new(@current_job, job) if busy?
- @current_job = job
- job.run
+ if @mutex.try_lock
+ job.run
+ else
+ raise Vertica::Error::SynchronizeError.new(job)
+ end
end
COPY_FROM_IO_BLOCK_SIZE = 1024 * 4096
def file_copy_handler(input_file, output)
@@ -257,10 +259,10 @@
@session_id = nil
@backend_pid = nil
@backend_key = nil
@transaction_status = nil
@socket = nil
- @current_job = '<initialization>'
+ @mutex = Mutex.new.lock
end
end
require 'vertica/query'
require 'vertica/column'