#!/usr/bin/env ruby

module Safubot
  # Formats an exception as its #inspect output followed by its backtrace,
  # one frame per line.
  # Defined as an instance method of the module, so it is only callable on
  # objects that include Safubot.
  # @param e [Exception] An exception with a populated backtrace.
  # @return [String]
  def error_report(e)
    "#{e.inspect}\n#{e.backtrace.join("\n\t")}"
  end

  # Defines elements of the input queue, agnostic of the transfer medium.
  # May be extended by service-specific modules.
  class Request
    include MongoMapper::Document
    safe

    key :errors, Hash # Errors encountered during processing.
    key :processed, Boolean, :default => false # Have we processed this request?
    key :success, Boolean, :default => false # Did we *successfully* process this request?
    key :text, String # The actual content.

    belongs_to :source, :polymorphic => true # The concrete medium-specific source.
    belongs_to :user, :polymorphic => true
    many :responses, :class_name => "Safubot::Response"

    timestamps!

    # If a callback is set, responses will be sent to it instead of Response.
    # NOTE(review): nothing in this file reads the callback; presumably it is
    # honoured by code elsewhere -- confirm.
    attr_accessor :callback
  end

  # Defines elements of the output queue, agnostic of the transfer medium.
  # May be extended by service-specific modules.
  class Response
    include MongoMapper::Document
    safe

    key :errors, Hash # Errors encountered during dispatch by timestamp.
    key :dispatched, Boolean, :default => false # Set once dispatch has completed without error.
    key :text, String # The content to send.

    belongs_to :request, :polymorphic => true # The request being answered.

    timestamps!
  end

  # Ties the Request/Response queues to the enabled transports (Twitter, XMPP).
  class Bot
    include Evented

    attr_reader :opts, :twitter, :xmpp

    # Records an error and emits a corresponding :request_error event.
    # @param req The Request that was being processed.
    # @param e The exception that was raised.
    def request_error(req, e)
      Log.error "Error processing #{req.source.class} '#{req.text}': #{e}\n#{e.backtrace.join("\n\t")}"
      req.errors[Time.now] = e
      req.save
      emit(:request_error, req, e)
    end

    # Processes an individual request (synchronously).
    # The request is always marked as processed and saved, even if a handler fails.
    # @param req The Request to process.
    def process_request(req)
      emit(:request, req)
    rescue StandardError, ScriptError => e
      # Deliberately not `rescue Exception`: Interrupt and SystemExit must
      # propagate so the process can be shut down while a handler is running.
      request_error(req, e)
    ensure
      req.processed = true
      req.save
    end

    # Performs appropriate dispatch operation for response type.
    # Outside of production mode the response is only logged, never sent.
    # Failures are logged, stored on the response and emitted as :dispatch_error.
    # @param resp The Response to send.
    def dispatch(resp)
      source = resp.request.source

      if Safubot::mode != :production
        Log.info "#{source.class} Response to #{source.username}: #{resp.text}"
      elsif @twitter && [Twitter::Tweet, Twitter::DirectMessage].include?(source.class)
        @twitter.send(resp)
      elsif @xmpp && [XMPP::Message].include?(source.class)
        @xmpp.send(resp)
      else
        raise NotImplementedError, "Don't know how to send response to a #{source.class}!"
      end

      resp.dispatched = true
      resp.save
    rescue StandardError, ScriptError => e
      # ScriptError is listed because NotImplementedError (raised above) is
      # not a StandardError. Signals and SystemExit are left to propagate.
      # `source` is nil here if looking up the request's source itself failed.
      Log.error "Error dispatching #{source.class} '#{resp.text}': #{e}"
      resp.errors[Time.now] = e
      resp.save
      emit(:dispatch_error, resp, e)
    end

    # Pulls requests from passive non-streaming sources (currently only Twitter).
    # Does nothing when Twitter has not been enabled, so that #run works for
    # bots that only use XMPP.
    def pull
      @twitter.pull if @twitter
    end

    # Goes through each unprocessed Request and submits it for processing.
    def process
      Request.where(:processed => false).each do |req|
        concurrently(req) { process_request(req) }
      end
    end

    # Adds a response to the queue.
    # @param req Request to respond to.
    # @param text Contents of the response.
    # @return The Response created for the request.
    def respond(req, text)
      Log.info("#{req.user.name}: #{req.text}\nsafubot: #{text}")
      Response.create(:request => req, :text => text)
    end

    # Respond + push
    # @param req Request to respond to.
    # @param text Contents of the response.
    def respond_now(req, text)
      respond(req, text)
      push
    end

    # Dispatches all undispatched Responses.
    def push
      Response.where(:dispatched => false).each(&method(:dispatch))
    end

    # Wraps EM::defer with error handling and response pushing for the given request.
    # @param req The Request being processed.
    # @param blk The operation to be performed in a separate thread.
    def concurrently(req, &blk)
      EM::defer do
        begin
          blk.call
        rescue StandardError, ScriptError => e
          request_error(req, e)
        end
        # Push even when the operation failed, so queued responses still go out.
        push
      end
    end

    # Drains the queues once, starts every enabled transport, then waits on
    # child processes until interrupted.
    def run
      #EventMachine::run do
      pull
      process
      push
      @twitter.run if @twitter
      @xmpp.run if @xmpp
      begin
        Process.waitall
      rescue Interrupt
        stop
      end
      #end
    end

    # Shuts down the event loop.
    def stop
      @twitter.stop if @twitter
      @xmpp.stop if @xmpp
    end

    # Initialises Twitter-related functionality.
    # @param opts Options passed straight through to Twitter::Bot.new.
    def enable_twitter(opts={})
      @twitter = Twitter::Bot.new(opts)
      @twitter.on(:request) do |_req|
        process
        push
      end
    end

    # Initialises XMPP-related functionality.
    # @param options Options for XMPP::Bot.new; :jid and :password default to nil.
    def enable_xmpp(options={})
      defaults = { :jid => nil, :password => nil }
      @xmpp = XMPP::Bot.new(defaults.merge(options))
      @xmpp.on(:request) do |_req|
        process
        push
      end
    end

    # Sets up the bot and the MongoMapper connection (localhost:27017).
    # @param options Bot options; :database defaults to "safubot".
    def initialize(options={})
      defaults = { :database => "safubot" }
      @opts = defaults.merge(options)
      MongoMapper.database = @opts[:database]
      MongoMapper.connection = Mongo::Connection.new('localhost', 27017, :pool_size => 5)
      @handlers = {}
    end
  end
end