lib/rocket_job/batch/io.rb in rocketjob-5.1.1 vs lib/rocket_job/batch/io.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-require 'active_support/concern'
+require "active_support/concern"
module RocketJob
module Batch
# IO methods for sliced jobs
module IO
@@ -12,11 +12,13 @@
# category [Symbol]
# The name of the category to access or upload data into
# Default: None ( Uses the single default input collection for this job )
# Validates: This value must be one of those listed in #input_categories
def input(category = :main)
- raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" unless input_categories.include?(category) || (category == :main)
+ unless input_categories.include?(category) || (category == :main)
+ raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}"
+ end
(@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(rocket_job_io_slice_arguments("inputs", category))
end
# Returns [RocketJob::Sliced::Output] output collection for holding output slices
@@ -26,11 +28,13 @@
# category [Symbol]
# The name of the category to access or download data from
# Default: None ( Uses the single default output collection for this job )
# Validates: This value must be one of those listed in #output_categories
def output(category = :main)
- raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" unless output_categories.include?(category) || (category == :main)
+ unless output_categories.include?(category) || (category == :main)
+ raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}"
+ end
(@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(rocket_job_io_slice_arguments("outputs", category))
end
# Upload the supplied file, io, IOStreams::Path, or IOStreams::Stream.
@@ -108,11 +112,11 @@
# loaded into the job
# * If an io stream is supplied, it is read until it returns nil.
# * Only use this method for UTF-8 data, for binary data use #input_slice or #input_records.
# * CSV parsing is slow, so it is usually left for the workers to do.
def upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block)
- raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block
+ raise(ArgumentError, "Either stream, or a block must be supplied") unless stream || block
stream_mode = stream_mode.to_sym
# Backward compatibility with existing v4 jobs
stream_mode = :array if stream_mode == :row
stream_mode = :hash if stream_mode == :record
@@ -382,11 +386,12 @@
# job.write_output(result)
def write_output(result = nil, input_slice = nil, &block)
if block
RocketJob::Sliced::Writer::Output.collect(self, input_slice, &block)
else
- raise(ArgumentError, 'result parameter is required when no block is supplied') unless result
+ raise(ArgumentError, "result parameter is required when no block is supplied") unless result
+
RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result }
end
end
private
@@ -401,9 +406,8 @@
elsif compress
args[:slice_class] = Sliced::CompressedSlice
end
args
end
-
end
end
end