require 'thread_safe'

# IOStreams maps file extensions / stream names to reader and writer classes
# and chains those classes together so that a file can be transparently
# compressed, encrypted, etc. while it is being read or written.
module IOStreams
  # Registry of stream name (Symbol) => Extension(reader_class, writer_class).
  @@extensions = ThreadSafe::Hash.new

  # Returns [Array<Symbol>] the streams required to process the file, determined
  # by looking at its trailing extension(s).
  #
  # Only the extensions are inspected, never the base name, so a file that is
  # simply called `zip` is returned as [ :file ].
  # An extension is matched exactly as registered first, and then lower-cased,
  # so `myfile.ZIP` is handled the same way as `myfile.zip`.
  #
  # Extensions registered by this file:
  #   .zip       Zip File                                   [ :zip ]
  #   .gz        GZip File                                  [ :gz ]
  #   .gzip      GZip File                                  [ :gzip ]
  #   .enc       File Encrypted using symmetric encryption  [ :enc ]
  #              (only registered when SymmetricEncryption is defined)
  #   other      All other extensions will be returned as:  [ :file ]
  #
  # When a file is encrypted, it may also be compressed:
  #   .zip.enc  [ :zip, :enc ]
  #   .gz.enc   [ :gz,  :enc ]
  #
  # Example Zip file:
  #   IOStreams.streams_for_file_name('myfile.zip')
  #   => [ :zip ]
  #
  # Example Encrypted Gzip file:
  #   IOStreams.streams_for_file_name('myfile.csv.gz.enc')
  #   => [ :gz, :enc ]
  #
  # Example plain text / binary file:
  #   IOStreams.streams_for_file_name('myfile.csv')
  #   => [ :file ]
  #
  # Raises ArgumentError when file_name is nil, or when it responds to #read
  # (a stream carries no file name to inspect).
  def self.streams_for_file_name(file_name)
    raise ArgumentError, 'File name cannot be nil' if file_name.nil?
    if file_name.respond_to?(:read)
      raise ArgumentError, "RocketJob Cannot detect file format when uploading to stream: #{file_name.inspect}"
    end

    parts = file_name.split('.')
    # The first part is the base name (or path), not an extension.
    parts.shift

    extensions = []
    # Walk the extensions right to left, stopping at the first unknown one.
    while (extension = parts.pop)
      # Exact match first so that any mixed-case registration keeps working.
      candidates = [extension.to_sym, extension.downcase.to_sym]
      stream     = candidates.find { |sym| @@extensions.key?(sym) }
      break unless stream

      extensions.unshift(stream)
    end
    extensions << :file if extensions.empty?
    extensions
  end

  # Holds the reader and writer classes registered for an extension.
  Extension = Struct.new(:reader_class, :writer_class)

  # Register a file extension and the reader and writer classes to use to format it
  #
  # Returns the new Extension entry.
  # Raises RuntimeError when the extension is not made up entirely of word characters.
  #
  # Example:
  #   # MyXls::Reader and MyXls::Writer must implement .open
  #   register_extension(:xls, MyXls::Reader, MyXls::Writer)
  def self.register_extension(extension, reader_class, writer_class)
    # \z (not \Z) so that a trailing newline is rejected as well.
    raise "Invalid extension #{extension.inspect}" unless extension.to_s =~ /\A\w+\z/

    @@extensions[extension.to_sym] = Extension.new(reader_class, writer_class)
  end

  # De-Register a file extension
  #
  # Returns [Extension] the registration that was removed,
  # or nil if the extension was not registered.
  # Raises RuntimeError when the extension is not made up entirely of word characters.
  #
  # Example:
  #   deregister_extension(:xls)
  def self.deregister_extension(extension)
    # \z (not \Z) so that a trailing newline is rejected as well.
    raise "Invalid extension #{extension.inspect}" unless extension.to_s =~ /\A\w+\z/

    @@extensions.delete(extension.to_sym)
  end

  # Opens a Reader for reading a file / stream and passes it to the supplied block.
  #
  # Parameters
  #   file_name_or_io [String|IO]
  #     The file_name of the file to read from, or an IO Stream that implements
  #     #read.
  #
  #   streams [Symbol|Array|Hash]
  #     The formats/streams that will be used to convert the data whilst it is
  #     being read.
  #     When nil, the file_name will be inspected to try and determine what
  #     streams should be applied. An IO Stream is read as [ :file ].
  #     Default: nil
  #
  # See .streams_for_file_name for the extensions that are recognized.
  #
  # Example: Zip
  #   IOStreams.reader('myfile.zip') do |stream|
  #     puts stream.read
  #   end
  #
  # Example: Encrypted Zip
  #   IOStreams.reader('myfile.zip.enc') do |stream|
  #     puts stream.read
  #   end
  #
  # Example: Explicitly set the streams
  #   IOStreams.reader('myfile.zip.enc', [:zip, :enc]) do |stream|
  #     puts stream.read
  #   end
  #
  # Example: Supply custom options
  #   # The options hash is passed as-is to the reader class for that stream
  #   IOStreams.reader('myfile.csv.enc', [enc: { compress: true }]) do |stream|
  #     puts stream.read
  #   end
  def self.reader(file_name_or_io, streams = nil, &block)
    stream(:reader, file_name_or_io, streams, &block)
  end

  # Opens a Writer for writing to a file / stream and passes it to the supplied block.
  #
  # Parameters
  #   file_name_or_io [String|IO]
  #     The file_name of the file to write to, or an IO Stream that implements
  #     #write.
  #
  #   streams [Symbol|Array|Hash]
  #     The formats/streams that will be used to convert the data whilst it is
  #     being written.
  #     When nil, the file_name will be inspected to try and determine what
  #     streams should be applied. An IO Stream is written as [ :file ].
  #     Default: nil
  #
  # See .streams_for_file_name for the extensions that are recognized.
  #
  # Example: Zip
  #   IOStreams.writer('myfile.zip') do |stream|
  #     stream.write(data)
  #   end
  #
  # Example: Encrypted Zip
  #   IOStreams.writer('myfile.zip.enc') do |stream|
  #     stream.write(data)
  #   end
  #
  # Example: Explicitly set the streams
  #   IOStreams.writer('myfile.zip.enc', [:zip, :enc]) do |stream|
  #     stream.write(data)
  #   end
  #
  # Example: Supply custom options
  #   IOStreams.writer('myfile.csv.enc', [enc: { compress: true }]) do |stream|
  #     stream.write(data)
  #   end
  #
  # Example: Set internal filename when creating a zip file
  #   IOStreams.writer('myfile.csv.zip', zip: { zip_file_name: 'myfile.csv' }) do |stream|
  #     stream.write(data)
  #   end
  def self.writer(file_name_or_io, streams = nil, &block)
    stream(:writer, file_name_or_io, streams, &block)
  end

  # Copies the source stream to the target stream, buffer_size units at a time.
  # Returns [Integer] the total of the sizes of the chunks copied.
  #
  # Example:
  #   IOStreams.reader('a.csv') do |source_stream|
  #     IOStreams.writer('b.csv.enc') do |target_stream|
  #       IOStreams.copy(source_stream, target_stream)
  #     end
  #   end
  def self.copy(source_stream, target_stream, buffer_size = 65_536)
    bytes = 0
    # Stop on nil or on an empty chunk, whichever the source uses to signal the end.
    while (data = source_stream.read(buffer_size))
      break if data.size.zero?

      bytes += data.size
      target_stream.write(data)
    end
    bytes
  end

  ##########################################################################
  # Internal helpers.
  #
  # NOTE: These are intended for internal use only. A bare `private` does not
  # apply to methods defined with `def self.`, so they have always been publicly
  # callable and are deliberately left that way for backward compatibility.

  # Struct to hold the Stream class and its options, if any
  StreamStruct = Struct.new(:klass, :options)

  # Opens a reader or writer stream and passes it to the supplied block.
  #
  # type: :reader or :writer
  #
  # The streams are nested so that the last stream listed is opened against
  # file_name_or_io and the first stream listed is the one handed to the block.
  # For example [:zip, :enc] opens :enc on the file, and :zip on top of :enc.
  def self.stream(type, file_name_or_io, streams = nil, &block)
    unless streams
      io_method = type == :reader ? :read : :write
      streams   = file_name_or_io.respond_to?(io_method) ? [:file] : streams_for_file_name(file_name_or_io)
    end

    # Daisy chain the streams together, starting with the caller's block as the
    # innermost step and wrapping it with each stream in turn.
    chain = streams_for(type, streams).inject(block) do |inner, stream_struct|
      ->(io) { stream_struct.klass.open(io, stream_struct.options, &inner) }
    end
    chain.call(file_name_or_io)
  end

  # Returns [Array<StreamStruct>] for the supplied stream definition(s).
  #
  # type:   :reader or :writer
  # params: one of
  #   Symbol  a single stream name:                          :zip
  #   Array   stream names and/or { name => options } hashes: [:zip, { enc: { compress: true } }]
  #   Hash    { name => options }:                            { zip: { zip_file_name: 'a.csv' } }
  #
  # Raises ArgumentError for any other type of params, or for an unknown stream name.
  def self.streams_for(type, params)
    case params
    when Symbol
      [stream_struct_for_stream(type, params)]
    when Array
      params.flat_map do |stream|
        if stream.is_a?(Hash)
          stream.map { |stream_sym, options| stream_struct_for_stream(type, stream_sym, options) }
        else
          [stream_struct_for_stream(type, stream)]
        end
      end
    when Hash
      params.map { |stream, options| stream_struct_for_stream(type, stream, options) }
    else
      raise ArgumentError, "Invalid params supplied: #{params.inspect}"
    end
  end

  # Returns [StreamStruct] holding the registered reader or writer class for
  # the stream, along with the options to open it with.
  #
  # Raises ArgumentError when the stream has not been registered.
  def self.stream_struct_for_stream(type, stream, options = {})
    ext   = @@extensions[stream.to_sym] || raise(ArgumentError, "Unknown Stream type: #{stream.inspect}")
    klass = ext.send("#{type}_class")
    StreamStruct.new(klass, options)
  end

  # Register the built-in file extensions.
  # :enc is only available when the SymmetricEncryption library has been loaded.
  register_extension(:enc, SymmetricEncryption::Reader, SymmetricEncryption::Writer) if defined?(SymmetricEncryption)
  register_extension(:file, IOStreams::File::Reader, IOStreams::File::Writer)
  register_extension(:gz, IOStreams::Gzip::Reader, IOStreams::Gzip::Writer)
  register_extension(:gzip, IOStreams::Gzip::Reader, IOStreams::Gzip::Writer)
  register_extension(:zip, IOStreams::Zip::Reader, IOStreams::Zip::Writer)
end