lib/cloud_crowd/action.rb in documentcloud-cloud-crowd-0.0.5 vs lib/cloud_crowd/action.rb in documentcloud-cloud-crowd-0.0.6

- old
+ new

@@ -1,10 +1,10 @@ module CloudCrowd # As you write your custom actions, have them inherit from CloudCrowd::Action. # All actions must implement a +process+ method, which should return a - # JSON-serializeable object that will be used as the output for the work unit. + # JSON-serializable object that will be used as the output for the work unit. # See the default actions for examples. # # Optionally, actions may define +split+ and +merge+ methods to do mapping # and reducing around the +input+. +split+ should return an array of URLs -- # to be mapped into WorkUnits and processed in parallel. In the +merge+ step, @@ -12,10 +12,12 @@ # # All actions have use of an individual +work_directory+, for scratch files, # and spend their duration inside of it, so relative paths work well. class Action + FILE_URL = /\Afile:\/\// + attr_reader :input, :input_path, :file_name, :options, :work_directory # Initializing an Action sets up all of the read-only variables that # form the bulk of the API for action subclasses. (Paths to read from and # write to). It creates the +work_directory+ and moves into it. @@ -25,25 +27,26 @@ @input, @options, @store = input, options, store @job_id, @work_unit_id = options['job_id'], options['work_unit_id'] @work_directory = File.expand_path(File.join(@store.temp_storage_path, storage_prefix)) FileUtils.mkdir_p(@work_directory) unless File.exists?(@work_directory) Dir.chdir @work_directory - unless status == MERGING - @input_path = File.join(@work_directory, safe_filename(@input)) - @file_name = File.basename(@input_path, File.extname(@input_path)) - download(@input, @input_path) - end + status == MERGING ? parse_input : download_input end # Each Action subclass must implement a +process+ method, overriding this. def process raise NotImplementedError.new("CloudCrowd::Actions must override 'process' with their own processing code.") end - # Download a file to the specified path with *curl*. + # Download a file to the specified path. def download(url, path) - `curl -s "#{url}" > "#{path}"` + if url.match(FILE_URL) + FileUtils.cp(url.sub(FILE_URL, ''), path) + else + resp = RestClient::Request.execute(:url => url, :method => :get, :raw_response => true) + FileUtils.mv resp.file.path, path + end path end # Takes a local filesystem path, saves the file to S3, and returns the # public (or authenticated) url on S3 where the file can be accessed. @@ -55,11 +58,11 @@ # After the Action has finished, we remove the work directory and return # to the root directory (where daemons run by default). def cleanup_work_directory Dir.chdir '/' - FileUtils.rm_r(@work_directory) + FileUtils.rm_r(@work_directory) if File.exists?(@work_directory) end private @@ -76,9 +79,23 @@ path_parts = [] path_parts << Inflector.underscore(self.class) path_parts << "job_#{@job_id}" path_parts << "unit_#{@work_unit_id}" if @work_unit_id @storage_prefix ||= File.join(path_parts) + end + + # If we know that the input is JSON, replace it with the parsed form. + def parse_input + @input = JSON.parse(@input) + end + + # If the input is a URL, download the file before beginning processing. + def download_input + input_is_url = !!URI.parse(@input) rescue false + return unless input_is_url + @input_path = File.join(@work_directory, safe_filename(@input)) + @file_name = File.basename(@input_path, File.extname(@input_path)) + download(@input, @input_path) end end end \ No newline at end of file