Sha256: 510e179207ddf9ad8f18c7078b085e36a69434f6b55bbacbeba7c8073646a563
Contents?: true
Size: 1.98 KB
Versions: 1
Compression:
Stored size: 1.98 KB
Contents
require "monitor" class ActiveRecordTransactioner DEFAULT_ARGS = { :call_args => [], :call_method => :save!, :transaction_method => :transaction, :transaction_size => 1000 } ALLOWED_ARGS = DEFAULT_ARGS.keys def initialize(args = {}) args.each do |key, val| raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key) end @args = DEFAULT_ARGS.merge(args) @models = {} @threads = [] @count = 0 @lock = Monitor.new @lock_threads = Monitor.new if block_given? begin yield self ensure flush end end end #Adds another model to the queue and calls 'flush' if it is over the limit. def queue(model) @lock.synchronize do klass = model.class @models[klass] = [] if !@models.key?(klass) @models[klass] << model @count += 1 flush if @count >= @args[:transaction_size] end end #Flushes the specified method on all the queued models in a thread for each type of model. def flush threads = [] @lock.synchronize do @models.each do |klass, val| next if val.empty? models = val @models[klass] = [] @count -= models.length thread = Thread.new do begin klass.__send__(@args[:transaction_method]) do models.each do |model| model.__send__(@args[:call_method], *@args[:call_args]) end end rescue => e puts e.inspect puts e.backtrace ensure @threads.delete(Thread.current) end end @lock_threads.synchronize do threads << thread @threads << thread end end end return { :threads => threads } end #Waits for any remaining running threads. def join @lock_threads.synchronize do @threads.each do |thread| thread.join end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
active-record-transactioner-0.0.1 | lib/active-record-transactioner.rb |