lib/mail_spy/manager.rb in mail_spy-0.1.0 vs lib/mail_spy/manager.rb in mail_spy-0.1.1

- old
+ new

@@ -77,54 +77,35 @@ # ------------------------------------------- SEND OUTSTANDING EMAILS # Batches through all the emails that were scheduled and have come due # sends them out (step many at a time). Don't thread this method, instead # use the parameters to control concurrency - def send_outstanding_emails(step=100, num_threads=50) + def send_outstanding_emails(step=200, num_threads=100, num_workers=1) success = false raise "No Email service providers installed" unless MailSpy.esps.present? return if MailSpy::ProcessLog.currently_processing? + current_time = DateTime.now current_process = MailSpy::ProcessLog.create!( { - :start => Time.now, + :start => current_time, :running => true, }) - wq = WorkQueue.new(num_threads, step*2) - current_time = DateTime.now - offset = 0 - processed = 0 + count = MailSpy::Email.where(:schedule_at.lte => current_time, :sent => false, :failed => false).count + first = MailSpy::Email.where(:schedule_at.lte => current_time, :sent => false, :failed => false).first + division = (count / num_workers).to_i - # Helper function for setting present values - def set_if_present(email, pony_hash, pony_key, email_key=nil) - email_key = pony_key if email_key.nil? - value = email.send("#{email_key}") - pony_hash[pony_key] = value if value.present? - end - - while true - emails = MailSpy::Email. - limit(step).offset(offset).asc(:_id). - where(:schedule_at.lte => current_time, :sent => false, :failed => false).all - break if emails.count <= 0 #returns enumerator which is never blank - emails.each do |email| - processed += 1 - wq.enqueue_b do - MailSpy.using_delayed_job ? email.delay.deliver : email.deliver - end + num_workers.times do |i| + if MailSpy.using_delayed_job + Email.delay.deliver_batch(current_time, first._id, (i - 1) * division, division, step, num_threads) + else + Email.deliver_batch(current_time, first._id, (i - 1) * division, division, step, num_threads) end - - # We must join here otherwise the next loop email lookup will be in a - # race condition with the results of our worker_queue. - wq.join - - offset += step end success = true - return processed - + return count ensure if current_process end_time = Time.now current_process.end = end_time current_process.seconds_elapsed = end_time.to_i - current_process.start.to_i