lib/safubot/bot.rb in safubot-0.0.3 vs lib/safubot/bot.rb in safubot-0.0.4
- old
+ new
@@ -6,36 +6,70 @@
# @param e An Exception to print.
def error_report(e)
"#{e.inspect}\n#{e.backtrace.join("\n\t")}"
end
+ ###
+ # An EmbeddedDocument for storing processing/dispatch errors.
+ class Problem
+ include MongoMapper::EmbeddedDocument
+ key :when, Time
+ key :error, String # Exception#to_s
+ key :type, String # Exception class
+ key :backtrace, Array, :default => nil
+ end
+
##
+ # A mixin adding Problem handling.
+ module Problematic
+ ##
+ # Adds a timestamped Problem to the list.
+ # @param e Exception from which the Problem is derived.
+ def add_problem(e)
+ problem = Problem.new(:error => e.to_s, :type => e.class.to_s,
+ :when => Time.now, :backtrace => e.backtrace)
+ self.problems.push(problem)
+ end
+
+ ##
+ # Fetches the most recent Problem.
+ def last_problem
+ self.problems.sort { |x,y| x.when <=> y.when }.last
+ end
+ end
+
+
+ ##
# Defines elements of the input queue, agnostic of the transfer medium.
# May be extended by service-specific modules.
class Request
include MongoMapper::Document
+ include Problematic
safe
- key :errors, Hash # Errors encountered during processing.
+ key :processing, Boolean, :default => false # Currently-processing lock.
key :processed, Boolean, :default => false # Have we processed this request?
key :success, Boolean, :default => false # Did we *successfully* process this request?
key :text, String # The actual content.
belongs_to :source, :polymorphic => true # The concrete medium-specific source.
- belongs_to :user, :polymorphic => true
+ belongs_to :user, :class_name => "Safubot::KnownUser"
many :responses, :class_name => "Safubot::Response"
+ many :problems # Hopefully not that many ;)
timestamps!
end
##
# Defines elements of the output queue, agnostic of the transfer medium.
# May be extended by service-specific modules.
class Response
include MongoMapper::Document
+ include Problematic
safe
- key :errors, Hash # Errors encountered during dispatch by timestamp.
+ key :dispatching, Boolean, :default => false # Dispatch lock.
key :dispatched, Boolean, :default => false
key :text, String
- belongs_to :request, :polymorphic => true
+ belongs_to :request, :class_name => "Safubot::Request"
+ many :problems
timestamps!
end
##
# The main event-processing class. You are encouraged to
@@ -45,59 +79,104 @@
include Evented
attr_reader :opts, :twitter, :xmpp
##
- # Records an error and emits a corresponding :request_error event.
+ # Records an error in processing and emits a corresponding :request_error event.
# @param req The Request for which the error was encountered.
# @param e The caught Exception.
def request_error(req, e)
- Log.error "Error processing #{req.source.class} '#{req.text}': #{e}\n#{e.backtrace.join("\n\t")}"
- req.errors[Time.now] = e
+ Log.error "Error processing #{req.source.class} '#{req.text}': #{error_report(e)}"
+ req.add_problem(e)
req.save
emit(:request_error, req, e)
end
+ ##
+ # Records an error in dispatch and emits a corresponding :dispatch_error event.
+ # @param resp The Response for which the error was encountered.
+ # @param e The caught Exception.
+ def dispatch_error(resp, e)
+ Log.error "Error dispatching #{resp.request.source.class} '#{resp.text}': #{error_report(e)}"
+ resp.add_problem(e)
+ resp.save
+ emit(:dispatch_error, resp, e)
+ end
+
##
# Processes an individual request (synchronously).
# @param req An unprocessed Request.
def process_request(req)
+ req.reload
+ if req.processed
+ Log.debug "Request '#{req.text}' has already been processed, ignoring."
+ return
+ elsif req.processing
+ Log.debug "Request '#{req.text}' is currently in processing, ignoring."
+ return
+ end
+
begin
+ req.processing = true
+ req.save
emit(:request, req)
rescue Exception => e
request_error(req, e)
+ else
+ req.success = true
ensure
- req.processed = true
- req.save
+ #if Safubot::mode == :production
+ req.processing = false
+ req.processed = true
+ req.save
+ #end
end
end
##
# Performs appropriate dispatch operation for response type.
# @param resp An undispatched Response.
def dispatch(resp)
+ resp.reload
+ if resp.dispatched
+ Log.debug "Response '#{resp.text}' has already been dispatched, ignoring."
+ return
+ elsif resp.dispatching
+ Log.debug "Response '#{resp.text}' is already in dispatch, ignoring."
+ return
+ elsif resp.problems.length > 10
+ Log.debug "Response '#{resp.text}' encountered more than ten dispatch errors, ignoring."
+ return
+ elsif !resp.problems.empty? && (Time.now - resp.last_problem.when) < 1.minute
+ Log.debug "Response '#{resp.text}' encountered a dispatch error <1 minute ago, ignoring."
+ return
+ end
+
begin
source = resp.request.source
+ resp.dispatching = true
+ resp.save
+
if Safubot::mode != :production
Log.info "#{source.class} Response to #{source.username}: #{resp.text}"
else
if @twitter && [Twitter::Tweet, Twitter::DirectMessage].include?(source.class)
@twitter.send(resp)
elsif @xmpp && [XMPP::Message].include?(source.class)
@xmpp.send(resp)
else
raise NotImplementedError, "Don't know how to send response to a #{source.class}!"
end
- end
- resp.dispatched = true
- resp.save
+ resp.dispatched = true
+ resp.save
+ end
rescue Exception => e
- Log.error "Error dispatching #{source.class} '#{resp.text}': #{e}"
- resp.errors[Time.now] = e
+ dispatch_error(resp, e)
+ ensure
+ resp.dispatching = false
resp.save
- emit(:dispatch_error, resp, e)
end
end
# This pulls requests from passive non-streaming sources (currently, the Twitter AJAX API).
def pull
@@ -110,24 +189,18 @@
concurrently(req) { process_request(req) }
end
end
##
- # Adds a response to the queue.
+ # Adds a response to the queue and dispatches it.
# @param req Request to respond to.
# @param text Contents of the response.
def respond(req, text)
Log.info("#{req.user.name}: #{req.text}\nsafubot: #{text}")
- Response.create(:request => req, :text => text)
+ dispatch(Response.create(:request => req, :text => text))
end
- # Respond + push
- def respond_now(req, text)
- respond(req, text)
- push
- end
-
# Dispatches all undispatched Responses.
def push
Response.where(:dispatched => false).each(&method(:dispatch))
end
@@ -140,11 +213,10 @@
begin
blk.call
rescue Exception => e
request_error(req, e)
end
-
push
end
end
# Runs an initial request-processing loop and then forks the streaming processes.
@@ -172,19 +244,21 @@
# Initialises Twitter-related functionality.
def enable_twitter(opts={})
@twitter = Twitter::Bot.new(opts)
@twitter.on(:request) do |req|
- process; push
+ process_request(req)
+ req.responses.where(:dispatched => false).map(&method(:dispatch))
end
end
# Initialises XMPP-related functionality.
def enable_xmpp(options={})
defaults = { :jid => nil, :password => nil }
@xmpp = XMPP::Bot.new(defaults.merge(options))
@xmpp.on(:request) do |req|
- process; push
+ process_request(req)
+ req.responses.where(:dispatched => false).map(&method(:dispatch))
end
end
def initialize(options={})
defaults = { :database => "safubot" }