lib/rocket_job/batch/io.rb in rocketjob-5.1.1 vs lib/rocket_job/batch/io.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,6 +1,6 @@ -require 'active_support/concern' +require "active_support/concern" module RocketJob module Batch # IO methods for sliced jobs module IO @@ -12,11 +12,13 @@ # category [Symbol] # The name of the category to access or upload data into # Default: None ( Uses the single default input collection for this job ) # Validates: This value must be one of those listed in #input_categories def input(category = :main) - raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" unless input_categories.include?(category) || (category == :main) + unless input_categories.include?(category) || (category == :main) + raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" + end (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(rocket_job_io_slice_arguments("inputs", category)) end # Returns [RocketJob::Sliced::Output] output collection for holding output slices @@ -26,11 +28,13 @@ # category [Symbol] # The name of the category to access or download data from # Default: None ( Uses the single default output collection for this job ) # Validates: This value must be one of those listed in #output_categories def output(category = :main) - raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" unless output_categories.include?(category) || (category == :main) + unless output_categories.include?(category) || (category == :main) + raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" + end (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(rocket_job_io_slice_arguments("outputs", category)) end # Upload the supplied file, io, IOStreams::Path, or IOStreams::Stream. @@ -108,11 +112,11 @@ # loaded into the job # * If an io stream is supplied, it is read until it returns nil. # * Only use this method for UTF-8 data, for binary data use #input_slice or #input_records. # * CSV parsing is slow, so it is usually left for the workers to do. def upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block) - raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block + raise(ArgumentError, "Either stream, or a block must be supplied") unless stream || block stream_mode = stream_mode.to_sym # Backward compatibility with existing v4 jobs stream_mode = :array if stream_mode == :row stream_mode = :hash if stream_mode == :record @@ -382,11 +386,12 @@ # job.write_output(result) def write_output(result = nil, input_slice = nil, &block) if block RocketJob::Sliced::Writer::Output.collect(self, input_slice, &block) else - raise(ArgumentError, 'result parameter is required when no block is supplied') unless result + raise(ArgumentError, "result parameter is required when no block is supplied") unless result + RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result } end end private @@ -401,9 +406,8 @@ elsif compress args[:slice_class] = Sliced::CompressedSlice end args end - end end end