lib/rocket_job/batch/io.rb in rocketjob-5.3.3 vs lib/rocket_job/batch/io.rb in rocketjob-5.4.0.beta1
- old
+ new
@@ -16,11 +16,11 @@
def input(category = :main)
unless input_categories.include?(category) || (category == :main)
raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}"
end
- (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(**rocket_job_io_slice_arguments("inputs", category))
+ (@inputs ||= {})[category] ||= RocketJob::Sliced.factory(:input, category, self)
end
# Returns [RocketJob::Sliced::Output] output collection for holding output slices
# Returns nil if no output is being collected
#
@@ -32,11 +32,11 @@
def output(category = :main)
unless output_categories.include?(category) || (category == :main)
raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}"
end
- (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(**rocket_job_io_slice_arguments("outputs", category))
+ (@outputs ||= {})[category] ||= RocketJob::Sliced.factory(:output, category, self)
end
# Upload the supplied file, io, IOStreams::Path, or IOStreams::Stream.
#
# Returns [Integer] the number of records uploaded.
@@ -353,12 +353,22 @@
def download(stream = nil, category: :main, header_line: nil, **args, &block)
raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?
return output(category).download(header_line: header_line, &block) if block
- IOStreams.new(stream).writer(:line, **args) do |io|
- output(category).download(header_line: header_line) { |record| io << record }
+ output_collection = output(category)
+
+ if output_collection.binary?
+ IOStreams.new(stream).stream(:none).writer(**args) do |io|
+ raise(ArgumenError, "A `header_line` is not supported with binary output collections") if header_line
+
+ output_collection.download { |record| io << record[:binary] }
+ end
+ else
+ IOStreams.new(stream).writer(:line, **args) do |io|
+ output_collection.download(header_line: header_line) { |record| io << record }
+ end
end
end
# Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.
#
@@ -390,24 +400,9 @@
else
raise(ArgumentError, "result parameter is required when no block is supplied") unless result
RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result }
end
- end
-
- private
-
- def rocket_job_io_slice_arguments(collection_type, category)
- collection_name = "rocket_job.#{collection_type}.#{id}"
- collection_name << ".#{category}" unless category == :main
-
- args = {collection_name: collection_name, slice_size: slice_size}
- if encrypt
- args[:slice_class] = Sliced::EncryptedSlice
- elsif compress
- args[:slice_class] = Sliced::CompressedSlice
- end
- args
end
end
end
end