lib/safubot/bot.rb in safubot-0.0.1 vs lib/safubot/bot.rb in safubot-0.0.2
- old
+ new
@@ -1,8 +1,12 @@
#!/usr/bin/env ruby
module Safubot
+ def error_report(e)
+ "#{e.inspect}\n#{e.backtrace.join("\n\t")}"
+ end
+
# Defines elements of the input queue, agnostic of the transfer medium.
# May be extended by service-specific modules.
class Request
include MongoMapper::Document
safe
@@ -29,23 +33,24 @@
key :text, String
belongs_to :request, :polymorphic => true
timestamps!
end
- # Here's where we define the main Safubot instance.
class Bot
include Evented
attr_reader :opts, :twitter, :xmpp
+ # Records an error and emits a corresponding :request_error event.
def request_error(req, e)
Log.error "Error processing #{req.source.class} '#{req.text}': #{e}\n#{e.backtrace.join("\n\t")}"
req.errors[Time.now] = e
req.save
emit(:request_error, req, e)
end
+ # Processes an individual request (synchronously).
def process_request(req)
begin
emit(:request, req)
rescue Exception => e
request_error(req, e)
@@ -53,12 +58,12 @@
req.processed = true
req.save
end
end
+ # Performs appropriate dispatch operation for response type.
def dispatch(resp)
- # Performs appropriate dispatch operation for response type.
begin
source = resp.request.source
if Safubot::mode != :production
Log.info "#{source.class} Response to #{source.username}: #{resp.text}"
else
@@ -79,14 +84,16 @@
resp.save
emit(:dispatch_error, resp, e)
end
end
+ # This pulls requests from passive non-streaming sources (currently, the Twitter AJAX API).
def pull
@twitter.pull
end
+ # Goes through each unprocessed Request and submits it for processing.
def process
Request.where(:processed => false).each do |req|
concurrently(req) { process_request(req) }
end
end
@@ -103,23 +110,20 @@
def respond_now(req, text)
respond(req, text)
push
end
+ # Dispatches all undispatched Responses.
def push
Response.where(:dispatched => false).each(&method(:dispatch))
end
-
- def cycle
- pull; process; push
- end
# Wraps EM::defer with error handling and response pushing for the given request.
# @param req The Request being processed.
# @param blk The operation to be performed in a separate thread.
def concurrently(req, &blk)
- EventMachine::defer do
+ EM::defer do
begin
blk.call
rescue Exception => e
request_error(req, e)
end
@@ -127,38 +131,36 @@
push
end
end
def run
- EventMachine::run do
- EM::error_handler do |e|
- if e.is_a? Interrupt
- Log.error("Received interrupt, shutting down.")
- stop
- else
- Log.error("Unhandled exception: #{e}\n#{e.backtrace.join("\t\n")}")
- end
- end
-
- cycle
+ #EventMachine::run do
+ pull; process; push
@twitter.run if @twitter
@xmpp.run if @xmpp
- end
+ begin
+ Process.waitall
+ rescue Interrupt
+ stop
+ end
+ #end
end
+ # Shuts down the event loop.
def stop
@twitter.stop if @twitter
@xmpp.stop if @xmpp
- EM::stop_event_loop
end
+ # Initialises Twitter-related functionality.
def enable_twitter(opts={})
@twitter = Twitter::Bot.new(opts)
@twitter.on(:request) do |req|
process; push
end
end
+ # Initialises XMPP-related functionality.
def enable_xmpp(options={})
defaults = { :jid => nil, :password => nil }
@xmpp = XMPP::Bot.new(defaults.merge(options))
@xmpp.on(:request) do |req|
process; push