lib/cloud_crowd/action.rb in documentcloud-cloud-crowd-0.0.5 vs lib/cloud_crowd/action.rb in documentcloud-cloud-crowd-0.0.6
- old
+ new
@@ -1,10 +1,10 @@
module CloudCrowd
# As you write your custom actions, have them inherit from CloudCrowd::Action.
# All actions must implement a +process+ method, which should return a
- # JSON-serializeable object that will be used as the output for the work unit.
+ # JSON-serializable object that will be used as the output for the work unit.
# See the default actions for examples.
#
# Optionally, actions may define +split+ and +merge+ methods to do mapping
# and reducing around the +input+. +split+ should return an array of URLs --
# to be mapped into WorkUnits and processed in parallel. In the +merge+ step,
@@ -12,10 +12,12 @@
#
# All actions have use of an individual +work_directory+, for scratch files,
# and spend their duration inside of it, so relative paths work well.
class Action
+ FILE_URL = /\Afile:\/\//
+
attr_reader :input, :input_path, :file_name, :options, :work_directory
# Initializing an Action sets up all of the read-only variables that
# form the bulk of the API for action subclasses. (Paths to read from and
# write to). It creates the +work_directory+ and moves into it.
@@ -25,25 +27,26 @@
@input, @options, @store = input, options, store
@job_id, @work_unit_id = options['job_id'], options['work_unit_id']
@work_directory = File.expand_path(File.join(@store.temp_storage_path, storage_prefix))
FileUtils.mkdir_p(@work_directory) unless File.exists?(@work_directory)
Dir.chdir @work_directory
- unless status == MERGING
- @input_path = File.join(@work_directory, safe_filename(@input))
- @file_name = File.basename(@input_path, File.extname(@input_path))
- download(@input, @input_path)
- end
+ status == MERGING ? parse_input : download_input
end
# Placeholder for the work an action performs: every subclass supplies its
# own +process+, and reaching this base version means the override is missing.
def process
raise NotImplementedError, "CloudCrowd::Actions must override 'process' with their own processing code."
end
- # Download a file to the specified path with *curl*.
+ # Download the file at +url+ to the local +path+ and return +path+.
def download(url, path)
- `curl -s "#{url}" > "#{path}"`
+ if url.match(FILE_URL) # URL begins with file:// -- the source is a local file
+ FileUtils.cp(url.sub(FILE_URL, ''), path) # drop the file:// prefix and copy the file to path
+ else
+ resp = RestClient::Request.execute(:url => url, :method => :get, :raw_response => true) # any other URL: GET request through RestClient
+ FileUtils.mv resp.file.path, path # relocate the file exposed by the raw response to path
+ end
path # callers get the destination path back
end
# Takes a local filesystem path, saves the file to S3, and returns the
# public (or authenticated) url on S3 where the file can be accessed.
@@ -55,11 +58,11 @@
# After the Action has finished, we return to the root directory (where
# daemons run by default) and remove the work directory.
def cleanup_work_directory
Dir.chdir '/'
- FileUtils.rm_r(@work_directory)
# File.exist? is used rather than the deprecated File.exists? alias, which
# current Ruby releases no longer define.
+ FileUtils.rm_r(@work_directory) if File.exist?(@work_directory)
end
private
@@ -76,9 +79,23 @@
path_parts = []
path_parts << Inflector.underscore(self.class)
path_parts << "job_#{@job_id}"
path_parts << "unit_#{@work_unit_id}" if @work_unit_id
@storage_prefix ||= File.join(path_parts)
+ end
+
+ # Replace the raw JSON input with its parsed form; invoked when status == MERGING.
+ def parse_input
+ @input = JSON.parse(@input) # overwrites @input, so the +input+ reader now returns the parsed value
+ end
+
+ # If the input parses as a URL, download it into the work directory before processing begins.
+ def download_input
+ input_is_url = !!URI.parse(@input) rescue false # a StandardError while parsing yields false; NOTE(review): URI.parse may also accept plain non-URL strings -- confirm
+ return unless input_is_url # not a URL: @input_path and @file_name are left unset
+ @input_path = File.join(@work_directory, safe_filename(@input)) # local destination inside the work directory
+ @file_name = File.basename(@input_path, File.extname(@input_path)) # base name of the local copy without its extension
+ download(@input, @input_path)
end
end
end
\ No newline at end of file