lib/mail_spy/manager.rb in mail_spy-0.1.0 vs lib/mail_spy/manager.rb in mail_spy-0.1.1
- old
+ new
@@ -77,54 +77,35 @@
# ------------------------------------------- SEND OUTSTANDING EMAILS
# Batches through all the emails that were scheduled and have come due
# sends them out (step many at a time). Don't thread this method, instead
# use the parameters to control concurrency
- def send_outstanding_emails(step=100, num_threads=50)
+ def send_outstanding_emails(step=200, num_threads=100, num_workers=1)
success = false
raise "No Email service providers installed" unless MailSpy.esps.present?
return if MailSpy::ProcessLog.currently_processing?
+ current_time = DateTime.now
current_process = MailSpy::ProcessLog.create!(
{
- :start => Time.now,
+ :start => current_time,
:running => true,
})
- wq = WorkQueue.new(num_threads, step*2)
- current_time = DateTime.now
- offset = 0
- processed = 0
+ count = MailSpy::Email.where(:schedule_at.lte => current_time, :sent => false, :failed => false).count
+ first = MailSpy::Email.where(:schedule_at.lte => current_time, :sent => false, :failed => false).first
+ division = (count / num_workers).to_i
- # Helper function for setting present values
- def set_if_present(email, pony_hash, pony_key, email_key=nil)
- email_key = pony_key if email_key.nil?
- value = email.send("#{email_key}")
- pony_hash[pony_key] = value if value.present?
- end
-
- while true
- emails = MailSpy::Email.
- limit(step).offset(offset).asc(:_id).
- where(:schedule_at.lte => current_time, :sent => false, :failed => false).all
- break if emails.count <= 0 #returns enumerator which is never blank
- emails.each do |email|
- processed += 1
- wq.enqueue_b do
- MailSpy.using_delayed_job ? email.delay.deliver : email.deliver
- end
+ num_workers.times do |i|
+ if MailSpy.using_delayed_job
+ Email.delay.deliver_batch(current_time, first._id, (i - 1) * division, division, step, num_threads)
+ else
+ Email.deliver_batch(current_time, first._id, (i - 1) * division, division, step, num_threads)
end
-
- # We must join here otherwise the next loop email lookup will be in a
- # race condition with the results of our worker_queue.
- wq.join
-
- offset += step
end
success = true
- return processed
-
+ return count
ensure
if current_process
end_time = Time.now
current_process.end = end_time
current_process.seconds_elapsed = end_time.to_i - current_process.start.to_i