lib/remi/data_subjects/sftp_file.rb in remi-0.3.2 vs lib/remi/data_subjects/sftp_file.rb in remi-0.3.3

- old
+ new

@@ -1,7 +1,47 @@ module Remi + module DataSubject::SftpFile + attr_reader :sftp_session + + def sftp_retry(&block) + tries ||= @retries + + block.call + rescue StandardError => err + if (tries -= 1) > 0 + logger.error "Error: #{err.message}" + logger.error "Will retry #{tries} more times" + sleep(1) + retry + else + raise err + end + end + + def begin_connection + sftp_retry do + Timeout.timeout(@timeout) do + @ssh_session = Net::SSH.start(@host, @username, password: @password, port: @port, number_of_password_prompts: 0) + @sftp_session = Net::SFTP::Session.new(@ssh_session) + @sftp_session.connect! + end + end + end + + def end_connection + @sftp_session.close_channel unless @sftp_session.nil? + @ssh_session.close unless @ssh_session.nil? + + Timeout.timeout(@timeout) do + sleep 1 until (@sftp_session.nil? || @sftp_session.closed?) && (@ssh_session.nil? || @ssh_session.closed?) + end + end + end + + + # Sftp File extractor # Used to extract files from an SFTP server # # @example # @@ -33,17 +73,19 @@ # # id name # # 0 1 Albert # # 1 2 Betsy # # 2 3 Camu class Extractor::SftpFile < Extractor::FileSystem - N_RETRY = 3 + include DataSubject::SftpFile # @param credentials [Hash] Options hash containing login credentials # @param credentials [String] :host SFTP host (e.g., coolserver.com) # @param credentials [String] :username SFTP username # @param credentials [String] :password SFTP password # @param credentials [String] :port SFTP port (default: 22) + # @param retries [Integer] Number of times a connection or operation will be retried (default: 3) + # @param timeout [Integer] Number of seconds to wait for establishing/closing a connection (default: 30) def initialize(*args, **kargs, &block) super init_sftp_extractor(*args, **kargs) end @@ -53,30 +95,30 @@ attr_reader :port # Called to extract files from the source filesystem. # @return [Array<String>] An array of paths to a local copy of the files extacted def extract - connection do |sftp| - entries.map do |entry| - local_file = File.join(@local_path, entry.name) - logger.info "Downloading #{entry.name} to #{local_file}" - retry_download { sftp.download!(File.join(@remote_path, entry.name), local_file) } - local_file + begin_connection - end + entries.map do |entry| + local_file = File.join(@local_path, entry.name) + logger.info "Downloading #{entry.name} to #{local_file}" + sftp_retry { sftp_session.download!(File.join(@remote_path, entry.name), local_file) } + local_file end + ensure + end_connection end # @return [Array<Extractor::FileSystemEntry>] (Memoized) list of objects in the bucket/prefix def all_entries @all_entries ||= all_entries! end # @return [Array<Extractor::FileSystemEntry>] (Memoized) list of objects in the bucket/prefix def all_entries! - sftp_entries = connection { |sftp| sftp.dir.entries(@remote_path) } - sftp_entries.map do |entry| + sftp_session.dir.entries(@remote_path).map do |entry| # Early versions of the protocol don't support create time, fake it with modified time? FileSystemEntry.new( pathname: File.join(@remote_path, entry.name), create_time: entry.attributes.respond_to?(:createtime) ? entry.attributes.createtime : entry.attributes.mtime, modified_time: entry.attributes.mtime @@ -85,38 +127,18 @@ end private - def init_sftp_extractor(*args, credentials:, **kargs) + def init_sftp_extractor(*args, credentials:, retries: 3, timeout: 30, **kargs) @host = credentials.fetch(:host) @username = credentials.fetch(:username) - @password = credentials.fetch(:password) + @password = credentials.fetch(:password, nil) @port = credentials.fetch(:port, '22') + @retries = retries + @timeout = timeout end - - def connection(&block) - result = nil - Net::SFTP.start(@host, @username, password: @password, port: @port) do |sftp| - result = yield sftp - end - result - end - - def retry_download(&block) - 1.upto(N_RETRY).each do |itry| - begin - block.call - break - rescue RuntimeError => err - raise err unless itry < N_RETRY - logger.error "Download failed with error: #{err.message}" - logger.error "Retry attempt #{itry}/#{N_RETRY-1}" - sleep(1) - end - end - end end # SFTP file loader @@ -141,12 +163,20 @@ # my_df = Daru::DataFrame.new({ a: 1.upto(5).to_a, b: 6.upto(10) }) # job = MyJob.new # job.my_target.df = my_df # job.my_target.load class Loader::SftpFile < Loader + include DataSubject::SftpFile + # @param credentials [Hash] Options hash containing login credentials + # @param credentials [String] :host SFTP host (e.g., coolserver.com) + # @param credentials [String] :username SFTP username + # @param credentials [String] :password SFTP password + # @param credentials [String] :port SFTP port (default: 22) # @param remote_path [String, Pathname] Full path to the file to be created on the target filesystem + # @param retries [Integer] Number of times a connection or operation will be retried (default: 3) + # @param timeout [Integer] Number of seconds to wait for establishing/closing a connection (default: 30) def initialize(*args, **kargs, &block) super init_sftp_loader(*args, **kargs, &block) end @@ -154,44 +184,29 @@ # Copies data to the SFTP Server # @param data [Object] The path to the file in the temporary work location # @return [true] On success def load(data) - logger.info "Uploading #{data} to #{@credentials[:username]}@#{@credentials[:host]}: #{@remote_path}" - connection do |sftp| - retry_upload { sftp.upload! data, @remote_path } - end + begin_connection + logger.info "Uploading #{data} to #{@username}@#{@host}: #{@remote_path}" + sftp_retry { sftp_session.upload! data, @remote_path } + true + ensure + end_connection end private - def init_sftp_loader(*args, credentials:, remote_path:, **kargs, &block) - @credentials = credentials + def init_sftp_loader(*args, credentials:, remote_path:, retries: 3, timeout: 30, **kargs, &block) + @host = credentials.fetch(:host) + @username = credentials.fetch(:username) + @password = credentials.fetch(:password, nil) + @port = credentials.fetch(:port, '22') @remote_path = remote_path - end - - def connection(&block) - result = nil - Net::SFTP.start(@credentials[:host], @credentials[:username], password: @credentials[:password], port: @credentials[:port] || '22') do |sftp| - result = yield sftp - end - result - end - - def retry_upload(ntry=2, &block) - 1.upto(ntry).each do |itry| - begin - block.call - break - rescue RuntimeError => err - raise err unless itry < ntry - logger.error "Upload failed with error: #{err.message}" - logger.error "Retry attempt #{itry}/#{ntry-1}" - sleep(1) - end - end + @retries = retries + @timeout = timeout end end end