lib/hotseat/queue.rb in hotseat-0.1.2 vs lib/hotseat/queue.rb in hotseat-0.2.0
- old
+ new
@@ -4,84 +4,137 @@
class QueueError < RuntimeError
end
class Queue
- attr_reader :db
+ attr_accessor :db, :config
- class << self
+ DEFAULT_CONFIG = {
+ :design_doc_name => 'hotseat_queue',
+ :pending_view_name => 'pending',
+ :locked_view_name => 'locked',
+ :done_view_name => 'done',
+ :all_view_name => 'all',
+ :object_name => 'hotseat',
+ }
- def patch(doc)
- doc[Hotseat.config[:object_name]] = {'at' => Time.now.utc.iso8601, 'by' => $$}
- doc
+ def initialize(db, options={})
+ @db = db
+ @config = DEFAULT_CONFIG.merge(options)
+ unless Hotseat.queue?(@db, @config[:design_doc_name])
+ @db.save_doc design_doc
end
+ end
- def unpatch(doc)
- doc.delete( Hotseat.config[:object_name] )
- doc
- end
+ def design_doc_id
+ "_design/#{config[:design_doc_name]}"
+ end
- def add_lock(doc)
- obj = doc[Hotseat.config[:object_name]]
- obj['lock'] = {'at' => Time.now.utc.iso8601, 'by' => $$}
- doc
- end
+ def pending_view_name
+ "#{config[:design_doc_name]}/#{config[:pending_view_name]}"
+ end
- def locked?(doc)
- if obj = doc[Hotseat.config[:object_name]]
- obj.has_key? 'lock'
- end
- end
+ def locked_view_name
+ "#{config[:design_doc_name]}/#{config[:locked_view_name]}"
+ end
- def remove_lock(doc)
- obj = doc[Hotseat.config[:object_name]]
- obj.delete 'lock'
- doc
- end
+ def done_view_name
+ "#{config[:design_doc_name]}/#{config[:done_view_name]}"
+ end
- def mark_done(doc)
- obj = doc[Hotseat.config[:object_name]]
- obj['done'] = {'at' => Time.now.utc.iso8601, 'by' => $$}
- doc
- end
+ def all_view_name
+ "#{config[:design_doc_name]}/#{config[:all_view_name]}"
+ end
+ def design_doc
+ q = "doc.#{config[:object_name]}"
+ lock = "#{q}.lock"
+ done = "#{q}.done"
+ pending_func = <<-JAVASCRIPT
+ function(doc) { if (#{q} && !(#{lock} || #{done})) emit(#{q}.at, null); }
+ JAVASCRIPT
+ locked_func = <<-JAVASCRIPT
+ function(doc) { if (#{q} && #{lock}) emit(#{lock}.at, null); }
+ JAVASCRIPT
+ done_func = <<-JAVASCRIPT
+ function(doc) { if (#{q} && #{done}) emit(#{done}.at, null); }
+ JAVASCRIPT
+ all_func = <<-JAVASCRIPT
+ function(doc) { if (#{q}) emit(#{q}.at, null); }
+ JAVASCRIPT
+ {
+ '_id' => "_design/#{config[:design_doc_name]}",
+ :views => {
+ config[:pending_view_name] => { :map => pending_func.strip },
+ config[:locked_view_name] => { :map => locked_func.strip },
+ config[:done_view_name] => { :map => done_func.strip },
+ config[:all_view_name] => { :map => all_func.strip },
+ }
+ }
end
- def initialize(db)
- @db = db
- unless Hotseat.queue?(@db)
- @db.save_doc Hotseat.design_doc
+ def patch(doc)
+ doc[config[:object_name]] = {'at' => Time.now.utc.iso8601, 'by' => $$}
+ doc
+ end
+
+ def unpatch(doc)
+ doc.delete( config[:object_name] )
+ doc
+ end
+
+ def add_lock(doc)
+ obj = doc[config[:object_name]]
+ obj['lock'] = {'at' => Time.now.utc.iso8601, 'by' => $$}
+ doc
+ end
+
+ def locked?(doc)
+ if obj = doc[config[:object_name]]
+ obj.has_key? 'lock'
end
end
+ def remove_lock(doc)
+ obj = doc[config[:object_name]]
+ obj.delete 'lock'
+ doc
+ end
+
+ def mark_done(doc)
+ obj = doc[config[:object_name]]
+ obj['done'] = {'at' => Time.now.utc.iso8601, 'by' => $$}
+ doc
+ end
+
def add(doc_id)
@db.update_doc(doc_id) do |doc|
- Queue.patch doc
+ patch doc
yield doc if block_given?
end
end
def add_bulk(doc_ids)
#Note: this silently ignores missing doc_ids
docs = @db.bulk_load(doc_ids)['rows'].map{|row| row['doc']}.compact
- docs.each {|doc| Queue.patch doc }
+ docs.each {|doc| patch doc }
@db.bulk_save docs, use_uuids=false
end
def num_pending
- @db.view(Hotseat.pending_view_name, :limit => 0)['total_rows']
+ @db.view(pending_view_name, :limit => 0)['total_rows']
end
alias :size :num_pending
def get(n=1)
- rows = @db.view(Hotseat.pending_view_name, :limit => n, :include_docs => true)['rows']
+ rows = @db.view(pending_view_name, :limit => n, :include_docs => true)['rows']
rows.map{|row| row['doc']} unless rows.empty?
end
def lease(n=1)
if docs = get(n)
- docs.each {|doc| Queue.add_lock doc }
+ docs.each {|doc| add_lock doc }
response = @db.bulk_save docs, use_uuids=false
# Some docs may have failed to lock - probably updated by another process
locked_ids = response.reject{|res| res['error']}.map{|res| res['id']}
if locked_ids.length < docs.length
# This runs in O(n^2) time. Performance will be bad here if the number of documents
@@ -91,70 +144,70 @@
docs
end
end
def num_locked
- @db.view(Hotseat.locked_view_name, :limit => 0)['total_rows']
+ @db.view(locked_view_name, :limit => 0)['total_rows']
end
def remove(doc_id, opts={})
@db.update_doc(doc_id) do |doc|
- raise(QueueError, "Document was already removed") unless Queue.locked?(doc)
+ raise(QueueError, "Document was already removed") unless locked?(doc)
if opts.delete(:forget)
- Queue.unpatch doc
+ unpatch doc
else
- Queue.mark_done( Queue.remove_lock( doc ) )
+ mark_done( remove_lock( doc ) )
end
yield doc if block_given?
end
end
def remove_bulk(doc_ids, opts={})
rows = @db.bulk_load(doc_ids)['rows']
docs, missing = rows.partition {|row| row['doc'] }
docs.map! {|row| row['doc'] }
- locked, unlocked = docs.partition {|doc| Queue.locked? doc }
+ locked, unlocked = docs.partition {|doc| locked? doc }
forget = opts.delete(:forget)
locked.each do |doc|
if forget
- Queue.unpatch doc
+ unpatch doc
else
- Queue.mark_done( Queue.remove_lock( doc ) )
+ mark_done( remove_lock( doc ) )
end
end
@db.bulk_save locked, use_uuids=false
{'errors' =>
unlocked.map {|doc| {'id' => doc['_id'], 'error' => 'unlocked' } } +
missing.map {|row| {'id' => row['key'], 'error' => row['error']} }
}
end
def num_done
- @db.view(Hotseat.done_view_name, :limit => 0)['total_rows']
+ @db.view(done_view_name, :limit => 0)['total_rows']
end
def num_all
- @db.view(Hotseat.all_view_name, :limit => 0)['total_rows']
+ @db.view(all_view_name, :limit => 0)['total_rows']
end
alias :num_total :num_all
def forget(doc_id)
@db.update_doc(doc_id) do |doc|
- Queue.unpatch doc
+ unpatch doc
end
end
def forget_bulk(doc_ids)
#Note: this silently ignores missing doc_ids
docs = @db.bulk_load(doc_ids)['rows'].map{|row| row['doc']}.compact
- docs.each {|doc| Queue.unpatch doc }
+ docs.each {|doc| unpatch doc }
@db.bulk_save docs, use_uuids=false
end
def purge
- rows = @db.view(Hotseat.all_view_name, :include_docs => true)['rows']
+ rows = @db.view(all_view_name, :include_docs => true)['rows']
docs = rows.map{|row| row['doc']}
- docs.each{|doc| Queue.unpatch doc }
+ docs.each{|doc| unpatch doc }
@db.bulk_save docs, use_uuids=false
end
end
end