lib/safubot/bot.rb in safubot-0.0.3 vs lib/safubot/bot.rb in safubot-0.0.4

- old
+ new

@@ -6,36 +6,70 @@ # @param e An Exception to print. def error_report(e) "#{e.inspect}\n#{e.backtrace.join("\n\t")}" end + ### + # An EmbeddedDocument for storing processing/dispatch errors. + class Problem + include MongoMapper::EmbeddedDocument + key :when, Time + key :error, String # Exception#to_s + key :type, String # Exception class + key :backtrace, Array, :default => nil + end + ## + # A mixin adding Problem handling. + module Problematic + ## + # Adds a timestamped Problem to the list. + # @param e Exception from which the Problem is derived. + def add_problem(e) + problem = Problem.new(:error => e.to_s, :type => e.class.to_s, + :when => Time.now, :backtrace => e.backtrace) + self.problems.push(problem) + end + + ## + # Fetches the most recent Problem. + def last_problem + self.problems.sort { |x,y| x.when <=> y.when }.last + end + end + + + ## # Defines elements of the input queue, agnostic of the transfer medium. # May be extended by service-specific modules. class Request include MongoMapper::Document + include Problematic safe - key :errors, Hash # Errors encountered during processing. + key :processing, Boolean, :default => false # Currently-processing lock. key :processed, Boolean, :default => false # Have we processed this request? key :success, Boolean, :default => false # Did we *successfully* process this request? key :text, String # The actual content. belongs_to :source, :polymorphic => true # The concrete medium-specific source. - belongs_to :user, :polymorphic => true + belongs_to :user, :class_name => "Safubot::KnownUser" many :responses, :class_name => "Safubot::Response" + many :problems # Hopefully not that many ;) timestamps! end ## # Defines elements of the output queue, agnostic of the transfer medium. # May be extended by service-specific modules. class Response include MongoMapper::Document + include Problematic safe - key :errors, Hash # Errors encountered during dispatch by timestamp. + key :dispatching, Boolean, :default => false # Dispatch lock. key :dispatched, Boolean, :default => false key :text, String - belongs_to :request, :polymorphic => true + belongs_to :request, :class_name => "Safubot::Request" + many :problems timestamps! end ## # The main event-processing class. You are encouraged to @@ -45,59 +79,104 @@ include Evented attr_reader :opts, :twitter, :xmpp ## - # Records an error and emits a corresponding :request_error event. + # Records an error in processing and emits a corresponding :request_error event. # @param req The Request for which the error was encountered. # @param e The caught Exception. def request_error(req, e) - Log.error "Error processing #{req.source.class} '#{req.text}': #{e}\n#{e.backtrace.join("\n\t")}" - req.errors[Time.now] = e + Log.error "Error processing #{req.source.class} '#{req.text}': #{error_report(e)}" + req.add_problem(e) req.save emit(:request_error, req, e) end + ## + # Records an error in dispatch and emits a corresponding :dispatch_error event. + # @param resp The Response for which the error was encountered. + # @param e The caught Exception. + def dispatch_error(resp, e) + Log.error "Error dispatching #{resp.request.source.class} '#{resp.text}': #{error_report(e)}" + resp.add_problem(e) + resp.save + emit(:dispatch_error, resp, e) + end + ## # Processes an individual request (synchronously). # @param req An unprocessed Request. def process_request(req) + req.reload + if req.processed + Log.debug "Request '#{req.text}' has already been processed, ignoring." + return + elsif req.processing + Log.debug "Request '#{req.text}' is currently in processing, ignoring." + return + end + begin + req.processing = true + req.save emit(:request, req) rescue Exception => e request_error(req, e) + else + req.success = true ensure - req.processed = true - req.save + #if Safubot::mode == :production + req.processing = false + req.processed = true + req.save + #end end end ## # Performs appropriate dispatch operation for response type. # @param resp An undispatched Response. def dispatch(resp) + resp.reload + if resp.dispatched + Log.debug "Response '#{resp.text}' has already been dispatched, ignoring." + return + elsif resp.dispatching + Log.debug "Response '#{resp.text}' is already in dispatch, ignoring." + return + elsif resp.problems.length > 10 + Log.debug "Response '#{resp.text}' encountered more than ten dispatch errors, ignoring." + return + elsif !resp.problems.empty? && (Time.now - resp.last_problem.when) < 1.minute + Log.debug "Response '#{resp.text}' encountered a dispatch error <1 minute ago, ignoring." + return + end + begin source = resp.request.source + resp.dispatching = true + resp.save + if Safubot::mode != :production Log.info "#{source.class} Response to #{source.username}: #{resp.text}" else if @twitter && [Twitter::Tweet, Twitter::DirectMessage].include?(source.class) @twitter.send(resp) elsif @xmpp && [XMPP::Message].include?(source.class) @xmpp.send(resp) else raise NotImplementedError, "Don't know how to send response to a #{source.class}!" end - end - resp.dispatched = true - resp.save + resp.dispatched = true + resp.save + end rescue Exception => e - Log.error "Error dispatching #{source.class} '#{resp.text}': #{e}" - resp.errors[Time.now] = e + dispatch_error(resp, e) + ensure + resp.dispatching = false resp.save - emit(:dispatch_error, resp, e) end end # This pulls requests from passive non-streaming sources (currently, the Twitter AJAX API). def pull @@ -110,24 +189,18 @@ concurrently(req) { process_request(req) } end end ## - # Adds a response to the queue. + # Adds a response to the queue and dispatches it. # @param req Request to respond to. # @param text Contents of the response. def respond(req, text) Log.info("#{req.user.name}: #{req.text}\nsafubot: #{text}") - Response.create(:request => req, :text => text) + dispatch(Response.create(:request => req, :text => text)) end - # Respond + push - def respond_now(req, text) - respond(req, text) - push - end - # Dispatches all undispatched Responses. def push Response.where(:dispatched => false).each(&method(:dispatch)) end @@ -140,11 +213,10 @@ begin blk.call rescue Exception => e request_error(req, e) end - push end end # Runs an initial request-processing loop and then forks the streaming processes. @@ -172,19 +244,21 @@ # Initialises Twitter-related functionality. def enable_twitter(opts={}) @twitter = Twitter::Bot.new(opts) @twitter.on(:request) do |req| - process; push + process_request(req) + req.responses.where(:dispatched => false).map(&method(:dispatch)) end end # Initialises XMPP-related functionality. def enable_xmpp(options={}) defaults = { :jid => nil, :password => nil } @xmpp = XMPP::Bot.new(defaults.merge(options)) @xmpp.on(:request) do |req| - process; push + process_request(req) + req.responses.where(:dispatched => false).map(&method(:dispatch)) end end def initialize(options={}) defaults = { :database => "safubot" }