module InstDataShipper
  module Destinations
    # Abstract base class for dump destinations.
    #
    # A destination is built from a cache key, a configuration (either an
    # already-parsed Hash or a URI string handed to `parse_configuration`)
    # and the dumper that owns it. Subclasses override the lifecycle hooks
    # below (`initialize_dump`, `chunk_data`, `upload_data_chunk`,
    # `finalize_dump`, `cleanup_fatal_error`).
    class Base
      attr_reader :dumper

      delegate :tracker, :table_schemas, :working_dir, to: :dumper

      # @param cache_key prefix used by `rk` when building namespaced keys
      # @param config [Hash, String] a ready-made config Hash, or a URI string
      #   that is lazily parsed by `config`
      # @param dumper the owning dumper; `tracker`, `table_schemas` and
      #   `working_dir` are delegated to it
      def initialize(cache_key, config, dumper)
        @cache_key = cache_key
        @_config = config
        @dumper = dumper
      end

      # This method is called before processing any data.
      # It should be used to initialize any external resources needed for the dump.
      def initialize_dump; end

      # Yields an object (can be anything) that will be passed to `upload_data_chunk` as `chunk`.
      #
      # If multiple Destinations have the same `group_key`, `chunk_data` will only be called on the first and the chunk will be passed to each destination.
      # Thus, if chunking is config-dependent, your Destination must modify the `group_key` to be unique for each configuration.
      #
      # This must be overridden, but you may call super with a block to iterate individual rows. Manually batch the rows, or include Concerns::Chunking to pre-batch them.
      #
      # @param generator a block-like object used to build an Enumerator of rows
      # @raise [NotImplementedError] when the method has not been overridden
      def chunk_data(generator, **kwargs)
        # `method(__method__)` resolves against self, so the owner is Base only
        # when no subclass has overridden chunk_data.
        raise NotImplementedError if method(__method__).owner == Base

        enum = Enumerator.new(&generator)
        enum.each do |row|
          yield format_row(row)
        end
      end

      # Called with any values yielded from chunk_data.
      # This method should upload the chunk to the destination.
      #
      # @raise [NotImplementedError] always, in this base implementation
      def upload_data_chunk(table_def, chunk)
        raise NotImplementedError
      end

      # This method is called after processing all data.
      # It should be used to finalize any external resources created by the dump.
      def finalize_dump; end

      # This method is called if a fatal error occurs.
      # It should cleanup any external resources created by the dump.
      def cleanup_fatal_error; end

      # Returns the destination configuration.
      #
      # A Hash passed to the constructor is returned as-is; anything else is
      # run through `parse_configuration` once and memoized.
      def config
        return @_config if @_config.is_a?(Hash)

        @config ||= parse_configuration(@_config)
      end

      # The `:user_config` entry of `config`.
      def user_config
        config[:user_config]
      end

      # Key used to group destinations that can share chunks (see `chunk_data`).
      # The base implementation groups by class only.
      def group_key
        { class: self.class }
      end

      protected

      # Parses a configuration URI string into a config Hash.
      #
      # Must be called with a block: the block receives the wrapped URI
      # (a ConfigURI) and the Hash being built (pre-seeded with
      # `:user_config` from the URI fragment) and is expected to fill the
      # Hash in. The Hash is returned.
      #
      # @raise [NotImplementedError] when called without a block
      def parse_configuration(uri)
        if block_given?
          parsed = URI.parse(uri)
          cparsed = ConfigURI.new(parsed)
          cfg = {
            user_config: cparsed.hash_params,
          }
          yield cparsed, cfg
          cfg
        else
          raise NotImplementedError
        end
      end

      # Namespaces `key` with this destination's cache key.
      def rk(key)
        "#{@cache_key}:#{key}"
      end

      # Forwards to `InstDataShipper.redis`.
      def redis(*args, &blk)
        InstDataShipper.redis(*args, &blk)
      end

      # This is a base/generic implementation and may need to be overridden
      #
      # Converts each value of an Array row to a flat, text-friendly form:
      # - nil becomes the literal `\N` (unless `override_nils` is false)
      # - Time/DateTime become UTC `YYYY-MM-DD HH:MM:SS`
      # - Date becomes `YYYY-MM-DD`
      # - Hash/Array become JSON
      # - tabs and newlines inside Strings become the two-character
      #   sequences `\t` and `\n`
      # Non-Array rows are returned untouched.
      #
      # NOTE(review): backslashes and carriage returns in String values are
      # not escaped here -- confirm whether consumers rely on that.
      def format_row(row, override_nils: true)
        if row.is_a?(Array)
          row = row.map do |v|
            v = '\N' if v.nil? && override_nils
            # DateTime is checked before Date on purpose: once converted the
            # value is a String, so the Date branch below no longer applies.
            v = v.utc.strftime('%Y-%m-%d %H:%M:%S') if v.is_a?(DateTime) || v.is_a?(Time)
            v = v.strftime('%Y-%m-%d') if v.is_a?(Date)
            v = JSON.dump(v) if v.is_a?(Hash) || v.is_a?(Array)
            if v.is_a?(String)
              v = v.gsub("\t", '\t')
              v = v.gsub("\n", '\n')
            end
            v
          end
        end
        row
      end
    end

    # Thin read-only wrapper around a parsed URI that exposes the query string
    # and fragment as parsed parameter hashes.
    class ConfigURI
      # @param uri a parsed URI object (as returned by `URI.parse`)
      def initialize(uri)
        @uri = uri
      end

      # delegate_missing_to :uri
      delegate :scheme, :user, :password, :host, :hostname, :port, :path, :query, :fragment, to: :uri

      # Query-string parameters as a frozen indifferent-access hash
      # (a frozen empty Hash when there is no query).
      def params
        @params ||= (query.present? ? Rack::Utils.parse_nested_query(query).with_indifferent_access : {}).freeze
      end

      # Parameters carried in the URI fragment.
      #
      # When the fragment starts with `key=` it is parsed like a query string
      # and returned as a frozen indifferent-access hash; otherwise the raw
      # fragment is returned (frozen), or nil when there is no fragment.
      def hash_params
        @hash_params ||= begin
          # \A (not ^) so only the very start of the fragment can qualify it
          # as a key/value list; ^ would also match after an embedded newline.
          parsed = fragment.present? &&
                   fragment.match?(/\A\w+=/) &&
                   Rack::Utils.parse_nested_query(fragment).with_indifferent_access
          (parsed.presence || fragment)&.freeze
        end
      end

      private

      def uri
        @uri
      end
    end
  end
end