lib/remi/data_subjects/sftp_file.rb in remi-0.3.2 vs lib/remi/data_subjects/sftp_file.rb in remi-0.3.3
- old
+ new
@@ -1,7 +1,47 @@
module Remi
+ module DataSubject::SftpFile # Connection and retry helpers mixed into the SFTP extractor and loader classes
+ attr_reader :sftp_session # the Net::SFTP::Session assigned by begin_connection
+
+ def sftp_retry(&block) # Calls the block; on StandardError retries until @retries attempts have been made, then re-raises
+ tries ||= @retries # ||= keeps the decremented count when `retry` re-runs the method body
+
+ block.call
+ rescue StandardError => err
+ if (tries -= 1) > 0 # attempts remain: log the error, pause one second, run the block again
+ logger.error "Error: #{err.message}"
+ logger.error "Will retry #{tries} more times"
+ sleep(1)
+ retry
+ else
+ raise err # attempts exhausted: hand the last error to the caller
+ end
+ end
+
+ def begin_connection # Opens the SSH session and the SFTP session on top of it, wrapped in sftp_retry
+ sftp_retry do
+ Timeout.timeout(@timeout) do # each attempt must complete within @timeout seconds
+ @ssh_session = Net::SSH.start(@host, @username, password: @password, port: @port, number_of_password_prompts: 0) # NOTE(review): 0 prompts presumably disables interactive password entry - confirm against Net::SSH docs
+ @sftp_session = Net::SFTP::Session.new(@ssh_session)
+ @sftp_session.connect!
+ end
+ end
+ end
+
+ def end_connection # Closes whichever sessions exist, then waits up to @timeout seconds for them to report closed
+ @sftp_session.close_channel unless @sftp_session.nil?
+ @ssh_session.close unless @ssh_session.nil?
+
+ Timeout.timeout(@timeout) do # raises Timeout::Error if the sessions have not closed in time
+ sleep 1 until (@sftp_session.nil? || @sftp_session.closed?) && (@ssh_session.nil? || @ssh_session.closed?) # poll once per second; a nil session counts as closed
+ end
+ end
+ end
+
+
+
# Sftp File extractor
# Used to extract files from an SFTP server
#
# @example
#
@@ -33,17 +73,19 @@
# # id name
# # 0 1 Albert
# # 1 2 Betsy
# # 2 3 Camu
class Extractor::SftpFile < Extractor::FileSystem
- N_RETRY = 3
+ include DataSubject::SftpFile
# @param credentials [Hash] Options hash containing login credentials
# @option credentials [String] :host SFTP host (e.g., coolserver.com)
# @option credentials [String] :username SFTP username
# @option credentials [String] :password SFTP password
# @option credentials [String] :port SFTP port (default: 22)
+ # @param retries [Integer] Maximum number of times a connection or operation will be attempted, including the first try (default: 3)
+ # @param timeout [Integer] Number of seconds to wait for establishing/closing a connection (default: 30)
def initialize(*args, **kargs, &block)
super
init_sftp_extractor(*args, **kargs)
end
@@ -53,30 +95,30 @@
attr_reader :port
# Called to extract files from the source filesystem.
# @return [Array<String>] An array of paths to a local copy of the files extracted
def extract
- connection do |sftp|
- entries.map do |entry|
- local_file = File.join(@local_path, entry.name)
- logger.info "Downloading #{entry.name} to #{local_file}"
- retry_download { sftp.download!(File.join(@remote_path, entry.name), local_file) }
- local_file
+ begin_connection
- end
+ entries.map do |entry|
+ local_file = File.join(@local_path, entry.name)
+ logger.info "Downloading #{entry.name} to #{local_file}"
+ sftp_retry { sftp_session.download!(File.join(@remote_path, entry.name), local_file) }
+ local_file
end
+ ensure
+ end_connection
end
# @return [Array<Extractor::FileSystemEntry>] (Memoized) list of entries in the remote path
def all_entries
@all_entries ||= all_entries!
end
# @return [Array<Extractor::FileSystemEntry>] List of entries in the remote path (not memoized; the listing is fetched on every call)
def all_entries!
- sftp_entries = connection { |sftp| sftp.dir.entries(@remote_path) }
- sftp_entries.map do |entry|
+ sftp_session.dir.entries(@remote_path).map do |entry|
# Early versions of the protocol don't support create time, fake it with modified time?
FileSystemEntry.new(
pathname: File.join(@remote_path, entry.name),
create_time: entry.attributes.respond_to?(:createtime) ? entry.attributes.createtime : entry.attributes.mtime,
modified_time: entry.attributes.mtime
@@ -85,38 +127,18 @@
end
private
- def init_sftp_extractor(*args, credentials:, **kargs)
+ def init_sftp_extractor(*args, credentials:, retries: 3, timeout: 30, **kargs)
@host = credentials.fetch(:host)
@username = credentials.fetch(:username)
- @password = credentials.fetch(:password)
+ @password = credentials.fetch(:password, nil)
@port = credentials.fetch(:port, '22')
+ @retries = retries
+ @timeout = timeout
end
-
- def connection(&block)
- result = nil
- Net::SFTP.start(@host, @username, password: @password, port: @port) do |sftp|
- result = yield sftp
- end
- result
- end
-
- def retry_download(&block)
- 1.upto(N_RETRY).each do |itry|
- begin
- block.call
- break
- rescue RuntimeError => err
- raise err unless itry < N_RETRY
- logger.error "Download failed with error: #{err.message}"
- logger.error "Retry attempt #{itry}/#{N_RETRY-1}"
- sleep(1)
- end
- end
- end
end
# SFTP file loader
@@ -141,12 +163,20 @@
# my_df = Daru::DataFrame.new({ a: 1.upto(5).to_a, b: 6.upto(10) })
# job = MyJob.new
# job.my_target.df = my_df
# job.my_target.load
class Loader::SftpFile < Loader
+ include DataSubject::SftpFile
+ # @param credentials [Hash] Options hash containing login credentials
+ # @option credentials [String] :host SFTP host (e.g., coolserver.com)
+ # @option credentials [String] :username SFTP username
+ # @option credentials [String] :password SFTP password
+ # @option credentials [String] :port SFTP port (default: 22)
# @param remote_path [String, Pathname] Full path to the file to be created on the target filesystem
+ # @param retries [Integer] Maximum number of times a connection or operation will be attempted, including the first try (default: 3)
+ # @param timeout [Integer] Number of seconds to wait for establishing/closing a connection (default: 30)
def initialize(*args, **kargs, &block)
super
init_sftp_loader(*args, **kargs, &block)
end
@@ -154,44 +184,29 @@
# Copies data to the SFTP Server
# @param data [Object] The path to the file in the temporary work location
# @return [true] On success
def load(data)
- logger.info "Uploading #{data} to #{@credentials[:username]}@#{@credentials[:host]}: #{@remote_path}"
- connection do |sftp|
- retry_upload { sftp.upload! data, @remote_path }
- end
+ begin_connection
+ logger.info "Uploading #{data} to #{@username}@#{@host}: #{@remote_path}"
+ sftp_retry { sftp_session.upload! data, @remote_path }
+
true
+ ensure
+ end_connection
end
private
- def init_sftp_loader(*args, credentials:, remote_path:, **kargs, &block)
- @credentials = credentials
+ def init_sftp_loader(*args, credentials:, remote_path:, retries: 3, timeout: 30, **kargs, &block)
+ @host = credentials.fetch(:host)
+ @username = credentials.fetch(:username)
+ @password = credentials.fetch(:password, nil)
+ @port = credentials.fetch(:port, '22')
@remote_path = remote_path
- end
-
- def connection(&block)
- result = nil
- Net::SFTP.start(@credentials[:host], @credentials[:username], password: @credentials[:password], port: @credentials[:port] || '22') do |sftp|
- result = yield sftp
- end
- result
- end
-
- def retry_upload(ntry=2, &block)
- 1.upto(ntry).each do |itry|
- begin
- block.call
- break
- rescue RuntimeError => err
- raise err unless itry < ntry
- logger.error "Upload failed with error: #{err.message}"
- logger.error "Retry attempt #{itry}/#{ntry-1}"
- sleep(1)
- end
- end
+ @retries = retries
+ @timeout = timeout
end
end
end