class SkynetMessageQueue < ActiveRecord::Base
end

class SkynetWorkerQueue < ActiveRecord::Base
end

class Skynet
  # require 'mysql'

  class Error < StandardError
  end

  class RequestExpiredError < Skynet::Error
  end

  class InvalidMessage < Skynet::Error
  end

  class MessageQueueAdapter

    class Mysql < Skynet::MessageQueueAdapter

      include SkynetDebugger
      include Skynet::GuidGenerator

      SEARCH_FIELDS = [:tasktype, :task_id, :job_id, :payload_type, :expire_time, :iteration, :version] unless defined?(SEARCH_FIELDS)

      Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TEMP_CHECK_DELAY] ||= 30

      @@db_set = false

      def self.adapter
        :mysql
      end

      def initialize
        if Skynet::CONFIG[:QUEUE_DATABASE] and not @@db_set
          begin
            SkynetMessageQueue.establish_connection Skynet::CONFIG[:QUEUE_DATABASE]
            SkynetWorkerQueue.establish_connection Skynet::CONFIG[:QUEUE_DATABASE]
          rescue ActiveRecord::AdapterNotSpecified => e
            warn "#{Skynet::CONFIG[:QUEUE_DATABASE]} not defined as a database adaptor #{e.message}"
          end
        end
        @@db_set = true
      end

      # def initialize
      #   SkynetMessageQueue.connection.execute("set session TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
      # end

      def self.debug_class_desc
        "MYSQLMQ"
      end

      def message_to_conditions(message)
        template_to_conditions(message.to_a)
      end

      def template_to_conditions(template,fields=Skynet::Message.fields)
        fields = fields.invert
        conditions = []
        values = []

        fields.keys.each do |field|
          value = template[fields[field]]
          next unless value
          if value.is_a?(Range)
            conditions << "#{field} BETWEEN #{value.first} AND #{value.last}"
          elsif value.is_a?(Symbol) or value.is_a?(String)
            conditions << "#{field} = '#{value}'"
          else
            conditions << "#{field} = #{value}"
          end
        end
        return '' if conditions.empty?
        return conditions.join(" AND ")
      end

      def message_to_hash(message,timeout=nil,fields=Skynet::Message.fields)
        hash = {}
        fields.values.each do |field|
          next if field == :drburi
          # next unless message.send(field)
          if message.send(field).is_a?(Symbol)
            hash[field] = message.send(field).to_s
          elsif field == :payload
            hash[:raw_payload] = message.raw_payload
          else
            hash[field] = message.send(field)
          end
        end
        if timeout
          hash[:timeout] = timeout
          hash[:expire_time] = (Time.now.to_f + timeout) unless hash[:expire_time]
        end
        hash
      end

      def take_next_task(curver,timeout=0,payload_type=nil)
        timeout = Skynet::CONFIG[:NEXT_TASK_TIMEOUT] if timeout < 1
        debug "TASK NEXT TASK!!!!!!! timeout: #{timeout}"

        message = nil
        start = Time.now
        rows = nil

        loop do
          # debug "start #{Time.now} timeout #{start + timeout}"
          message_row = take(Skynet::Message.next_task_template(curver,payload_type),start,timeout)
          next unless message_row
          begin
            message = Skynet::Message.new(message_row.attributes)
          rescue Skynet::Message::BadMessage => e
            message = nil
            message_row.destroy
            next
          end

          ftm = message.fallback_task_message
          rows = update("update skynet_message_queues set iteration = #{ftm.iteration }, expire_time = #{ftm.expire_time} where iteration = #{message.iteration} and id = #{message_row.id}")
          # message = nil if rows == 0
          return message if message
        end
      end

      def write_message(message,timeout=nil)
        SkynetMessageQueue.create(message_to_hash(message, timeout))
      end

      def write_result(message,result=[],timeout=nil)
        result_message = message.result_message(result)
        result_message.expire_time = nil
        update_message(result_message,timeout)
      end

      def update_message(message,timeout=nil)
        timeout_sql = (timeout ? ", timeout = #{timeout}, expire_time = #{Time.now.to_f + timeout}" : '')
        rows = 0
        rows = update(%{
          update skynet_message_queues
          set tasktype = "#{message.tasktype}",
          raw_payload = '#{message.raw_payload}',
          payload_type = "#{message.payload_type}",
          tran_id = NULL
          #{timeout_sql}
          where task_id = #{message.task_id}
        })

        raise Skynet::RequestExpiredError.new() if rows == 0
      end

      def take_result(job_id,timeout=1)
        start = Time.now
        result = nil
        sleep_time = 10
        if timeout < 1
          sleep_time = 1
        elsif timeout > 10
          sleep_time = 10
        else
          sleep_time = timeout * 0.25
        end
        message_row = nil

        loop do
          # message_row = take(Skynet::Message.result_template(job_id), start, timeout,sleep_time)
          conditions = template_to_conditions(Skynet::Message.result_template(job_id))
          # sleep_time ||= timeout
          message_row = SkynetMessageQueue.find(:first,:conditions => conditions)

          break if message_row

          if Time.now.to_f > start.to_f + timeout
            raise Skynet::RequestExpiredError.new
          else
            sleepy = rand(sleep_time)
            # error "RESULT EMPTY SLEEPING: #{sleepy}"
            sleep sleepy
            next
          end
          next
        end

        result = Skynet::Message.new(message_row.clone.attributes)
        message_row.destroy
        return result if result
      end

      def list_tasks(iteration=nil)
        conditions = template_to_conditions(Skynet::Message.outstanding_tasks_template(iteration))
        SkynetMessageQueue.find(:all,:conditions => conditions)
      end

      def list_results
        conditions = template_to_conditions(Skynet::Message.outstanding_results_template)
        SkynetMessageQueue.find(:all,:conditions => conditions)
      end

      def write_error(message,error='',timeout=nil)
        message.expire_time = nil
        update_message(message.error_message(error),timeout)
      end

      def write_worker_status(task, timeout=nil)
        message = Skynet::WorkerStatusMessage.new(task)
        worker_fields = Skynet::WorkerStatusMessage.fields.reject {|k,f| f == :process_id or f == :hostname or f == :tasksubtype or f == :tasktype}
        update_hash = message_to_hash(message, timeout, Skynet::WorkerStatusMessage.fields)
        update_hash.each do |k,v|
          if not v
            update_hash[k] = "NULL"
          elsif v.kind_of?(String) or v.kind_of?(Symbol)
            update_hash[k] = "'#{v}'"
          end
        end
        update_sql = "UPDATE skynet_worker_queues SET #{update_hash.collect{|k,v| "#{k}=#{v}"}.join(',')} WHERE worker_id=#{message.worker_id}"
        rows = update(update_sql)
        if rows == 0
          begin
            insert_sql = "INSERT INTO skynet_worker_queues (#{update_hash.keys.join(',')}) VALUES (#{update_hash.values.join(',')})"
            rows = update(insert_sql)
          rescue ActiveRecord::StatementInvalid => e
            if e.message =~ /Duplicate/
              error "DUPLICATE WORKER #{e.message}"
            else
              raise e
            end
          end
        end
        return rows
      end

      def take_worker_status(task, timeout=nil)
        conditions = template_to_conditions(Skynet::WorkerStatusMessage.worker_status_template(task), Skynet::WorkerStatusMessage.fields)
        worker_status = nil
        SkynetWorkerQueue.transaction do
          worker_row = SkynetWorkerQueue.find(:first, :conditions => conditions)
          return unless worker_row
          worker_status = Skynet::WorkerStatusMessage.new(worker_row.clone.attributes)
          worker_row.destroy
        end
        worker_status
      end

      def read_all_worker_statuses(hostname=nil,process_id=nil)
        ws = Skynet::WorkerStatusMessage.all_workers_template(hostname)
        ws[4] = process_id if process_id
        conditions = template_to_conditions(ws,Skynet::WorkerStatusMessage.fields)
        rows = SkynetWorkerQueue.find(:all, :conditions => conditions)
        workers = rows.collect{ |w| Skynet::WorkerStatusMessage.new(w.attributes) }#.sort{ |a,b| a.process_id <=> b.process_id }
      end

      def clear_worker_status(hostname=nil)
        if hostname
          SkynetWorkerQueue.connection.execute("delete from skynet_worker_queues where hostname = '#{hostname}'")
        else
          SkynetWorkerQueue.destroy_all
        end
      end

      def set_worker_version(ver=nil)
        ver ||= 1
        # SkynetWorkerQueue.transaction do
        #   SkynetWorkerQueue.connection.execute("delete from skynet_worker_queues where tasktype = 'workerversion'")
        SkynetWorkerQueue.connection.insert("replace skynet_worker_queues (worker_id, tasktype, version) values (0, 'workerversion',#{ver})")
        # end
        ver
      end

      def get_worker_version
        # ver =
        #   SkynetWorkerQueue.connection.select_value("select min(version) from skynet_message_queues where tasktype = 'task'")
        ver = SkynetWorkerQueue.connection.select_value("select version from skynet_worker_queues where tasktype = 'workerversion'")
        if not ver
          set_worker_version(1)
          ver = 1
        end
        ver.to_i
      end

      def clear_outstanding_tasks
        SkynetMessageQueue.destroy_all
      end

      def delete_expired_messages
        SkynetMessageQueue.connection.delete("delete from skynet_message_queues where expire_time BETWEEN 1 AND '#{Time.now.to_f}'")
      end

      # select hostname, iteration, count(id) as number_of_workers, count(iteration) as iteration, sum(processed) as processed, max(started_at) as most_recent_task_time from skynet_worker_queues where tasksubtype = 'worker' group by hostname, iteration;
      #
      # select hostname, count(id) as number_of_workers, sum(processed) as processed, max(started_at) as most_recent_task_time,
      # CASE iteration WHEN NULL
      #
      # from skynet_worker_queues where tasksubtype = 'worker' group by hostname;

      def stats
        stats = {
          :servers => {},
          :results => 0,
          :taken_tasks => 0,
          :untaken_tasks => 0,
          :taken_master_tasks => 0,
          :taken_task_tasks => 0,
          :untaken_master_tasks => 0,
          :untaken_task_tasks => 0,
          :processed => 0,
          :number_of_workers => 0,
          :active_workers => 0,
          :idle_workers => 0,
          :hosts => 0,
          :masters => 0,
          :taskworkers => 0,
          :time => Time.now.to_f
        }

        stat_rows = SkynetWorkerQueue.connection.select_all(%{
          SELECT tasktype, payload_type, iteration, count(id) as number_of_tasks
          FROM skynet_message_queues
          GROUP BY tasktype, payload_type, iteration
        })
        # pp stat_rows
        stat_rows.each do |row|
          if row["tasktype"] == "result" or row["payload_type"] == "result"
            stats[:results] += row["number_of_tasks"].to_i
          elsif row["tasktype"] == "task"
            type_of_tasks = nil
            if row["payload_type"] == "master"
              type_of_tasks = :master_tasks
            elsif row["payload_type"] == "task"
              type_of_tasks = :task_tasks
            end
            if row["iteration"].to_i > 0
              stats["taken_#{type_of_tasks}".to_sym] += row["number_of_tasks"].to_i
              stats[:taken_tasks] += row["number_of_tasks"].to_i
            else
              stats["untaken_#{type_of_tasks}".to_sym] += row["number_of_tasks"].to_i
              stats[:untaken_tasks] += row["number_of_tasks"].to_i
            end
          end
        end

        servers = {}

        stat_sql = <<-SQL
          select hostname, map_or_reduce, count(id) number_of_workers, sum(processed) as processed,
          max(started_at) as most_recent_task_time, iteration
          FROM skynet_worker_queues
          WHERE skynet_worker_queues.tasksubtype = 'worker'
        SQL

        stat_rows = SkynetWorkerQueue.connection.select_all("#{stat_sql} GROUP BY hostname, map_or_reduce").each do |row|
          servers[row["hostname"]] ||= {
            :processed => 0,
            :hostname => row["hostname"],
            :number_of_workers => 0,
            :active_workers => 0,
            :idle_workers => 0,
          }

          servers[row["hostname"]][:processed] += row["processed"].to_i
          servers[row["hostname"]][:number_of_workers] += row["number_of_workers"].to_i
          servers[row["hostname"]][:active_workers] += 0
          servers[row["hostname"]][:idle_workers] += row["number_of_workers"].to_i
          stats[:processed] += row["processed"].to_i
          stats[:number_of_workers] += row["number_of_workers"].to_i
          stats[:idle_workers] += row["number_of_workers"].to_i
        end

        SkynetWorkerQueue.connection.select_all(%{
          #{stat_sql} AND skynet_worker_queues.iteration IS NOT NULL
          GROUP BY hostname, map_or_reduce
        }).each do |row|
          map_or_reduce = nil
          if row["map_or_reduce"] == "master"
            map_or_reduce = :masters
          else
            map_or_reduce = :taskworkers
          end
          servers[row["hostname"]][:active_workers] += row["number_of_workers"].to_i
          servers[row["hostname"]][:idle_workers] -= row["number_of_workers"].to_i
          servers[row["hostname"]][map_or_reduce] ||= 0
          servers[row["hostname"]][map_or_reduce] += row["number_of_workers"].to_i
          stats[map_or_reduce] += row["number_of_workers"].to_i
          stats[:active_workers] += row["number_of_workers"].to_i
          stats[:idle_workers] -= row["number_of_workers"].to_i
        end

        stats[:servers] = servers
        stats[:hosts] = servers.keys.size
        stats[:time] = Time.now.to_f - stats[:time]
        stats
      end

      def processed(sleepy=5,tim=10)
        last_time = Time.now
        last_count = Skynet::MessageQueue.new.stats[:processed]
        tim.times do
          new_count = Skynet::MessageQueue.new.stats[:processed]
          new_time = Time.now
          puts "Processed #{new_count - last_count} in #{new_time - last_time}"
          last_time = new_time
          last_count = new_count
          sleep sleepy
        end
      end

      private

      def update(sql)
        rows = 0
        3.times do
          begin
            rows = SkynetMessageQueue.connection.update(sql)
            return rows
          rescue ActiveRecord::StatementInvalid => e
            if e.message =~ /Deadlock/ or e.message =~ /Transaction/
              error "#{self.class} update had collision #{e.message}"
              sleep 0.1
              next
            else
              raise e
            end
          end
        end
        return rows
      end

      Skynet::CONFIG[:MYSQL_TEMPERATURE_CHANGE_SLEEP] ||= 40

      @@temperature ||= {}
      @@temperature[:task] ||= 1
      @@temperature[:master] ||= 1
      @@temperature[:any] ||= 1

      def take(template,start=Time.now,timeout=1,sleep_time=nil)
        conditions = template_to_conditions(template)
        sleep_time ||= timeout
        transaction_id = get_unique_id(1)
        times_tried = 0
        payload_type = template[Skynet::Message.fields.invert[:payload_type]]
        payload_type ||= :any
        payload_type = payload_type.to_sym

        10.times do
          begin
            ## TEMPERATURE
            temperature_sql = (temperature(payload_type) > 1 ? " AND id % #{temperature(payload_type).ceil} = #{rand(temperature(payload_type)).to_i} " : '')

            ### Make sure we get the old ones. If we ORDER BY on every select it's VERY expensive.
            order_by = (payload_type != :master and rand(100) < 5) ? "ORDER BY payload_type desc, created_on desc" : ''

            sql = <<-SQL
              SELECT *
              FROM skynet_message_queues
              WHERE #{conditions} #{temperature_sql}
              #{order_by}
              LIMIT 1
            SQL

            message_row = SkynetMessageQueue.find_by_sql(sql).first
            if message_row
              update_conditions = "ID = #{message_row.id} and tran_id "
              if message_row.tran_id
                update_conditions << "= #{message_row.tran_id}"
              else
                update_conditions << 'IS NULL'
              end
              rows = SkynetMessageQueue.connection.update(
                "UPDATE skynet_message_queues set tran_id = #{transaction_id} WHERE #{update_conditions}"
              )
              if rows < 1
                old_temp = temperature(payload_type)
                set_temperature(payload_type,conditions)
                info "MISSCOLLISION PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
                next
              end
              return message_row
            else
              old_temp = temperature(payload_type)
              set_temperature(payload_type,conditions)
              info "MISS PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
              break if temperature(payload_type) == 1 and old_temp == 1
              next
            end
          rescue ActiveRecord::StatementInvalid => e
            if e.message =~ /Deadlock/
              old_temp = temperature(payload_type)
              set_temperature(payload_type,conditions)
              info "COLLISION PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
              next
            else
              raise e
            end
          end
        end

        if Time.now.to_f > start.to_f + timeout
          debug "MISSTIMEOUT PTYPE #{payload_type} #{temperature(payload_type)}"
          raise Skynet::RequestExpiredError.new
        else
          sleepy = rand(sleep_time * 0.5 )
          debug "EMPTY QUEUE #{temperature(payload_type)} SLEEPING: #{sleep_time} / #{sleepy}"
          sleep sleepy
          return false
        end
      end

      # Skynet::CONFIG[:temperature_growth_rate] ||= 2
      # Skynet::CONFIG[:temperature_backoff_rate] ||= 0.75

      # TUNEABLE_SETTINGS = [:temp_pow, :temp_interval, :sleep_time]
      #
      # def write_score(new_values,new_result,score)
      #   values ||= {}
      #   set = new_values.keys.sort.collect{|k|[k,new_values[k]]}.join(",")
      #   if not values[set]
      #     values[set] ||= {}
      #     values[set][:results] ||= []
      #     values[set][:scores] ||= []
      #     values[set][:settings] ||= {}
      #     values[set][:total_score] = 0
      #     TUNEABLE_SETTINGS.each do |setting|
      #       values[set][:settings][setting] = []
      #     end
      #   end
      #   TUNEABLE_SETTINGS.each do |setting, value|
      #     values[set][:settings][setting] << value
      #   end
      #   values[set][:results] << new_result
      #   values[set][:scores] << score + values[set][:total_score]
      #   values[set][:total_score] += score
      # end

      def temperature(payload_type)
        @@temperature[payload_type.to_sym]
      end

      ## try SQRT *2
      ## try POW 0.6 or .75
      def set_temperature(payload_type,conditions)
        temp_q_conditions = "type = '#{payload_type}' AND updated_on < '#{(Time.now - 5).strftime('%Y-%m-%d %H:%M:%S')}'"
        # "POW(#{(rand(40) + 40) * 0.01})"
        # It's almost like the temperature table needs to store the POW and adjust that to be adaptive. Like some % of the time it
        # uses the one in the table, and some % it tries a new one and scores it.
        temperature = SkynetWorkerQueue.connection.select_value(%{select (
          CASE WHEN (@t:=FLOOR(
            POW(@c:=(SELECT count(*) FROM skynet_message_queues WHERE #{conditions}
          ),0.6))) < 1 THEN 1 ELSE @t END) from skynet_queue_temperature WHERE #{temp_q_conditions}
        })
        if temperature
          update("UPDATE skynet_queue_temperature SET temperature = #{temperature} WHERE #{temp_q_conditions}")
          @@temperature[payload_type.to_sym] = temperature.to_f
        else
          sleepy = rand(Skynet::CONFIG[:MYSQL_TEMPERATURE_CHANGE_SLEEP])
          sleep sleepy
          @@temperature[payload_type.to_sym] = SkynetWorkerQueue.connection.select_value("select temperature from skynet_queue_temperature WHERE type = '#{payload_type}'").to_f
        end
        # update("UPDATE skynet_queue_temperature SET type = '#{payload_type}', temperature = CASE WHEN @t:=FLOOR(SQRT(select count(*) from skynet_message_queues WHERE #{conditions})) < 1 THEN 1 ELSE @t END")
        # tasks = SkynetMessageQueue.connection.select_value("select count(*) from skynet_message_queues WHERE #{conditions}").to_i
        # sleep 4 if payload_type == :tasks and tasks < 100
        # @@temperature[payload_type.to_sym] = tasks ** 0.5
        # @@temperature[payload_type.to_sym] *= multiplier
        @@temperature[payload_type.to_sym] = 1 if @@temperature[payload_type.to_sym] < 1
      end
    end
  end
end