lib/safubot/bot.rb in safubot-0.0.1 vs lib/safubot/bot.rb in safubot-0.0.2

- old
+ new

@@ -1,8 +1,12 @@ #!/usr/bin/env ruby module Safubot + def error_report(e) + "#{e.inspect}\n#{e.backtrace.join("\n\t")}" + end + # Defines elements of the input queue, agnostic of the transfer medium. # May be extended by service-specific modules. class Request include MongoMapper::Document safe @@ -29,23 +33,24 @@ key :text, String belongs_to :request, :polymorphic => true timestamps! end - # Here's where we define the main Safubot instance. class Bot include Evented attr_reader :opts, :twitter, :xmpp + # Records an error and emits a corresponding :request_error event. def request_error(req, e) Log.error "Error processing #{req.source.class} '#{req.text}': #{e}\n#{e.backtrace.join("\n\t")}" req.errors[Time.now] = e req.save emit(:request_error, req, e) end + # Processes an individual request (synchronously). def process_request(req) begin emit(:request, req) rescue Exception => e request_error(req, e) @@ -53,12 +58,12 @@ req.processed = true req.save end end + # Performs appropriate dispatch operation for response type. def dispatch(resp) - # Performs appropriate dispatch operation for response type. begin source = resp.request.source if Safubot::mode != :production Log.info "#{source.class} Response to #{source.username}: #{resp.text}" else @@ -79,14 +84,16 @@ resp.save emit(:dispatch_error, resp, e) end end + # This pulls requests from passive non-streaming sources (currently, the Twitter AJAX API). def pull @twitter.pull end + # Goes through each unprocessed Request and submits it for processing. def process Request.where(:processed => false).each do |req| concurrently(req) { process_request(req) } end end @@ -103,23 +110,20 @@ def respond_now(req, text) respond(req, text) push end + # Dispatches all undispatched Responses. def push Response.where(:dispatched => false).each(&method(:dispatch)) end - - def cycle - pull; process; push - end # Wraps EM::defer with error handling and response pushing for the given request. # @param req The Request being processed. # @param blk The operation to be performed in a separate thread. def concurrently(req, &blk) - EventMachine::defer do + EM::defer do begin blk.call rescue Exception => e request_error(req, e) end @@ -127,38 +131,36 @@ push end end def run - EventMachine::run do - EM::error_handler do |e| - if e.is_a? Interrupt - Log.error("Received interrupt, shutting down.") - stop - else - Log.error("Unhandled exception: #{e}\n#{e.backtrace.join("\t\n")}") - end - end - - cycle + #EventMachine::run do + pull; process; push @twitter.run if @twitter @xmpp.run if @xmpp - end + begin + Process.waitall + rescue Interrupt + stop + end + #end end + # Shuts down the event loop. def stop @twitter.stop if @twitter @xmpp.stop if @xmpp - EM::stop_event_loop end + # Initialises Twitter-related functionality. def enable_twitter(opts={}) @twitter = Twitter::Bot.new(opts) @twitter.on(:request) do |req| process; push end end + # Initialises XMPP-related functionality. def enable_xmpp(options={}) defaults = { :jid => nil, :password => nil } @xmpp = XMPP::Bot.new(defaults.merge(options)) @xmpp.on(:request) do |req| process; push