lib/logstash/inputs/imap.rb in logstash-input-imap-3.0.6 vs lib/logstash/inputs/imap.rb in logstash-input-imap-3.0.7

- old
+ new

@@ -27,15 +27,21 @@ config :lowercase_headers, :validate => :boolean, :default => true config :check_interval, :validate => :number, :default => 300 config :delete, :validate => :boolean, :default => false config :expunge, :validate => :boolean, :default => false config :strip_attachments, :validate => :boolean, :default => false - + # For multipart messages, use the first part that has this # content-type as the event message. config :content_type, :validate => :string, :default => "text/plain" + # Whether to use IMAP uid to track last processed message + config :uid_tracking, :validate => :boolean, :default => false + + # Path to file with last run time metadata + config :sincedb_path, :validate => :string, :required => false + def register require "net/imap" # in stdlib require "mail" # gem 'mail' if @secure and not @verify_cert @@ -48,10 +54,26 @@ else @port = 143 end end + # Load last processed IMAP uid from file if exists + if @sincedb_path.nil? + datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "imap") + # Ensure that the filepath exists before writing, since it's deeply nested. + FileUtils::mkdir_p datapath + @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest("#{@user}_#{@host}_#{@port}_#{@folder}")) + end + if File.directory?(@sincedb_path) + raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"") + end + @logger.info("Using \"sincedb_path\": \"#{@sincedb_path}\"") + if File.exist?(@sincedb_path) + @uid_last_value = File.read(@sincedb_path).to_i + @logger.info("Loading \"uid_last_value\": \"#{@uid_last_value}\"") + end + @content_type_re = Regexp.new("^" + @content_type) end # def register def connect sslopt = @secure @@ -73,37 +95,59 @@ def check_mail(queue) # TODO(sissel): handle exceptions happening during runtime: # EOFError, OpenSSL::SSL::SSLError imap = connect imap.select(@folder) - ids = imap.search("NOT SEEN") + if @uid_tracking && @uid_last_value + # If there are no new messages, uid_search returns @uid_last_value + # because it is the last message, so we need to delete it. + ids = imap.uid_search(["UID", (@uid_last_value+1..-1)]).delete_if { |uid| + uid <= @uid_last_value + } + else + ids = imap.uid_search("NOT SEEN") + end ids.each_slice(@fetch_count) do |id_set| - items = imap.fetch(id_set, "RFC822") + items = imap.uid_fetch(id_set, ["BODY.PEEK[]", "UID"]) items.each do |item| - next unless item.attr.has_key?("RFC822") - mail = Mail.read_from_string(item.attr["RFC822"]) + next unless item.attr.has_key?("BODY[]") + mail = Mail.read_from_string(item.attr["BODY[]"]) if @strip_attachments queue << parse_mail(mail.without_attachments!) else queue << parse_mail(mail) end + # Mark message as processed + @uid_last_value = item.attr["UID"] + imap.uid_store(@uid_last_value, '+FLAGS', @delete || @expunge ? :Deleted : :Seen) + + # Stop message processing if it is requested + break if stop? end - imap.store(id_set, '+FLAGS', @delete ? :Deleted : :Seen) - - end + # Expunge deleted messages + imap.expunge() if @expunge - # Enable an 'expunge' IMAP command after the items.each loop - if @expunge - # Force messages to be marked as "Deleted", the above may or may not be working as expected. "Seen" means nothing if you are going to - # delete a message after processing. - imap.store(id_set, '+FLAGS', [:Deleted]) - imap.expunge() + # Stop message fetching if it is requested + break if stop? end - imap.close - imap.disconnect + rescue => e + @logger.error("Encountered error #{e.class}", :message => e.message, :backtrace => e.backtrace) + # Do not raise error, check_mail will be invoked in the next run time + + ensure + # Close the connection (and ignore errors) + imap.close rescue nil + imap.disconnect rescue nil + + # Always save @uid_last_value so when tracking is switched from + # "NOT SEEN" to "UID" we will continue from first unprocessed message + if @uid_last_value + @logger.info("Saving \"uid_last_value\": \"#{@uid_last_value}\"") + File.write(@sincedb_path, @uid_last_value) + end end def parse_mail(mail) # Add a debug message so we can track what message might cause an error later @logger.debug? && @logger.debug("Working with message_id", :message_id => mail.message_id)