include/query_buffer.rb in baza-0.0.7 vs include/query_buffer.rb in baza-0.0.8
- old
+ new
@@ -13,16 +13,29 @@
@lock = Mutex.new
STDOUT.puts "Query buffer started." if @debug
if block_given?
- begin
- yield(self)
- ensure
- self.flush
- thread_async_join
- end
+ if @args[:flush_async]
+ @args[:db].clone_conn do |db_flush_async|
+ @db_flush_async = db_flush_async
+
+ begin
+ yield(self)
+ ensure
+ flush
+ thread_async_join
+ end
+ end
+ else
+ begin
+ yield(self)
+ ensure
+ flush
+ thread_async_join
+ end
+ end
end
end
#Adds a query to the buffer.
def query(str)
@@ -30,29 +43,29 @@
STDOUT.print "Adding to buffer: #{str}\n" if @debug
@queries << str
@queries_count += 1
end
- self.flush if @queries_count >= 1000
+ flush if @queries_count >= 1000
return nil
end
#Delete as on a normal Baza::Db.
#===Example
# buffer.delete(:users, {:id => 5})
def delete(table, where)
STDOUT.puts "Delete called on table #{table} with arguments: '#{where}'." if @debug
- self.query(@args[:db].delete(table, where, :return_sql => true))
+ query(@args[:db].delete(table, where, :return_sql => true))
return nil
end
#Update as on a normal Baza::Db.
#===Example
# buffer.update(:users, {:name => "Kasper"}, {:id => 5})
def update(table, update, terms)
STDOUT.puts "Update called on table #{table}." if @debug
- self.query(@args[:db].update(table, update, terms, :return_sql => true))
+ query(@args[:db].update(table, update, terms, :return_sql => true))
return nil
end
#Shortcut to doing upsert through the buffer instead of through the db-object with the buffer as an argument.
#===Example
@@ -64,11 +77,11 @@
#Plans to inset a hash into a table. It will only be inserted when flush is called.
#===Examples
# buffer.insert(:users, {:name => "John Doe"})
def insert(table, data)
- self.query(@args[:db].insert(table, data, :return_sql => true))
+ query(@args[:db].insert(table, data, :return_sql => true))
return nil
end
#Flushes all queries out in a transaction. This will automatically be called for every 1000 queries.
def flush
@@ -84,39 +97,45 @@
#Runs the flush in a thread in the background.
def flush_async
thread_async_join
@thread_async = Thread.new do
- flush_real
+ begin
+ flush_real(@db_flush_async)
+ rescue => e
+ $stderr.puts e.inspect
+ $stderr.puts e.backtrace
+ end
end
end
def thread_async_join
if thread = @thread_async
thread.join
end
end
#Flushes the queries for real.
- def flush_real
+ def flush_real(db = nil)
return nil if @queries_count <= 0
+ db = @args[:db] if db == nil
@lock.synchronize do
if !@queries.empty?
while !@queries.empty?
- @args[:db].transaction do
+ db.transaction do
@queries.shift(1000).each do |str|
STDOUT.print "Executing via buffer: #{str}\n" if @debug
- @args[:db].q(str)
+ db.q(str)
end
end
end
end
@inserts.each do |table, datas_arr|
while !datas_arr.empty?
datas_chunk_arr = datas_arr.shift(1000)
- @args[:db].insert_multi(table, datas_chunk_arr)
+ @db.insert_multi(table, datas_chunk_arr)
end
end
@inserts.clear
@queries_count = 0
\ No newline at end of file