class Skynet
# Skynet::Job is the main interface to Skynet. You create a job object, giving it
# the starting data (map_data) along with the class that contains your map/reduce
# functions. Even though Skynet is distributed, when you call #run on
# a plain Skynet::Job, it will still block in your current process until your task has completed.
# If you want to go on to do other things you'll want to pass :async => true
# when creating a new job. Then later call job.results to retrieve your results.
#
# There are also many global configuration options which can be controlled through Skynet::CONFIG
#
# Example Usage:
# Create a file called mapreduce_test.rb with the following.
#
# class MapreduceTest
# include SkynetDebugger ## This gives you logging methods such as log, error, info, fatal
#
# def self.run
# job = Skynet::Job.new(
# :mappers => 2,
# :reducers => 1,
# :map_reduce_class => self,
# :map_data => [OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 3})]
# )
# results = job.run
# end
#
# def self.map(profiles)
# result = Array.new
# profiles.each do |profile|
# result << [profile.created_by, 1] if profile.created_by
# end
# result
# end
#
# def self.reduce(pairs)
# totals = Hash.new
# pairs.each do |pair|
# created_by, count = pair[0], pair[1]
# totals[created_by] ||= 0
# totals[created_by] += count
# end
# totals
# end
# end
#
#
# You need to make sure Skynet is running with your class loaded; that is how Skynet works.
# Since there is no easy way to actually pass code around the network, each skynet worker must
# already have your code loaded. If you have skynet started, stop it and then start it with the -r flag
# to tell it which file to require so it can find your class.
# $ skynet -r mapreduce_test.rb
# Then go into the skynet console to test running your map reduce task.
# $ skynet console -r mapreduce_test.rb
# skynet>> MapreduceTest.run # returns {2=>2, 3=>1}
#
# In the example above, you might notice that self.map and self.reduce both accept Arrays.
# If you do not want to deal with getting arrays of map_data or reduce_data, you can include MapreduceHelper
# into your class and then implement self.map_each and self.reduce_each methods.
# The included self.map and self.reduce methods will handle iterating over the map_data and reduce_data,
# passing each element to your map_each and reduce_each methods respectively. They also handle
# errors within that loop so that processing continues even if a single map or reduce fails.
# If you do not want processing to continue if a map fails, do not use the MapreduceHelper mixin.
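#
# A sketch of a class that uses MapreduceHelper (the class name and method bodies are
# hypothetical; it only illustrates the map_each/reduce_each contract described above):
#
# class MapEachTest
# include MapreduceHelper
#
# def self.map_each(profile) ## called once per element of the map data
# [profile.created_by, 1] if profile.created_by
# end
#
# def self.reduce_each(pair) ## called once per element of the partitioned reduce data
# pair
# end
# end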
#
# Since Skynet must have access to your code, you will probably want to install skynet into the
# application whose code it needs in order to run your jobs. See bin/skynet_install[link:files/bin/skynet_install.html] for more info.
#
# See new for the many other options to control various Skynet::Job settings.
class Job
include SkynetDebugger
include Skynet::GuidGenerator
class WorkerError < Skynet::Error; end
class BadMapOrReduceError < Skynet::Error; end
class Error < Skynet::Error; end
@@worker_version = nil
FIELDS = [:queue_id, :mappers, :reducers, :silent, :name, :map_timeout, :map_data, :job_id,
:reduce_timeout, :master_timeout, :map_name, :reduce_name,
:master_result_timeout, :result_timeout, :start_after, :solo, :single, :version,
:map, :map_partitioner, :reduce, :reduce_partition, :map_reduce_class,
:master_retry, :map_retry, :reduce_retry,
:keep_map_tasks, :keep_reduce_tasks,
:local_master, :async, :data_debug
]
FIELDS.each do |method|
if [:map_reduce_class, :version, :map, :reduce, :map_data, :start_after].include?(method)
attr_reader method
elsif [:master_retry, :map_retry, :reduce_retry,:keep_map_tasks, :keep_reduce_tasks].include?(method)
attr_writer method
else
attr_accessor method
end
end
attr_accessor :use_local_queue, :data_debug
Skynet::CONFIG[:JOB_DEFAULTS] = {
:queue_id => 0,
:mappers => 2,
:reducers => 1,
:map_timeout => 60,
:reduce_timeout => 60,
:master_timeout => 60,
:result_timeout => 1200,
:start_after => 0,
:master_result_timeout => 1200,
:local_master => true
}
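# These defaults can be overridden globally before any jobs are created. A sketch
# (the values shown here are arbitrary):
#
# Skynet::CONFIG[:JOB_DEFAULTS][:mappers] = 4
# Skynet::CONFIG[:JOB_DEFAULTS][:map_timeout] = 120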
def self.debug_class_desc
"JOB"
end
# Most of the time you will merely call #new(options) and then #run on the returned object.
#
# Options are:
# :local_master BOOL (DEFAULT true)
# By default, your Skynet::Job will act as the master for your map/reduce job, doling out
# tasks, waiting for other workers to complete and return their results and dealing with
# merging and partitioning the data. If you call #run in async mode, another worker will handle
# being the master for your job without blocking. If you run with :async => false, :local_master => false,
# Skynet will let another worker be the master for your job, but will block waiting for the
# final results. The benefit of this is that if your process dies, the Job will continue to
# run remotely.
#
# :async BOOL (DEFAULT false)
# If you run in async mode, another worker will handle being the master for your job without blocking.
# You cannot pass :local_master => true, :async => true since the only way to allow your
# job to run asynchronously is to have a remote master.
#
# :map_data(Array or Enumerable)
# map_data should be an Array or Enumerable whose data Skynet::Job will split up
# and distribute among your workers. You can stream data to Skynet::Job by passing
# an Enumerable that implements next or each.
#
# :map_reduce_class(Class or Class Name)
# Skynet::Job will look for class methods named self.map, self.reduce, self.map_partitioner,
# self.reduce_partition in your map_reduce_class. The only required method is self.map.
# Each of these methods must accept an array. Examples above.
#
# :map(Class Name)
# You can pass a classname, or a proc. If you pass a classname, Job will look for a method
# called self.map in that class.
# WARNING: Passing a proc does not work right now.
#
# :reduce(Class Name)
# You can pass a classname, or a proc. If you pass a classname, Job will look for a method
# called self.reduce in that class.
# WARNING: Passing a proc does not work right now.
#
# :reduce_partition(Class Name)
# You can pass a classname, or a proc. If you pass a classname, Job will look for a method
# called self.reduce_partition in that class.
# WARNING: Passing a proc does not work right now.
#
# :mappers Fixnum
# The number of mappers to partition map data for.
#
# :reducers Fixnum
# The number of reducers to partition the returned map_data for.
#
# :master_retry Fixnum
# If the master fails for any reason, how many times should it be retried? You can also set
# Skynet::CONFIG[:DEFAULT_MASTER_RETRY] (DEFAULT 0)
#
# :map_retry Fixnum
# If a map task fails for any reason, how many times should it be retried? You can also set
# Skynet::CONFIG[:DEFAULT_MAP_RETRY] (DEFAULT 3)
#
# :reduce_retry Fixnum
# If a reduce task fails for any reason, how many times should it be retried? You can also set
# Skynet::CONFIG[:DEFAULT_REDUCE_RETRY] (DEFAULT 3)
#
# :master_timeout, :map_timeout, :reduce_timeout, :master_result_timeout, :result_timeout
# These control how long skynet should wait for particular actions to be finished.
# The master_timeout controls how long the master should wait for ALL map/reduce tasks, i.e. the entire job, to finish.
# The master_result_timeout controls how long the final result should wait in the queue before being expired.
# The map and reduce timeouts control how long individual map and reduce tasks should take.
#
# :single BOOL
# By default the master task distributes the map and reduce tasks to other workers.
# In single mode the master will take care of the map and reduce tasks by itself.
# This is handy when you really want to just perform some single action asynchronously.
# In this case you're merely using Skynet to postpone some action. In single mode, the
# first worker that picks up your task will just complete it as opposed to trying to distribute
# it to another worker.
#
# :start_after Time or Time.to_i
# Do not start job until :start_after has passed
#
# :queue String
# Which queue should this Job go into? The queue provided is merely used to
# determine the queue_id.
# Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]
#
# :queue_id Fixnum (DEFAULT 0)
# Which queue should this Job go into?
# Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]
#
# :solo BOOL
# One normally turns solo mode on in Skynet::Config using Skynet::CONFIG[:SOLO] = true.
# In solo mode, Skynet jobs do not add items to a Skynet queue. Instead they do all
# work in place. It's like a Skynet simulation mode. It will complete all tasks
# without Skynet running. Great for testing. You can also wrap code blocks in
# Skynet.solo {} to run that code in solo mode.
#
# :version Fixnum
# If you do not provide a version the current worker version will be used.
# Skynet workers start at a specific version and only look for jobs that match that version.
# A worker will continue looking for jobs at that version until there are no more jobs left on
# the queue for that version. At that time, the worker will check to see if there is a new version.
# If there is, it will restart itself at the new version (assuming you had already pushed code to
# said workers).
# To retrieve, set, or increment the current version, see
# Skynet::Job.set_worker_version, Skynet::Job.get_worker_version, and Skynet::Job.increment_worker_version.
#
# :name, :map_name, :reduce_name
# These name methods are merely for debugging while watching the Skynet logs or the Skynet queue.
# If you do not supply names, it will try to provide sensible ones based on your class names.
#
# :keep_map_tasks BOOL or Fixnum (DEFAULT 1)
# If true, the master will run the map_tasks locally.
# If a number is provided, the master will run the map_tasks locally if there are
# LESS THAN OR EQUAL TO the number provided.
# You may also set Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS] (DEFAULT 1)
#
# :keep_reduce_tasks BOOL or Fixnum (DEFAULT 1)
# If true, the master will run the reduce_tasks locally.
# If a number is provided, the master will run the reduce_tasks locally if there are
# LESS THAN OR EQUAL TO the number provided.
# You may also set Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS] (DEFAULT 1)
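#
# Example (a sketch; MapreduceTest and profiles are hypothetical and stand in for your
# own map/reduce class and map data):
#
# job = Skynet::Job.new(
# :map_reduce_class => MapreduceTest,
# :mappers => 4,
# :reducers => 2,
# :map_retry => 1,
# :master_timeout => 120,
# :start_after => Time.now + 60,
# :map_data => profiles
# )
# results = job.run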
def initialize(options = {})
FIELDS.each do |field|
if options.has_key?(field)
self.send("#{field}=".to_sym,options[field])
elsif Skynet::CONFIG[:JOB_DEFAULTS][field]
self.send("#{field}=".to_sym,Skynet::CONFIG[:JOB_DEFAULTS][field])
end
end
if options[:queue]
raise Error.new("The provided queue (#{options[:queue]}) does not exist in Skynet::CONFIG[:MESSAGE_QUEUES]") unless Skynet::CONFIG[:MESSAGE_QUEUES].index(options[:queue])
self.queue_id = Skynet::CONFIG[:MESSAGE_QUEUES].index(options[:queue])
end
# Backward compatibility
self.mappers ||= options[:map_tasks]
self.reducers ||= options[:reduce_tasks]
raise Error.new("You can not run a local master in async mode.") if self.async and self.local_master
@job_id = task_id
end
# Options are:
# :local_master BOOL (DEFAULT true)
# By default, your Skynet::Job will act as the master for your map/reduce job, doling out
# tasks, waiting for other workers to complete and return their results and dealing with
# merging and partitioning the data. If you run in async mode, another worker will handle
# being the master for your job without blocking. If you run with :async => false, :local_master => false,
# Skynet will let another worker be the master for your job, but will block waiting for the
# final results. The benefit of this is that if your process dies, the Job will continue to
# run remotely.
#
# :async BOOL (DEFAULT false)
# If you run in async mode, another worker will handle being the master for your job without blocking.
# You cannot pass :local_master => true, :async => true since the only way to allow your
# job to run asynchronously is to have a remote master.
#
# You can pass any options you might pass to Skynet::Job.new. Warning: Passing options to run
# will permanently change properties of the job.
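#
# A sketch of an asynchronous run (it assumes the job was created as in the examples above):
#
# job_id = job.run(:async => true, :local_master => false)
# # ... later, in the same process:
# results = job.results
# # ... or from another process, using the saved job_id:
# results = Skynet::Job.results_by_job_id(job_id)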
def run(options = {})
FIELDS.each do |field|
if options.has_key?(field)
self.send("#{field}=".to_sym,options[field])
end
end
raise Error.new("You can not run a local master in async mode.") if self.async and self.local_master
info "RUN 1 BEGIN #{name}, job_id:#{job_id} vers: #{version} async:#{async}, local_master: #{local_master}, master?: #{master?}"
# run the master task if we're running async or local_master
if master?
master_enqueue
# ====================================================================================
# = FIXME If async Return a handle to an object that can used to retrieve the results later.
# ====================================================================================
async? ? job_id : master_results
else
number_of_tasks_queued = self.map_enqueue
map_results = self.map_results(number_of_tasks_queued)
return map_results unless map_results and self.reduce
partitioned_data = self.partition_data(map_results)
return unless partitioned_data
number_of_tasks_queued = self.reduce_enqueue(partitioned_data)
@results = self.reduce_results(number_of_tasks_queued)
end
end
def master_enqueue
self.use_local_queue = local_master?
messages = tasks_to_messages([master_task])
enqueue_messages(messages)
end
# Returns the final results of this map/reduce job. If called on an :async job,
# results will block until results are found or the master_timeout is reached.
def results
# ============================================
# = FIXME Maybe this can have better warnings if the results aren't ready yet. =
# ============================================
master_results
end
def master_results
@results = gather_results(1,master_timeout,name) unless defined?(@results)
end
def map_enqueue
task_ids = []
map_tasks = self.map_tasks
self.use_local_queue = map_local?
if map_tasks
number_of_tasks = 0
size = map_tasks.size - 1
printlog "MESSAGES TO MAP ENQUEUE #{size}" if data_debug?
map_tasks.each_with_index do |task,ii|
printlog "#{size - ii} MAP TASKS LEFT TO ENQUEUE" if data_debug?
number_of_tasks += 1
enqueue_messages(tasks_to_messages(task))
end
end
return number_of_tasks
end
def map_results(number_of_tasks)
debug "RUN MAP 2.4 BEFORE MAP #{display_info} MAP_LOCAL?:#{map_local?} USE_LOCAL_QUEUE?:#{use_local_queue?}"
results = gather_results(number_of_tasks, map_timeout, map_name)
return unless results
results.compact! if results.is_a?(Array)
debug "RUN MAP 2.5 RESULTS AFTER RUN #{display_info} MAP_LOCAL:#{map_local?} USE_LOCAL_QUEUE?:#{use_local_queue?} results:", results.inspect
results
end
def partition_data(post_map_data)
info "RUN REDUCE 3.1 BEFORE PARTITION #{display_info} reducers: #{reducers}"
debug "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data
printlog "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data if data_debug?
return unless post_map_data
partitioned_data = nil
if not @reduce_partition
# =====================
# = XXX HACK
# = There was a bug in Job where the reduce_partition of master jobs wasn't being set! This is to catch that.
# = It handles this by checking whether the map class has a reduce partitioner. Maybe this is a good thing to leave anyway.
# =====================
if @map.is_a?(String) and @map.constantize.respond_to?(:reduce_partition)
partitioned_data = @map.constantize.reduce_partition(post_map_data, reducers)
else
partitioned_data = Skynet::Partitioners::RecombineAndSplit.reduce_partition(post_map_data, reducers)
end
elsif @reduce_partition.is_a?(String)
partitioned_data = @reduce_partition.constantize.reduce_partition(post_map_data, reducers)
else
partitioned_data = @reduce_partition.call(post_map_data, reducers)
end
partitioned_data.compact! if partitioned_data
info "RUN REDUCE 3.2 AFTER PARTITION #{display_info} reducers: #{reducers}"
debug "RUN REDUCE 3.2 AFTER PARTITION #{display_info} data:", partitioned_data if partitioned_data
printlog "RUN REDUCE 3.2 AFTER PARTITION #{display_info} data:", partitioned_data if data_debug?
partitioned_data
end
def reduce_enqueue(partitioned_data)
return partitioned_data unless @reduce and reducers and reducers > 0
debug "RUN REDUCE 3.3 CREATED REDUCE TASKS #{display_info}", partitioned_data
size = partitioned_data.size
printlog "REDUCE MESSAGES TO ENQUEUE #{size}" if data_debug?
reduce_tasks = self.reduce_tasks(partitioned_data)
self.use_local_queue = reduce_local?(reduce_tasks)
number_of_tasks = 0
reduce_tasks.each_with_index do |task,ii|
printlog "#{size - ii} REDUCE TASKS LEFT TO ENQUEUE" if data_debug?
number_of_tasks += 1
enqueue_messages(tasks_to_messages(task))
end
return number_of_tasks
end
def reduce_results(number_of_tasks)
results = gather_results(number_of_tasks, reduce_timeout, reduce_name)
printlog "REDUCE RESULTS", results if data_debug?
if results.is_a?(Array) and results.first.is_a?(Array)
final = []
results.each do |result|
final += result
end
results = final
end
debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results size: #{results ? results.size : ''}"
debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results:", results if results
printlog "POST REDUCE RESULTS", results if data_debug?
return results
end
def enqueue_messages(messages)
size = messages.size
messages.each_with_index do |message,ii|
timeout = message.expiry || 5
debug "RUN TASKS SUBMITTING #{message.name} job_id: #{job_id} #{message.payload.is_a?(Skynet::Task) ? 'task' + message.payload.task_id.to_s : ''}"
debug "RUN TASKS WORKER MESSAGE #{message.name} job_id: #{job_id}", message.to_a
mq.write_message(message,timeout * 5)
end
end
# Given a job_id, returns the results from the message queue. Used to retrieve the results of asynchronous jobs.
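# A sketch (job_id is assumed to have been returned earlier by an asynchronous run; the
# second argument is the result timeout, assumed here to be in seconds):
# results = Skynet::Job.results_by_job_id(job_id, 10)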
def self.results_by_job_id(job_id,timeout=2)
result_message = mq.take_result(job_id,timeout)
result = result_message.payload
return nil unless result
return result
end
def gather_results(number_of_tasks, timeout=nil, description=nil)
debug "GATHER RESULTS job_id: #{job_id} - NOT AN ASYNC JOB"
results = {}
errors = {}
started_at = Time.now.to_i
begin
loop do
# debug "LOOKING FOR RESULT MESSAGE TEMPLATE"
result_message = self.mq.take_result(job_id,timeout * 2)
ret_result = result_message.payload
if result_message.payload_type == :error
errors[result_message.task_id] = ret_result
error "ERROR RESULT TASK #{result_message.task_id} returned #{errors[result_message.task_id].inspect}"
else
results[result_message.task_id] = ret_result
debug "RESULT returned TASKID: #{result_message.task_id} #{results[result_message.task_id].inspect}"
end
debug "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}"
printlog "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}" if data_debug?
break if (number_of_tasks - (results.keys + errors.keys).uniq.size) <= 0
end
rescue Skynet::RequestExpiredError => e
local_mq_reset!
error "A WORKER EXPIRED or ERRORED, #{description}, job_id: #{job_id}"
if not errors.empty?
raise WorkerError.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{number_of_tasks} workers. #{errors.pretty_print_inspect}")
else
raise Skynet::RequestExpiredError.new("WORKER ERROR, A WORKER EXPIRED! Did not get results or even errors back from all workers!")
end
end
local_mq_reset!
# ==========
# = FIXME Tricky one. Should we throw an exception if we didn't get all the results back, or should we keep going.
# = Maybe this is another needed option.
# ==========
# if not (errors.keys - results.keys).empty?
# raise WorkerError.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{number_of_tasks} workers. #{errors.pretty_print_inspect}")
# end
return nil if results.values.compact.empty?
return results.values
end
def local_mq_reset!
if use_local_queue?
local_mq.reset!
self.use_local_queue=false
end
end
def master_task
@master_task ||= begin
raise Exception.new("No map provided") unless @map
# Make sure to set single to false in our own Job object.
# We're just passing along whether they set us to single.
# If we were single, we'd never send off the master to be run externally.
@single = false
task = Skynet::Task.master_task(self)
end
end
def map_tasks
@map_tasks ||= begin
map_tasks = []
debug "RUN MAP 2.1 #{display_info} data size before partition: #{@map_data.size}" if @map_data.respond_to?(:size)
debug "RUN MAP 2.1 #{display_info} data before partition:", @map_data
task_options = {
:process => @map,
:name => map_name,
:map_or_reduce => :map,
:result_timeout => map_timeout,
:retry => map_retry || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
}
if @map_data.is_a?(Array)
debug "RUN MAP 2.2 DATA IS Array #{display_info}"
num_mappers = @map_data.length < @mappers ? @map_data.length : @mappers
map_data = if @map_partitioner
@map_partitioner.call(@map_data,num_mappers)
else
Skynet::Partitioners::SimplePartitionData.reduce_partition(@map_data, num_mappers)
end
debug "RUN MAP 2.3 #{display_info} data size after partition: #{map_data.size}"
debug "RUN MAP 2.3 #{display_info} map data after partition:", map_data
elsif @map_data.is_a?(Enumerable)
debug "RUN MAP 2.2 DATA IS ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
map_data = @map_data
else
debug "RUN MAP 2.2 DATA IS NOT ARRAY OR ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
map_data = [ @map_data ]
end
Skynet::TaskIterator.new(task_options, map_data)
end
end
def reduce_tasks(partitioned_data)
@reduce_tasks ||= begin
task_options = {
:name => reduce_name,
:process => @reduce,
:map_or_reduce => :reduce,
:result_timeout => reduce_timeout,
:retry => reduce_retry || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
}
Skynet::TaskIterator.new(task_options, partitioned_data)
end
end
def tasks_to_messages(tasks)
if tasks.is_a?(Skynet::TaskIterator)
tasks = tasks.to_a
elsif not tasks.is_a?(Array)
tasks = [tasks]
end
tasks.collect do |task|
Skynet::Message.new_task_message(task,self)
end
end
def master_retry
@master_retry || Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
end
def map_retry
@map_retry || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
end
def reduce_retry
@reduce_retry || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
end
def keep_map_tasks
@keep_map_tasks || Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS]
end
def keep_reduce_tasks
@keep_reduce_tasks || Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS]
end
def map_local?
return true if solo? or single?
return true if keep_map_tasks == true
# error "RUN MAP 2.4 BEFORE MAP #{display_info} KEEPMT:#{keep_map_tasks} DKMT:#{Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS]} MDCLASS: #{map_tasks.data.class} #{(map_tasks.data.is_a?(Array) ? map_tasks.data.size : '')}"
return true if keep_map_tasks and map_tasks.data.is_a?(Array) and map_tasks.data.size <= keep_map_tasks
return false
end
def reduce_local?(reduce_tasks)
return true if solo? or single?
return true if keep_reduce_tasks == true
return true if keep_reduce_tasks and reduce_tasks.data.is_a?(Array) and reduce_tasks.data.size <= keep_reduce_tasks
return false
end
def use_local_queue?
@use_local_queue
end
# async? is true if the async flag is set and the job is not a 'single' job, not in solo mode,
# and not using a local master.
# async only applies to whether we run the master locally and whether we poll for the result.
def async?
@async and not (solo? or single? or local_master?)
end
def master?
async? or not local_master?
end
def local_master?
@local_master or solo?
end
def solo?
(@solo or CONFIG[:SOLO])
end
def single?
@single
end
def data_debug?
@data_debug || Skynet::CONFIG[:SKYNET_JOB_DEBUG_DATA_LEVEL]
end
def reset!
@map_tasks = nil
@reduce_tasks = nil
end
def to_h
if @map.kind_of?(Proc) or @reduce.kind_of?(Proc)
raise Skynet::Error.new("You have a Proc in your map or reduce. This can't be turned into a hash.")
end
hash = {}
FIELDS.each do |field|
hash[field] = self.send(field) if self.send(field)
end
hash
end
def task_id
@task_id ||= get_unique_id(1).to_i
end
def version
return 1 if solo?
@version ||= begin
@@worker_version ||= self.mq.get_worker_version || 1
@@worker_version
end
end
def version=(v)
@version = v
end
def display_info
"#{name}, job_id: #{job_id}"
end
def start_after=(time)
@start_after = (time.is_a?(Time) ? time.to_i : time)
end
def map_data=(map_data)
reset!
@map_data = map_data
end
def map=(map)
reset!
return unless map
if map.class == String or map.class == Class
@map = map.to_s
elsif map.is_a?(Proc)
@map = map
else
raise BadMapOrReduceError.new("#{self.class}.map accepts a class name or a proc. Got #{map}")
end
end
def reduce=(reduce)
reset!
return unless reduce
if reduce.class == String or reduce.class == Class
@reduce = reduce.to_s
elsif reduce.is_a?(Proc)
@reduce = reduce
else
raise BadMapOrReduceError.new("#{self.class}.reduce accepts a class name or a proc. Got #{reduce}")
end
end
def map_reduce_class=(klass)
reset!
unless klass.class == String or klass.class == Class
raise BadMapOrReduceError.new("#{self.class}.map_reduce only accepts a class name: #{klass} #{klass.class}")
end
klass = klass.to_s
@map = klass
self.name ||= "#{klass} MASTER"
self.map_name ||= "#{klass} MAP"
if klass.constantize.respond_to?(:reduce)
@reduce ||= klass
self.reduce_name ||= "#{klass} REDUCE"
end
@reduce_partition ||= klass if klass.constantize.respond_to?(:reduce_partition)
@map_partitioner ||= klass if klass.constantize.respond_to?(:map_partitioner)
end
def run_master
error "run_master has been deprecated, please use run"
run(:local_master => false)
end
def mq
if use_local_queue?
local_mq
else
@mq ||= Skynet::MessageQueue.new
end
end
def local_mq
@local_mq ||= LocalMessageQueue.new
end
def self.mq
Skynet::MessageQueue.new
end
end ### END class Skynet::Job
end
class Skynet::AsyncJob < Skynet::Job
# Skynet::AsyncJob is for Skynet jobs you want to run asynchronously.
# Normally when you run a Skynet::Job it blocks until the job is complete.
# Running an Async job merely returns a job_id which can be used later to retrieve the results.
# See Skynet::Job for full documentation
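#
# A sketch (MapreduceTest is hypothetical and must already be loaded by your workers):
#
# job = Skynet::AsyncJob.new(
# :map_reduce_class => MapreduceTest,
# :map_data => [OpenStruct.new(:created_by => 2)]
# )
# job_id = job.run
# # later...
# results = Skynet::Job.results_by_job_id(job_id)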
def initialize(options = {})
options[:async] = true
options[:local_master] = false
super(options)
end
def map=(klass)
unless klass.class == String or klass.class == Class
raise BadMapOrReduceError.new("#{self.class}.map only accepts a class name")
end
@map = klass.to_s
end
def reduce=(klass)
unless klass.class == String or klass.class == Class
raise BadMapOrReduceError.new("#{self.class}.reduce only accepts a class name")
end
@reduce = klass.to_s
end
end # class Skynet::AsyncJob
class Skynet::Job::LocalMessageQueue
include SkynetDebugger
attr_reader :messages, :results
def initialize
@messages = []
@results = []
end
def get_worker_version
1
end
def take_result(job_id,timeout=nil)
raise Skynet::RequestExpiredError.new if @messages.empty?
run_message(@messages.shift)
end
def write_message(message,timeout=nil)
@messages << message
end
def empty?
@messages.empty?
end
def in_use?
(not empty?)
end
def reset!
@messages = []
@results = []
end
def run_message(message)
result = nil
(message.retry + 1).times do
task = message.payload
debug "RUN TASKS LOCALLY SUBMITTING #{message.name} task #{task.task_id}", task
begin
result = task.run
break
rescue Skynet::Task::TimeoutError => e
result = e
error "Skynet::Job::LocalMessageQueue Task timed out while executing #{e.inspect} #{e.backtrace.join("\n")}"
next
rescue Exception => e
error "Skynet::Job::LocalMessageQueue :#{__LINE__} #{e.inspect} #{e.backtrace.join("\n")}"
result = e
next
end
end
message.result_message(result)
end
end # class LocalMessageQueue
class Skynet::TaskIterator
include SkynetDebugger
include Skynet::GuidGenerator
class Error < StandardError
end
include Enumerable
attr_accessor :task_options, :data
def initialize(task_options, data)
@task_options = task_options
@data = data
end
def first
if data.respond_to?(:first)
@first ||= Skynet::Task.new(task_options.merge(:data => data.first, :task_id => get_unique_id(1).to_i))
else
raise Error.new("#{data.class} does not implement 'first'")
end
end
def size
if data.respond_to?(:size)
data.size
else
raise Error.new("#{data.class} does not implement 'size'")
end
end
def [](index)
if data.respond_to?(:[])
Skynet::Task.new(task_options.merge(:data => data[index], :task_id => get_unique_id(1).to_i))
else
raise Error.new("#{data.class} does not implement '[]'")
end
end
def each_method
data.respond_to?(:next) ? :next : :each
end
def to_a
self.collect { |task| task }
end
def each
iteration = 0
data.send(each_method) do |task_data|
task = nil
if @first and iteration == 0
task = @first
else
task = Skynet::Task.new(task_options.merge(:data => task_data, :task_id => (get_unique_id(1).to_i)))
@first = task if iteration == 0
end
iteration += 1
yield task
end
end
end # class TaskIterator
# require 'ruby2ruby' # XXX this will break unless people have the fix to Ruby2Ruby
##### ruby2ruby fix from ruby2ruby.rb ############
### XXX This is bad. Some people rely on an exception being thrown if a method is missing!
# class NilClass # Objective-C trick
# def method_missing(msg, *args, &block)
# nil
# end
# end
##############################