lib/rocket_job/batch/io.rb in rocketjob-5.3.3 vs lib/rocket_job/batch/io.rb in rocketjob-5.4.0.beta1

- old
+ new

@@ -16,11 +16,11 @@ def input(category = :main) unless input_categories.include?(category) || (category == :main) raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" end - (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(**rocket_job_io_slice_arguments("inputs", category)) + (@inputs ||= {})[category] ||= RocketJob::Sliced.factory(:input, category, self) end # Returns [RocketJob::Sliced::Output] output collection for holding output slices # Returns nil if no output is being collected # @@ -32,11 +32,11 @@ def output(category = :main) unless output_categories.include?(category) || (category == :main) raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" end - (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(**rocket_job_io_slice_arguments("outputs", category)) + (@outputs ||= {})[category] ||= RocketJob::Sliced.factory(:output, category, self) end # Upload the supplied file, io, IOStreams::Path, or IOStreams::Stream. # # Returns [Integer] the number of records uploaded. @@ -353,12 +353,22 @@ def download(stream = nil, category: :main, header_line: nil, **args, &block) raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing? return output(category).download(header_line: header_line, &block) if block - IOStreams.new(stream).writer(:line, **args) do |io| - output(category).download(header_line: header_line) { |record| io << record } + output_collection = output(category) + + if output_collection.binary? + IOStreams.new(stream).stream(:none).writer(**args) do |io| + raise(ArgumenError, "A `header_line` is not supported with binary output collections") if header_line + + output_collection.download { |record| io << record[:binary] } + end + else + IOStreams.new(stream).writer(:line, **args) do |io| + output_collection.download(header_line: header_line) { |record| io << record } + end end end # Writes the supplied result, Batch::Result or Batch::Results to the relevant collections. # @@ -390,24 +400,9 @@ else raise(ArgumentError, "result parameter is required when no block is supplied") unless result RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result } end - end - - private - - def rocket_job_io_slice_arguments(collection_type, category) - collection_name = "rocket_job.#{collection_type}.#{id}" - collection_name << ".#{category}" unless category == :main - - args = {collection_name: collection_name, slice_size: slice_size} - if encrypt - args[:slice_class] = Sliced::EncryptedSlice - elsif compress - args[:slice_class] = Sliced::CompressedSlice - end - args end end end end