lib/skynet/skynet_message.rb in skynet-0.9.1 vs lib/skynet/skynet_message.rb in skynet-0.9.2
- old
+ new
@@ -1,54 +1,71 @@
class Skynet
class Message
include SkynetDebugger
- class BadMessage < Skynet::Error
- end
+ class BadMessage < Skynet::Error; end
class << self
attr_accessor :fields
end
- self.fields = {
- 0 => :tasktype,
- 1 => :drburi,
- 2 => :task_id,
- 3 => :job_id,
- 4 => :payload,
- 5 => :payload_type,
- 6 => :name,
- 7 => :expiry,
- 8 => :expire_time,
- 9 => :iteration,
- 10 => :version
- }
+ self.fields = [
+ :tasktype,
+ :drburi,
+ :task_id,
+ :job_id,
+ :payload,
+ :payload_type,
+ :name,
+ :expiry,
+ :expire_time,
+ :iteration,
+ :version,
+ :retry,
+ :queue_id
+ ]
- self.fields.values.each do |method|
- next if method == :payload
- next if method == :tasktype
- next if method == :payload_type
+ self.fields.each do |method|
+ next if [:payload, :tasktype, :payload_type].include?(method)
attr_accessor method
end
attr_reader :payload_type, :tasktype
+
+ def self.new_task_message(task,job)
+ self.new(
+ :job_id => job.job_id,
+ :expire_time => job.start_after,
+ :version => job.version,
+ :queue_id => job.queue_id || 0,
+ :iteration => 0,
+ :tasktype => :task,
+ :task_id => task.task_id,
+ :payload => task,
+ :payload_type => task.task_or_master,
+ :expiry => task.result_timeout,
+ :name => task.name,
+ :retry => task.retry
+ )
+ end
def initialize(opts)
if opts.is_a?(Array)
- self.class.fields.each do |ii, field|
+ self.class.fields.each_with_index do |field, ii|
self.send("#{field}=",opts[ii] || nil)
end
elsif opts
- self.class.fields.values.each do |field|
+ self.class.fields.each do |field|
value = opts[field] || opts[field.to_s] || nil
self.send("#{field}=",value) if value
end
opts_raw_payload = opts[:raw_payload] || opts["raw_payload"]
if opts_raw_payload
self.raw_payload = opts_raw_payload
end
+ self.retry ||= 0
end
self.payload
end
def fields
@@ -93,19 +110,19 @@
def [](ii)
send(self.class.fields[ii])
end
def to_a
- self.class.fields.keys.sort.collect do |ii|
- self.send(self.class.fields[ii])
+ self.class.fields.collect do |field|
+ self.send(field)
end
end
def to_hash
hash = {}
- self.class.fields.keys.sort.collect do |ii|
- hash[self.class.fields[ii]] = self.send(self.class.fields[ii])
+ self.class.fields.each do |field|
+ hash[field] = self.send(field)
end
hash
end
def to_h
@@ -118,89 +135,70 @@
def timeout
expire_time * 2
end
- ####### TEMPLATES ############
- def self.next_task_template(version=nil,payload_type=nil)
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :expire_time
- (0 .. Time.now.to_i)
- when :tasktype
- :task
- when :version
- version
- when :payload_type
- payload_type
- when :iteration
- (0..6)
- else
- nil
- end
+ def self.next_task_template(version=nil, payload_type=nil, queue_id=0)
+ template = {
+ :expire_time => (0 .. Time.now.to_i),
+ :tasktype => :task,
+ :queue_id => queue_id,
+ :version => version,
+ :payload_type => payload_type,
+ :iteration => (0..Skynet::CONFIG[:MAX_RETRIES]),
+ }
+
+ fields.collect do |field|
+ template[field]
end
end
def self.result_template(job_id,tasktype=:result)
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- tasktype
- when :job_id
- job_id
- else
- nil
- end
- end
+ template = {
+ :tasktype => tasktype,
+ :job_id => job_id
+ }
+ fields.collect do |field|
+ template[field]
+ end
end
def self.result_message(message,result,tasktype=:result, resulttype=:result)
- message_array = fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- tasktype
- when :payload
- result
- when :payload_type
- resulttype
- else
- message.send(fields[ii])
- end
+ template = {
+ :tasktype => tasktype,
+ :payload => result,
+ :payload_type => resulttype
+ }
+
+ fields.each do |field|
+ template[field] = message.send(field) unless template.has_key?(field)
end
- new(message_array)
+ new(template)
end
def result_message(result,tasktype=:result, resulttype=:result)
self.class.result_message(self,result,tasktype,resulttype)
end
- def self.outstanding_tasks_template(iteration=nil)
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- :task
- when :iteration
- iteration
- else
- nil
- end
+ def self.outstanding_tasks_template(iteration=nil,queue_id=0)
+ template = {
+ :tasktype => :task,
+ :queue_id => queue_id,
+ :iteration => iteration
+ }
+ fields.collect do |field|
+ template[field]
end
end
- def self.outstanding_results_template
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- :result
- else
- nil
- end
+ def self.outstanding_results_template(queue_id=0)
+ template = {
+ :tasktype => :result,
+ :queue_id => queue_id
+ }
+ fields.collect do |field|
+ template[field]
end
end
def self.error_message(message,error)
result_message(message,error,:result,:error)
@@ -209,60 +207,67 @@
def error_message(error)
self.class.error_message(self,error)
end
def self.error_template(message)
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- message.tasktype
- when :drburi, :version, :task_id
- message.send(fields[ii])
- else
- nil
- end
+ template = {
+ :tasktype => message.tasktype,
+ :drburi => message.drburi,
+ :version => message.version,
+ :task_id => message.task_id,
+ :queue_id => message.queue_id
+ }
+ fields.collect do |field|
+ template[field]
end
end
def error_template
self.class.error_template(self)
end
def self.fallback_task_message(message)
- opts = Hash.new
- fields.values.each do |field|
- case field
- when :iteration
- opts[:iteration] = message.iteration + 1
- when :expire_time
- opts[:expire_time] = Time.now.to_i + message.expiry
+ template = {}
+ if message.retry
+ if (message.retry and message.iteration >= message.retry)
+ template[:iteration] = -1
else
- opts[field] = message.send(field)
+ template[:iteration] = message.iteration + 1
end
+ # Originally I was gonna do this for map and reduce, but we don't know that here, just whether its a master.
+ elsif message.payload_type.to_sym == :master and Skynet::CONFIG[:DEFAULT_MASTER_RETRY] and message.iteration >= Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
+ template[:iteration] = -1
+ elsif Skynet::CONFIG[:MAX_RETRIES] and message.iteration >= Skynet::CONFIG[:MAX_RETRIES]
+ template[:iteration] = -1
+ else
+ template[:iteration] = message.iteration + 1
end
- # debug "BUILDING NEXT FALLBACK TASK MESSAGE OFF"#, opts
- Skynet::Message.new(opts)
+
+ template[:expire_time] = Time.now.to_i + message.expiry
+
+ fields.each do |field|
+ template[field] = message.send(field) unless template.has_key?(field)
+ end
+ # debug "BUILDING NEXT FALLBACK TASK MESSAGE OFF"#, template
+ Skynet::Message.new(template)
end
def fallback_task_message
self.class.fallback_task_message(self)
end
def self.fallback_template(message)
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- message.tasktype
- when :drburi, :version, :task_id
- message.send(field)
- when :iteration
- (1..20)
- else
- nil
- end
+ template = {
+ :tasktype => message.tasktype,
+ :drburi => message.drburi,
+ :version => message.version,
+ :task_id => message.task_id,
+ :queue_id => message.queue_id,
+ :iteration => (1..Skynet::CONFIG[:MAX_RETRIES]),
+ }
+ fields.collect do |field|
+ template[field]
end
end
def fallback_template
self.class.fallback_template(self)
@@ -283,84 +288,74 @@
def version
@version.to_i
end
def self.template
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- :current_worker_rev
- else
- nil
- end
+ template = {
+ :tasktype => :current_worker_rev
+ }
+ fields.collect do |field|
+ template[field]
end
end
def template
- fields.keys.sort.collect do |ii|
- field = fields[ii]
- case field
- when :tasktype
- :current_worker_rev
- when :expire_time
- nil
- else
- self.send(field)
- end
+ template = {
+ :tasktype => :current_worker_rev,
+ :expire_time => nil
+ }
+ fields.collect do |field|
+ template[field] || self.send(field)
end
end
end
class WorkerStatusMessage < Skynet::Message
- self.fields = {
- 0 => :tasktype,
- 1 => :tasksubtype,
- 2 => :worker_id,
- 3 => :hostname,
- 4 => :process_id,
- 5 => :job_id,
- 6 => :task_id,
- 7 => :iteration,
- 8 => :name,
- 9 => :map_or_reduce,
- 10 => :started_at,
- 11 => :version,
- 12 => :processed
- }
- self.fields.values.each { |method| attr_accessor method }
+ self.fields = [
+ :tasktype,
+ :tasksubtype,
+ :worker_id,
+ :hostname,
+ :process_id,
+ :job_id,
+ :task_id,
+ :iteration,
+ :name,
+ :map_or_reduce,
+ :started_at,
+ :version,
+ :processed,
+ :queue_id
+ ]
+ self.fields.each { |method| attr_accessor method }
def initialize(opts)
super
self.tasktype = :status
self.tasksubtype = :worker
end
def self.worker_status_template(opts)
- fields.keys.sort.collect do |key|
- case fields[key]
- when :tasktype : :status
- when :tasksubtype : :worker
- when :hostname : opts[:hostname]
- when :process_id : opts[:process_id]
- else
- nil
- end
+ template = {
+ :tasktype => :status,
+ :tasksubtype => :worker,
+ :hostname => opts[:hostname],
+ :process_id => opts[:process_id]
+ }
+ fields.collect do |field|
+ template[field]
end
end
def self.all_workers_template(hostname=nil)
- fields.keys.sort.collect do |key|
- case fields[key]
- when :tasktype : :status
- when :tasksubtype : :worker
- when :hostname
- hostname if hostname
- else
- nil
- end
+ template = {
+ :tasktype => :status,
+ :tasksubtype => :worker,
+ :hostname => hostname,
+ }
+ fields.collect do |field|
+ template[field]
end
- end
-
- end
+ end
+ end # class WorkerStatusMessage
end