module Streamworker module Workers class Worker include Enumerable QUERIES_PER_BLOCK = 500 TIME_PER_BLOCK = 300 TIMEOUT_MARGIN = 5 attr_accessor :view_context, :opts attr_accessor :repeats attr_accessor :line_num # subclasses responsible for setting this as appropriate attr_accessor :title attr_accessor :num_records, :num_success, :num_errors attr_accessor :footer_messages def initialize(view_context, opts={}) @opts = opts.with_indifferent_access @view_context = view_context @title = "Working..." @repeats = opts[:repeats] || 1 @repeats = @repeats.to_i @fragment = false @started_at = Time.now if defined?(AppConfig) @opts[:unicorn_timeout] ||= AppConfig.unicorn_timeout end @opts[:unicorn_timeout] ||= ENV['UNICORN_TIMEOUT'] @opts[:unicorn_timeout] ||= 30 @opts[:unicorn_timeout] = @opts[:unicorn_timeout].to_i @num_records = opts[:num_records].to_i || 1 @num_success = 0 @num_errors = 0 @footer_messages = [] end def queries_per_record 1 end def projected_queries (self.num_records * self.queries_per_record).to_f end def num_queries (self.num_success + self.num_errors) .to_f * self.queries_per_record end def calculate_times actual_time_used = Time.now - @started_at work_time_remaining = opts[:unicorn_timeout] - actual_time_used theoretical_total_time = (projected_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK theoretical_time_used = (num_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK factor = actual_time_used.to_f / theoretical_time_used factor = [factor, 1].max if projected_queries > QUERIES_PER_BLOCK total_time = theoretical_total_time * factor # puts "--------- calculate_times ---------" # puts "Time.now: #{Time.now.inspect}" # puts "@started_at: #{@started_at.inspect}" # puts "QUERIES_PER_BLOCK: #{QUERIES_PER_BLOCK.inspect}" # puts "TIME_PER_BLOCK: #{TIME_PER_BLOCK.inspect}" # puts "(self.num_records * self.queries_per_record): #{(self.num_records * self.queries_per_record).inspect}" # puts "opts[:unicorn_timeout] : #{opts[:unicorn_timeout] .inspect}" # puts "actual_time_used: #{actual_time_used.inspect}" # puts "work_time_remaining: #{work_time_remaining.inspect}" # puts "theoretical_total_time: #{theoretical_total_time.inspect}" # puts "theoretical_time_used: #{theoretical_time_used.inspect}" # puts "factor: #{factor.inspect}" # puts "total_time: #{total_time.inspect}" # puts "(total_time - actual_time_used): #{(total_time - actual_time_used).inspect}" # puts { work_time: opts[:unicorn_timeout] .to_i, work_time_remaining: work_time_remaining, time_used: actual_time_used, time_remaining: (total_time - actual_time_used), total_time: total_time } end def imminent_timeout? # puts "--------- imminent_timeout ---------" # puts "work_time_remaining: #{calculate_times[:work_time_remaining].inspect}" # puts "TIMEOUT_MARGIN: #{TIMEOUT_MARGIN.inspect}" calculate_times[:work_time_remaining] < TIMEOUT_MARGIN end def report_timeout_footer(msg={}) msg[:work_desc] ||= "#{num_success} records" msg[:how_to_finish] ||= "resubmit the last #{num_records - num_success} records." times = calculate_times %Q{
Repeating #{@repeats} times
} if @repeats > 1 header = <<-EOHTML #{self.head}} @fragment = ! opts[:close] close = "" if opts[:close] close = %Q{
#{scroll} } end out = %Q{#{start}#{str}#{close}} # Rails.logger.info(" out: #{out.inspect}") out end def report_error(str, list=[]) err = %Q{ #{error_line_num}#{str}
} err << %Q{