lib/right_chimp/queue/ChimpQueue.rb in right_chimp-1.0.2 vs lib/right_chimp/queue/ChimpQueue.rb in right_chimp-1.0.3
- old
+ new
@@ -12,11 +12,11 @@
@delay = 0
@retry_count = 0
@max_threads = 10
@workers_never_exit = true
@threads = []
-
+ @semaphore = Mutex.new
self.reset!
end
#
# Reset the queue and the :default group
@@ -52,11 +52,10 @@
create_group(g) if not ChimpQueue[g]
ChimpQueue[g].push(w)
end
def create_group(name, type = :parallel, concurrency = 1)
- type = :parallel
Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"
new_group = ExecutionGroupFactory.from_type(type)
new_group.group_id = name
new_group.concurrency = concurrency
ChimpQueue[name] = new_group
@@ -65,15 +64,17 @@
#
# Grab the oldest work item available
#
def shift
r = nil
- @group.values.each do |group|
- if group.ready?
- r = group.shift
- break
- end
- end
+ @semaphore.synchronize do
+ @group.values.each do |group|
+ if group.ready?
+ r = group.shift
+ break
+ end
+ end
+ end
return(r)
end
#
# Wait until a group is done