lib/skynet/skynet_message.rb in skynet-0.9.1 vs lib/skynet/skynet_message.rb in skynet-0.9.2

- old
+ new

@@ -1,54 +1,71 @@ class Skynet class Message include SkynetDebugger - class BadMessage < Skynet::Error - end + class BadMessage < Skynet::Error; end class << self attr_accessor :fields end - self.fields = { - 0 => :tasktype, - 1 => :drburi, - 2 => :task_id, - 3 => :job_id, - 4 => :payload, - 5 => :payload_type, - 6 => :name, - 7 => :expiry, - 8 => :expire_time, - 9 => :iteration, - 10 => :version - } + self.fields = [ + :tasktype, + :drburi, + :task_id, + :job_id, + :payload, + :payload_type, + :name, + :expiry, + :expire_time, + :iteration, + :version, + :retry, + :queue_id + ] - self.fields.values.each do |method| - next if method == :payload - next if method == :tasktype - next if method == :payload_type + self.fields.each do |method| + next if [:payload, :tasktype, :payload_type].include?(method) attr_accessor method end attr_reader :payload_type, :tasktype + + def self.new_task_message(task,job) + self.new( + :job_id => job.job_id, + :expire_time => job.start_after, + :version => job.version, + :queue_id => job.queue_id || 0, + :iteration => 0, + :tasktype => :task, + :task_id => task.task_id, + :payload => task, + :payload_type => task.task_or_master, + :expiry => task.result_timeout, + :name => task.name, + :retry => task.retry + ) + end def initialize(opts) if opts.is_a?(Array) - self.class.fields.each do |ii, field| + self.class.fields.each_with_index do |field, ii| self.send("#{field}=",opts[ii] || nil) end elsif opts - self.class.fields.values.each do |field| + self.class.fields.each do |field| value = opts[field] || opts[field.to_s] || nil self.send("#{field}=",value) if value end opts_raw_payload = opts[:raw_payload] || opts["raw_payload"] if opts_raw_payload self.raw_payload = opts_raw_payload end + self.retry ||= 0 end self.payload end def fields @@ -93,19 +110,19 @@ def [](ii) send(self.class.fields[ii]) end def to_a - self.class.fields.keys.sort.collect do |ii| - self.send(self.class.fields[ii]) + self.class.fields.collect do |field| + self.send(field) end end def to_hash hash = {} - self.class.fields.keys.sort.collect do |ii| - hash[self.class.fields[ii]] = self.send(self.class.fields[ii]) + self.class.fields.each do |field| + hash[field] = self.send(field) end hash end def to_h @@ -118,89 +135,70 @@ def timeout expire_time * 2 end - ####### TEMPLATES ############ - def self.next_task_template(version=nil,payload_type=nil) - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :expire_time - (0 .. Time.now.to_i) - when :tasktype - :task - when :version - version - when :payload_type - payload_type - when :iteration - (0..6) - else - nil - end + def self.next_task_template(version=nil, payload_type=nil, queue_id=0) + template = { + :expire_time => (0 .. Time.now.to_i), + :tasktype => :task, + :queue_id => queue_id, + :version => version, + :payload_type => payload_type, + :iteration => (0..Skynet::CONFIG[:MAX_RETRIES]), + } + + fields.collect do |field| + template[field] end end def self.result_template(job_id,tasktype=:result) - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - tasktype - when :job_id - job_id - else - nil - end - end + template = { + :tasktype => tasktype, + :job_id => job_id + } + fields.collect do |field| + template[field] + end end def self.result_message(message,result,tasktype=:result, resulttype=:result) - message_array = fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - tasktype - when :payload - result - when :payload_type - resulttype - else - message.send(fields[ii]) - end + template = { + :tasktype => tasktype, + :payload => result, + :payload_type => resulttype + } + + fields.each do |field| + template[field] = message.send(field) unless template.has_key?(field) end - new(message_array) + new(template) end def result_message(result,tasktype=:result, resulttype=:result) self.class.result_message(self,result,tasktype,resulttype) end - def self.outstanding_tasks_template(iteration=nil) - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - :task - when :iteration - iteration - else - nil - end + def self.outstanding_tasks_template(iteration=nil,queue_id=0) + template = { + :tasktype => :task, + :queue_id => queue_id, + :iteration => iteration + } + fields.collect do |field| + template[field] end end - def self.outstanding_results_template - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - :result - else - nil - end + def self.outstanding_results_template(queue_id=0) + template = { + :tasktype => :result, + :queue_id => queue_id + } + fields.collect do |field| + template[field] end end def self.error_message(message,error) result_message(message,error,:result,:error) @@ -209,60 +207,67 @@ def error_message(error) self.class.error_message(self,error) end def self.error_template(message) - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - message.tasktype - when :drburi, :version, :task_id - message.send(fields[ii]) - else - nil - end + template = { + :tasktype => message.tasktype, + :drburi => message.drburi, + :version => message.version, + :task_id => message.task_id, + :queue_id => message.queue_id + } + fields.collect do |field| + template[field] end end def error_template self.class.error_template(self) end def self.fallback_task_message(message) - opts = Hash.new - fields.values.each do |field| - case field - when :iteration - opts[:iteration] = message.iteration + 1 - when :expire_time - opts[:expire_time] = Time.now.to_i + message.expiry + template = {} + if message.retry + if (message.retry and message.iteration >= message.retry) + template[:iteration] = -1 else - opts[field] = message.send(field) + template[:iteration] = message.iteration + 1 end + # Originally I was gonna do this for map and reduce, but we don't know that here, just whether its a master. + elsif message.payload_type.to_sym == :master and Skynet::CONFIG[:DEFAULT_MASTER_RETRY] and message.iteration >= Skynet::CONFIG[:DEFAULT_MASTER_RETRY] + template[:iteration] = -1 + elsif Skynet::CONFIG[:MAX_RETRIES] and message.iteration >= Skynet::CONFIG[:MAX_RETRIES] + template[:iteration] = -1 + else + template[:iteration] = message.iteration + 1 end - # debug "BUILDING NEXT FALLBACK TASK MESSAGE OFF"#, opts - Skynet::Message.new(opts) + + template[:expire_time] = Time.now.to_i + message.expiry + + fields.each do |field| + template[field] = message.send(field) unless template.has_key?(field) + end + # debug "BUILDING NEXT FALLBACK TASK MESSAGE OFF"#, template + Skynet::Message.new(template) end def fallback_task_message self.class.fallback_task_message(self) end def self.fallback_template(message) - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - message.tasktype - when :drburi, :version, :task_id - message.send(field) - when :iteration - (1..20) - else - nil - end + template = { + :tasktype => message.tasktype, + :drburi => message.drburi, + :version => message.version, + :task_id => message.task_id, + :queue_id => message.queue_id, + :iteration => (1..Skynet::CONFIG[:MAX_RETRIES]), + } + fields.collect do |field| + template[field] end end def fallback_template self.class.fallback_template(self) @@ -283,84 +288,74 @@ def version @version.to_i end def self.template - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - :current_worker_rev - else - nil - end + template = { + :tasktype => :current_worker_rev + } + fields.collect do |field| + template[field] end end def template - fields.keys.sort.collect do |ii| - field = fields[ii] - case field - when :tasktype - :current_worker_rev - when :expire_time - nil - else - self.send(field) - end + template = { + :tasktype => :current_worker_rev, + :expire_time => nil + } + fields.collect do |field| + template[field] || self.send(field) end end end class WorkerStatusMessage < Skynet::Message - self.fields = { - 0 => :tasktype, - 1 => :tasksubtype, - 2 => :worker_id, - 3 => :hostname, - 4 => :process_id, - 5 => :job_id, - 6 => :task_id, - 7 => :iteration, - 8 => :name, - 9 => :map_or_reduce, - 10 => :started_at, - 11 => :version, - 12 => :processed - } - self.fields.values.each { |method| attr_accessor method } + self.fields = [ + :tasktype, + :tasksubtype, + :worker_id, + :hostname, + :process_id, + :job_id, + :task_id, + :iteration, + :name, + :map_or_reduce, + :started_at, + :version, + :processed, + :queue_id + ] + self.fields.each { |method| attr_accessor method } def initialize(opts) super self.tasktype = :status self.tasksubtype = :worker end def self.worker_status_template(opts) - fields.keys.sort.collect do |key| - case fields[key] - when :tasktype : :status - when :tasksubtype : :worker - when :hostname : opts[:hostname] - when :process_id : opts[:process_id] - else - nil - end + template = { + :tasktype => :status, + :tasksubtype => :worker, + :hostname => opts[:hostname], + :process_id => opts[:process_id] + } + fields.collect do |field| + template[field] end end def self.all_workers_template(hostname=nil) - fields.keys.sort.collect do |key| - case fields[key] - when :tasktype : :status - when :tasksubtype : :worker - when :hostname - hostname if hostname - else - nil - end + template = { + :tasktype => :status, + :tasksubtype => :worker, + :hostname => hostname, + } + fields.collect do |field| + template[field] end - end - - end + end + end # class WorkerStatusMessage end