lib/logstash/inputs/imap.rb in logstash-input-imap-3.0.6 vs lib/logstash/inputs/imap.rb in logstash-input-imap-3.0.7
- old
+ new
@@ -27,15 +27,21 @@
config :lowercase_headers, :validate => :boolean, :default => true
config :check_interval, :validate => :number, :default => 300
config :delete, :validate => :boolean, :default => false
config :expunge, :validate => :boolean, :default => false
config :strip_attachments, :validate => :boolean, :default => false
-
+
# For multipart messages, use the first part that has this
# content-type as the event message.
config :content_type, :validate => :string, :default => "text/plain"
+ # Whether to use IMAP uid to track last processed message
+ config :uid_tracking, :validate => :boolean, :default => false
+
+ # Path to file with last run time metadata
+ config :sincedb_path, :validate => :string, :required => false
+
def register
require "net/imap" # in stdlib
require "mail" # gem 'mail'
if @secure and not @verify_cert
@@ -48,10 +54,26 @@
else
@port = 143
end
end
+ # Load last processed IMAP uid from file if exists
+ if @sincedb_path.nil?
+ datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "imap")
+ # Ensure that the filepath exists before writing, since it's deeply nested.
+ FileUtils::mkdir_p datapath
+ @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest("#{@user}_#{@host}_#{@port}_#{@folder}"))
+ end
+ if File.directory?(@sincedb_path)
+ raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
+ end
+ @logger.info("Using \"sincedb_path\": \"#{@sincedb_path}\"")
+ if File.exist?(@sincedb_path)
+ @uid_last_value = File.read(@sincedb_path).to_i
+ @logger.info("Loading \"uid_last_value\": \"#{@uid_last_value}\"")
+ end
+
@content_type_re = Regexp.new("^" + @content_type)
end # def register
def connect
sslopt = @secure
@@ -73,37 +95,59 @@
def check_mail(queue)
# TODO(sissel): handle exceptions happening during runtime:
# EOFError, OpenSSL::SSL::SSLError
imap = connect
imap.select(@folder)
- ids = imap.search("NOT SEEN")
+ if @uid_tracking && @uid_last_value
+ # If there are no new messages, uid_search returns @uid_last_value
+ # because it is the last message, so we need to delete it.
+ ids = imap.uid_search(["UID", (@uid_last_value+1..-1)]).delete_if { |uid|
+ uid <= @uid_last_value
+ }
+ else
+ ids = imap.uid_search("NOT SEEN")
+ end
ids.each_slice(@fetch_count) do |id_set|
- items = imap.fetch(id_set, "RFC822")
+ items = imap.uid_fetch(id_set, ["BODY.PEEK[]", "UID"])
items.each do |item|
- next unless item.attr.has_key?("RFC822")
- mail = Mail.read_from_string(item.attr["RFC822"])
+ next unless item.attr.has_key?("BODY[]")
+ mail = Mail.read_from_string(item.attr["BODY[]"])
if @strip_attachments
queue << parse_mail(mail.without_attachments!)
else
queue << parse_mail(mail)
end
+ # Mark message as processed
+ @uid_last_value = item.attr["UID"]
+ imap.uid_store(@uid_last_value, '+FLAGS', @delete || @expunge ? :Deleted : :Seen)
+
+ # Stop message processing if it is requested
+ break if stop?
end
- imap.store(id_set, '+FLAGS', @delete ? :Deleted : :Seen)
-
- end
+ # Expunge deleted messages
+ imap.expunge() if @expunge
- # Enable an 'expunge' IMAP command after the items.each loop
- if @expunge
- # Force messages to be marked as "Deleted", the above may or may not be working as expected. "Seen" means nothing if you are going to
- # delete a message after processing.
- imap.store(id_set, '+FLAGS', [:Deleted])
- imap.expunge()
+ # Stop message fetching if it is requested
+ break if stop?
end
- imap.close
- imap.disconnect
+ rescue => e
+ @logger.error("Encountered error #{e.class}", :message => e.message, :backtrace => e.backtrace)
+ # Do not raise error, check_mail will be invoked in the next run time
+
+ ensure
+ # Close the connection (and ignore errors)
+ imap.close rescue nil
+ imap.disconnect rescue nil
+
+ # Always save @uid_last_value so when tracking is switched from
+ # "NOT SEEN" to "UID" we will continue from first unprocessed message
+ if @uid_last_value
+ @logger.info("Saving \"uid_last_value\": \"#{@uid_last_value}\"")
+ File.write(@sincedb_path, @uid_last_value)
+ end
end
def parse_mail(mail)
# Add a debug message so we can track what message might cause an error later
@logger.debug? && @logger.debug("Working with message_id", :message_id => mail.message_id)