module InstDataShipper
  module Destinations
    # Abstract base class for dump destinations.
    #
    # Subclasses override the lifecycle hooks (`initialize_dump`, `chunk_data`,
    # `upload_data_chunk`, `finalize_dump`, `cleanup_fatal_error`) and
    # `parse_configuration`.
    class Base
      attr_reader :dumper

      delegate :tracker, :table_schemas, :working_dir, to: :dumper

      # @param cache_key [Object] value interpolated as the key prefix by {#rk}
      # @param config [Hash, Object] an already-parsed configuration Hash, or a raw value
      #   (handed to `URI.parse` by {#parse_configuration}) that is parsed lazily by {#config}
      # @param dumper [Object] object that `tracker`, `table_schemas` and `working_dir` are delegated to
      def initialize(cache_key, config, dumper)
        @cache_key = cache_key
        @_config = config
        @dumper = dumper
      end

      # This method is called before processing any data.
      # It should be used to initialize any external resources needed for the dump.
      def initialize_dump; end

      # Yields an object (can be anything) that will be passed to `upload_data_chunk` as `chunk`.
      #
      # If multiple Destinations have the same `group_key`, `chunk_data` will only be called on the first
      # and the chunk will be passed to each destination.
      # Thus, if chunking is config-dependent, your Destination must modify the `group_key` to be unique
      # for each configuration.
      #
      # This must be overridden, but you may call super with a block to iterate individual rows.
      # Manually batch the rows, or include Concerns::Chunking to pre-batch them.
      #
      # @param generator [Proc] block handed to `Enumerator.new` to produce the rows
      # @param kwargs [Hash] accepted for subclasses; unused by this base implementation
      # @yield [row] each row after it has been passed through {#format_row}
      # @raise [NotImplementedError] when the method has not been overridden
      def chunk_data(generator, **kwargs)
        # Only refuse to run when Base's own implementation is the one that was
        # dispatched to; a subclass override calling `super` resolves to the subclass.
        raise NotImplementedError if method(__method__).owner == Base

        Enumerator.new(&generator).each do |row|
          yield format_row(row)
        end
      end

      # Called with any values yielded from chunk_data.
      # This method should upload the chunk to the destination.
      #
      # @raise [NotImplementedError] always, in this base implementation
      def upload_data_chunk(table_def, chunk)
        raise NotImplementedError
      end

      # This method is called after processing all data.
      # It should be used to finalize any external resources created by the dump.
      def finalize_dump; end

      # This method is called if a fatal error occurs.
      # It should cleanup any external resources created by the dump.
      def cleanup_fatal_error; end

      # The destination configuration.
      #
      # @return [Hash] the Hash given to the constructor as-is, otherwise the memoized
      #   result of {#parse_configuration}
      def config
        return @_config if @_config.is_a?(Hash)

        @config ||= parse_configuration(@_config)
      end

      # @return [Object, nil] the `:extra` entry of {#config}
      def user_config
        config[:extra]
      end

      # Key used to group destinations that can share chunks (see {#chunk_data}).
      #
      # @return [Hash]
      def group_key
        { class: self.class }
      end

      protected

      # Parses a URI-style configuration value into a Hash and lets the caller's block refine it.
      #
      # The resulting Hash starts with:
      # * `:params` - the URI query parsed as a nested query, or `{}` when there is no query
      # * `:extra`  - the URI fragment parsed as a nested query when it looks like `key=...`,
      #   otherwise the raw fragment (or `nil` when there is none)
      #
      # @param uri [String] value passed to `URI.parse`
      # @yield [parsed, cfg] the parsed URI and the Hash being built (mutate `cfg` in place)
      # @return [Hash] the Hash after the block has run
      # @raise [NotImplementedError] when no block is given
      def parse_configuration(uri)
        raise NotImplementedError unless block_given?

        parsed = URI.parse(uri)
        fragment = parsed.fragment
        # `\A` anchors at the start of the whole string (`^` would match at any line start).
        structured_extra = fragment.present? && fragment.match?(/\A\w+=/) && Rack::Utils.parse_nested_query(fragment)

        cfg = {
          params: parsed.query.present? ? Rack::Utils.parse_nested_query(parsed.query) : {},
          # Falls back to the raw fragment when it is not a (non-empty) nested query.
          extra: structured_extra.presence || fragment,
        }

        yield parsed, cfg
        cfg
      end

      # Builds a key by prefixing `key` with this destination's cache key.
      #
      # @return [String] "<cache_key>:<key>"
      def rk(key)
        "#{@cache_key}:#{key}"
      end

      # Forwards all arguments and the block to `InstDataShipper.redis`.
      def redis(*args, &blk)
        InstDataShipper.redis(*args, &blk)
      end

      # This is a base/generic implementation and may need to be overridden
      #
      # Converts each value of an Array row into its export representation:
      # * `nil`               -> the two-character marker `\N` (only when `override_nils`)
      # * `DateTime` / `Time` -> UTC, formatted as `%Y-%m-%d %H:%M:%S`
      # * `Date`              -> `%Y-%m-%d`
      # * `Hash` / `Array`    -> JSON text
      # * any resulting String has tab and newline characters replaced by the
      #   two-character sequences `\t` and `\n`
      #
      # The input row and its values are not modified.
      #
      # @param row [Array, Object] non-Array rows are returned unchanged
      # @param override_nils [Boolean] whether `nil` values are replaced by `\N`
      # @return [Array, Object] a new Array of formatted values, or `row` itself
      def format_row(row, override_nils: true)
        return row unless row.is_a?(Array)

        row.map do |value|
          formatted =
            if value.nil?
              override_nils ? '\N' : value
            elsif value.is_a?(DateTime) || value.is_a?(Time)
              # `getutc` returns a converted copy; `Time#utc` would convert the caller's
              # object in place (and raise on frozen values).
              value.getutc.strftime('%Y-%m-%d %H:%M:%S')
            elsif value.is_a?(Date)
              value.strftime('%Y-%m-%d')
            elsif value.is_a?(Hash) || value.is_a?(Array)
              JSON.dump(value)
            else
              value
            end

          formatted.is_a?(String) ? formatted.gsub("\t", '\t').gsub("\n", '\n') : formatted
        end
      end
    end
  end
end