# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. module Arrow class TableSaver class << self def save(table, output, options={}) new(table, output, options).save end end def initialize(table, output, options={}) @table = table output = output.to_path if output.respond_to?(:to_path) @output = output @options = options fill_options end def save format = @options[:format] custom_save_method = "save_as_#{format}" unless respond_to?(custom_save_method, true) available_formats = [] (methods(true) | private_methods(true)).each do |name| match_data = /\Asave_as_/.match(name.to_s) if match_data available_formats << match_data.post_match end end deprecated_formats = ["batch", "stream"] available_formats -= deprecated_formats message = "Arrow::Table save format must be one of [" message << available_formats.join(", ") message << "]: #{format.inspect}" raise ArgumentError, message end if method(custom_save_method).arity.zero? __send__(custom_save_method) else # For backward compatibility. __send__(custom_save_method, @output) end end private def fill_options if @options[:format] and @options.key?(:compression) return end if @output.is_a?(Buffer) info = {} else extension = PathExtension.new(@output) info = extension.extract end format = info[:format] @options = @options.dup if format and respond_to?("save_as_#{format}", true) @options[:format] ||= format.to_sym else @options[:format] ||= :arrow end unless @options.key?(:compression) @options[:compression] = info[:compression] end end def open_raw_output_stream(&block) if @output.is_a?(Buffer) BufferOutputStream.open(@output, &block) else FileOutputStream.open(@output, false, &block) end end def open_output_stream(&block) compression = @options[:compression] if compression codec = Codec.new(compression) open_raw_output_stream do |raw_output| CompressedOutputStream.open(codec, raw_output) do |output| yield(output) end end else open_raw_output_stream(&block) end end def save_raw(writer_class) open_output_stream do |output| writer_class.open(output, @table.schema) do |writer| writer.write_table(@table) end end end def save_as_arrow save_as_arrow_file end # @since 1.0.0 def save_as_arrow_file save_raw(RecordBatchFileWriter) end # @deprecated Use `format: :arrow_batch` instead. def save_as_batch save_as_arrow_file end # @since 1.0.0 def save_as_arrow_streaming save_raw(RecordBatchStreamWriter) end # @deprecated Use `format: :arrow_streaming` instead. def save_as_stream save_as_arrow_streaming end def csv_save(**options) open_output_stream do |output| csv = CSV.new(output, **options) names = @table.schema.fields.collect(&:name) csv << names @table.raw_records.each do |record| csv << record end end end def save_as_csv csv_save end def save_as_tsv csv_save(col_sep: "\t") end def save_as_feather properties = FeatherWriteProperties.new properties.class.properties.each do |name| value = @options[name.to_sym] next if value.nil? properties.__send__("#{name}=", value) end open_raw_output_stream do |output| @table.write_as_feather(output, properties) end end end end