lib/io_streams/pgp.rb in iostreams-0.11.0 vs lib/io_streams/pgp.rb in iostreams-0.12.0

- old
+ new

@@ -60,12 +60,12 @@ # - If you get not trusted errors # gpg --edit-key sender@example.org # Select highest level: 5 # # Delete test keys: - # IOStreams::Pgp.delete_keys(email: 'sender@example.org', secret: true) - # IOStreams::Pgp.delete_keys(email: 'receiver@example.org', secret: true) + # IOStreams::Pgp.delete_keys(email: 'sender@example.org', private: true) + # IOStreams::Pgp.delete_keys(email: 'receiver@example.org', private: true) # # Limitations # - Designed for processing larger files since a process is spawned for each file processed. # - For small in memory files or individual emails, use the 'opengpgme' library. # @@ -82,14 +82,18 @@ autoload :Writer, 'io_streams/pgp/writer' class Failure < StandardError end - # Generate a new ultimate trusted local public and private key - # Returns [String] the key id for the generated key - # Raises an exception if it fails to generate the key + class UnsupportedVersion < Failure + end + + # Generate a new ultimate trusted local public and private key. # + # Returns [String] the key id for the generated key. + # Raises an exception if it fails to generate the key. + # # name: [String] # Name of who owns the key, such as organization # # email: [String] # Email address for the key @@ -103,130 +107,201 @@ # To generate a good passphrase: # `SecureRandom.urlsafe_base64(128)` # # See `man gpg` for the remaining options def self.generate_key(name:, email:, comment: nil, passphrase: nil, key_type: 'RSA', key_length: 4096, subkey_type: 'RSA', subkey_length: key_length, expire_date: nil) - Open3.popen2e('gpg --batch --gen-key') do |stdin, out, waith_thr| - stdin.puts "Key-Type: #{key_type}" if key_type - stdin.puts "Key-Length: #{key_length}" if key_length - stdin.puts "Subkey-Type: #{subkey_type}" if subkey_type - stdin.puts "Subkey-Length: #{subkey_length}" if subkey_length - stdin.puts "Name-Real: #{name}" if name - stdin.puts "Name-Comment: #{comment}" if comment - stdin.puts "Name-Email: #{email}" if email - stdin.puts "Expire-Date: #{expire_date}" if expire_date - stdin.puts "Passphrase: #{passphrase}" if passphrase - stdin.puts '%commit' - stdin.close - if waith_thr.value.success? - key_id = nil - out.each_line do |line| - if (line = line.chomp) =~ /^gpg: key ([0-9A-F]+) marked as ultimately trusted/ - key_id = $1.to_i(16) - end - end - key_id - else - raise(Pgp::Failure, "GPG Failed to generate key: #{out.read.chomp}") + version_check + params = '' + params << "Key-Type: #{key_type}\n" if key_type + params << "Key-Length: #{key_length}\n" if key_length + params << "Subkey-Type: #{subkey_type}\n" if subkey_type + params << "Subkey-Length: #{subkey_length}\n" if subkey_length + params << "Name-Real: #{name}\n" if name + params << "Name-Comment: #{comment}\n" if comment + params << "Name-Email: #{email}\n" if email + params << "Expire-Date: #{expire_date}\n" if expire_date + params << "Passphrase: #{passphrase}\n" if passphrase + params << '%commit' + out, err, status = Open3.capture3('gpg --batch --gen-key', binmode: true, stdin_data: params) + logger.debug { "IOStreams::Pgp.generate_key output:\n#{out}#{err}" } if logger + if status.success? + if match = err.match(/gpg: key ([0-9A-F]+)\s+/) + return match[1] end + else + raise(Pgp::Failure, "GPG Failed to generate key: #{out}#{err}") end end - # Delete a secret and public keys using its email - # Returns false if no key was found - # Raises an exception if it fails to delete the key + # Delete all private and public keys for a particular email. # - # email: [String] Email address for the key + # Returns false if no key was found. + # Raises an exception if it fails to delete the key. # + # email: [String] Email address for the key. + # # public: [true|false] # Whether to delete the public key # Default: true # - # secret: [true|false] - # Whether to delete the secret key + # private: [true|false] + # Whether to delete the private key # Default: false - def self.delete_keys(email:, public: true, secret: false) + def self.delete_keys(email:, public: true, private: false) + version_check cmd = "for i in `gpg --with-colons --fingerprint #{email} | grep \"^fpr\" | cut -d: -f10`; do\n" - cmd << "gpg --batch --delete-secret-keys \"$i\" ;\n" if secret + cmd << "gpg --batch --delete-secret-keys \"$i\" ;\n" if private cmd << "gpg --batch --delete-keys \"$i\" ;\n" if public cmd << 'done' - Open3.popen2e(cmd) do |stdin, out, waith_thr| - output = out.read.chomp - if waith_thr.value.success? - return false if output =~ /(public key not found|No public key)/i - raise(Pgp::Failure, "GPG Failed to delete keys for #{email}: #{output}") if output.include?('error') - true - else - raise(Pgp::Failure, "GPG Failed calling gpg to delete secret keys for #{email}: #{output}") - end + + out, err, status = Open3.capture3(cmd, binmode: true) + logger.debug { "IOStreams::Pgp.delete_keys output:\n#{err}#{out}" } if logger + + if status.success? + return false if err =~ /(not found|No public key)/i + raise(Pgp::Failure, "GPG Failed to delete keys for #{email}:#{err}#{out}") if out.include?('error') + true + else + raise(Pgp::Failure, "GPG Failed calling gpg to delete private keys for #{email}: #{err}#{out}") end end - def self.has_key?(email:) - Open3.popen2e("gpg --list-keys --with-colons #{email}") do |stdin, out, waith_thr| - output = out.read.chomp - if waith_thr.value.success? - output.each_line do |line| - return true if line.include?(email) - end - false - else - return false if output =~ /(public key not found|No public key)/i - raise(Pgp::Failure, "GPG Failed calling gpg to list keys for #{email}: #{output}") - end + # Returns [true|false] whether their is a key for the supplied email or key_id + def self.has_key?(email: nil, key_id: nil, private: false) + raise(ArgumentError, 'Either :email, or :key_id must be supplied') if email.nil? && key_id.nil? + + !list_keys(email: email, key_id: key_id, private: private).empty? + end + + # Returns [Array<Hash>] the list of keys. + # Each Hash consists of: + # key_length: [Integer] + # key_type: [String] + # key_id: [String] + # date: [String] + # name: [String] + # email: [String] + # Returns [] if no keys were found. + def self.list_keys(email: nil, key_id: nil, private: false) + version_check + cmd = private ? '--list-secret-keys' : '--list-keys' + out, err, status = Open3.capture3("gpg #{cmd} #{email || key_id}", binmode: true) + logger.debug { "IOStreams::Pgp.list_keys output:\n#{err}#{out}" } if logger + if status.success? && out.length > 0 + # Sample output + # pub 4096R/3A5456F5 2017-06-07 + # uid [ unknown] Joe Bloggs <j@bloggs.net> + # sub 4096R/2C9B240B 2017-06-07 + parse_list_output(out) + else + return [] if err =~ /(key not found|No (public|secret) key)/i + raise(Pgp::Failure, "GPG Failed calling gpg to list keys for #{email || key_id}: #{err}#{out}") end end - # Returns [String] the first fingerprint for the supplied email - # Returns nil if no fingerprint was found - def self.fingerprint(email:) - Open3.popen2e("gpg --list-keys --fingerprint --with-colons #{email}") do |stdin, out, waith_thr| - output = out.read.chomp - if waith_thr.value.success? - output.each_line do |line| - if match = line.match(/\Afpr.*::([^\:]*):\Z/) - return match[1] - end - end - nil - else - return if output =~ /(public key not found|No public key)/i - raise(Pgp::Failure, "GPG Failed calling gpg to list keys for #{email}: #{output}") - end + # Extract information from the supplied key. + # + # Useful for confirming encryption keys before importing them. + # + # Returns [Array<Hash>] the list of primary keys. + # Each Hash consists of: + # key_length: [Integer] + # key_type: [String] + # key_id: [String] + # date: [String] + # name: [String] + # email: [String] + def self.key_info(key:) + version_check + out, err, status = Open3.capture3('gpg', binmode: true, stdin_data: key) + logger.debug { "IOStreams::Pgp.key_info output:\n#{err}#{out}" } if logger + if status.success? && out.length > 0 + # Sample Output: + # + # pub 4096R/3A5456F5 2017-06-07 + # uid Joe Bloggs <j@bloggs.net> + # sub 4096R/2C9B240B 2017-06-07 + parse_list_output(out) + else + raise(Pgp::Failure, "GPG Failed extracting key details: #{err} #{out}") end end - # Returns [String] the key for the supplied email address + # Returns [String] containing all the keys for the supplied email address. # - # email: [String] Email address for requested key + # email: [String] Email address for requested key. # # ascii: [true|false] # Whether to export as ASCII text instead of binary format # Default: true # - # secret: [true|false] + # private: [true|false] # Whether to export the private key # Default: false - def self.export(email:, ascii: true, secret: false) + def self.export(email:, ascii: true, private: false) + version_check armor = ascii ? ' --armor' : nil - cmd = secret ? '--export-secret-keys' : '--export' + cmd = private ? '--export-secret-keys' : '--export' out, err, status = Open3.capture3("gpg#{armor} #{cmd} #{email}", binmode: true) + logger.debug { "IOStreams::Pgp.export output:\n#{err}" } if logger if status.success? && out.length > 0 out else - raise(Pgp::Failure, "GPG Failed reading key: #{email}: #{err} #{out}") + raise(Pgp::Failure, "GPG Failed reading key: #{email}: #{err}") end end # Imports the supplied public/private key - # Returns [String] the output returned from the import command - def self.import(key) + # + # Returns [Array<Hash>] keys that were successfully imported. + # Each Hash consists of: + # key_id: [String] + # type: [String] + # name: [String] + # email: [String] + # Returns [] if the same key was previously imported. + # + # Raises Pgp::Failure if there was an issue importing any of the keys. + # + # Notes: + # * Importing a new key for the same email address does not remove the prior key if any. + # * Invalidated keys must be removed manually. + def self.import(key:) + version_check out, err, status = Open3.capture3('gpg --import', binmode: true, stdin_data: key) - if status.success? && out.length > 0 - out + logger.debug { "IOStreams::Pgp.import output:\n#{err}#{out}" } if logger + if status.success? && err.length > 0 + # Sample output + # + # gpg: key C16500E3: secret key imported\n" + # gpg: key C16500E3: public key "Joe Bloggs <pgp_test@iostreams.net>" imported + # gpg: Total number processed: 1 + # gpg: imported: 1 (RSA: 1) + # gpg: secret keys read: 1 + # gpg: secret keys imported: 1 + # + # Ignores unchanged: + # gpg: key 9615D46D: \"Joe Bloggs <j@bloggs.net>\" not changed\n + results = [] + secret = false + err.each_line do |line| + if line =~ /secret key imported/ + secret = true + elsif match = line.match(/key\s+(\w+):\s+(\w+).+\"(.*)<(.*)>\"/) + results << { + key_id: match[1].to_s.strip, + private: secret, + name: match[3].to_s.strip, + email: match[4].to_s.strip + } + secret = false + end + end + results else - raise(Pgp::Failure, "GPG Failed importing key: #{err} #{out}") + return [] if err =~ /already in secret keyring/ + raise(Pgp::Failure, "GPG Failed importing key: #{err}#{out}") end end # Set the trust level for an existing key. # @@ -234,19 +309,116 @@ # Returns nil if the email was not found # # After importing keys, they are not trusted and the relevant trust level must be set. # Default: 5 : Ultimate def self.set_trust(email:, level: 5) + version_check fingerprint = fingerprint(email: email) return unless fingerprint trust = "#{fingerprint}:#{level + 1}:\n" out, err, status = Open3.capture3('gpg --import-ownertrust', stdin_data: trust) + logger.debug { "IOStreams::Pgp.set_trust output:\n#{err}#{out}" } if logger if status.success? err else raise(Pgp::Failure, "GPG Failed trusting key: #{err} #{out}") end + end + + # DEPRECATED - Use key_ids instead of fingerprints + def self.fingerprint(email:) + version_check + Open3.popen2e("gpg --list-keys --fingerprint --with-colons #{email}") do |stdin, out, waith_thr| + output = out.read.chomp + if waith_thr.value.success? + output.each_line do |line| + if match = line.match(/\Afpr.*::([^\:]*):\Z/) + return match[1] + end + end + nil + else + return if output =~ /(public key not found|No public key)/i + raise(Pgp::Failure, "GPG Failed calling gpg to list keys for #{email}: #{output}") + end + end + end + + def self.logger=(logger) + @logger = logger + end + + # Returns [String] the version of pgp currently installed + def self.pgp_version + @pgp_version ||= begin + out, err, status = Open3.capture3("gpg --version") + logger.debug { "IOStreams::Pgp.version output:\n#{err}#{out}" } if logger + if status.success? + # Sample output + # gpg (GnuPG) 2.0.30 + # libgcrypt 1.7.6 + # Copyright (C) 2015 Free Software Foundation, Inc. + # License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html> + # This is free software: you are free to change and redistribute it. + # There is NO WARRANTY, to the extent permitted by law. + # + # Home: ~/.gnupg + # Supported algorithms: + # Pubkey: RSA, RSA, RSA, ELG, DSA + # Cipher: IDEA, 3DES, CAST5, BLOWFISH, AES, AES192, AES256, TWOFISH, + # CAMELLIA128, CAMELLIA192, CAMELLIA256 + # Hash: MD5, SHA1, RIPEMD160, SHA256, SHA384, SHA512, SHA224 + # Compression: Uncompressed, ZIP, ZLIB, BZIP2 + if match = out.lines.first.match(/(\d+\.\d+.\d+)/) + match[1] + end + else + return [] if err =~ /(key not found|No (public|secret) key)/i + raise(Pgp::Failure, "GPG Failed calling gpg to list keys for #{email || key_id}: #{err}#{out}") + end + end + end + + private + + @logger = nil + + def self.logger + @logger + end + + def self.version_check + raise(Pgp::UnsupportedVersion, "Version #{pgp_version} of gpg is not yet supported. You are welcome to submit a Pull Request.") if pgp_version.to_f >= 2.1 + end + + def self.parse_list_output(out) + results = [] + hash = {} + out.each_line do |line| + if match = line.match(/(pub|sec)\s+(\d+)(.*)\/(\w+)\s+(\S+)/) + hash = { + private: match[1] == 'sec', + key_length: match[2].to_s.to_i, + key_type: match[3], + key_id: match[4], + date: (Date.parse(match[5].to_s) rescue match[5]) + } + elsif match = line.match(/uid\s+(.+)<(.+)>/) + name = match[1].strip + hash[:email] = match[2].strip + if match = name.match(/(\[(.+)\])?(.+)/) + trust = match[2].to_s.strip + hash[:trust] = trust unless trust.empty? + hash[:name] = match[3].to_s.strip + else + hash[:name] = name + end + results << hash + hash = {} + end + end + results end end end