include/query_buffer.rb in baza-0.0.7 vs include/query_buffer.rb in baza-0.0.8

- old
+ new

@@ -13,16 +13,29 @@ @lock = Mutex.new STDOUT.puts "Query buffer started." if @debug if block_given? - begin - yield(self) - ensure - self.flush - thread_async_join - end + if @args[:flush_async] + @args[:db].clone_conn do |db_flush_async| + @db_flush_async = db_flush_async + + begin + yield(self) + ensure + flush + thread_async_join + end + end + else + begin + yield(self) + ensure + flush + thread_async_join + end + end end end #Adds a query to the buffer. def query(str) @@ -30,29 +43,29 @@ STDOUT.print "Adding to buffer: #{str}\n" if @debug @queries << str @queries_count += 1 end - self.flush if @queries_count >= 1000 + flush if @queries_count >= 1000 return nil end #Delete as on a normal Baza::Db. #===Example # buffer.delete(:users, {:id => 5}) def delete(table, where) STDOUT.puts "Delete called on table #{table} with arguments: '#{where}'." if @debug - self.query(@args[:db].delete(table, where, :return_sql => true)) + query(@args[:db].delete(table, where, :return_sql => true)) return nil end #Update as on a normal Baza::Db. #===Example # buffer.update(:users, {:name => "Kasper"}, {:id => 5}) def update(table, update, terms) STDOUT.puts "Update called on table #{table}." if @debug - self.query(@args[:db].update(table, update, terms, :return_sql => true)) + query(@args[:db].update(table, update, terms, :return_sql => true)) return nil end #Shortcut to doing upsert through the buffer instead of through the db-object with the buffer as an argument. #===Example @@ -64,11 +77,11 @@ #Plans to inset a hash into a table. It will only be inserted when flush is called. #===Examples # buffer.insert(:users, {:name => "John Doe"}) def insert(table, data) - self.query(@args[:db].insert(table, data, :return_sql => true)) + query(@args[:db].insert(table, data, :return_sql => true)) return nil end #Flushes all queries out in a transaction. This will automatically be called for every 1000 queries. def flush @@ -84,39 +97,45 @@ #Runs the flush in a thread in the background. def flush_async thread_async_join @thread_async = Thread.new do - flush_real + begin + flush_real(@db_flush_async) + rescue => e + $stderr.puts e.inspect + $stderr.puts e.backtrace + end end end def thread_async_join if thread = @thread_async thread.join end end #Flushes the queries for real. - def flush_real + def flush_real(db = nil) return nil if @queries_count <= 0 + db = @args[:db] if db == nil @lock.synchronize do if !@queries.empty? while !@queries.empty? - @args[:db].transaction do + db.transaction do @queries.shift(1000).each do |str| STDOUT.print "Executing via buffer: #{str}\n" if @debug - @args[:db].q(str) + db.q(str) end end end end @inserts.each do |table, datas_arr| while !datas_arr.empty? datas_chunk_arr = datas_arr.shift(1000) - @args[:db].insert_multi(table, datas_chunk_arr) + @db.insert_multi(table, datas_chunk_arr) end end @inserts.clear @queries_count = 0 \ No newline at end of file