lib/rflow.rb in rflow-1.2.0 vs lib/rflow.rb in rflow-1.3.0a1
- old
+ new
@@ -59,16 +59,40 @@
@master = Master.new(configuration)
master.daemonize! if @daemonize
master.run! # blocks until EventMachine stops
end
+ # Nice pretty wrapper method to help reduce direct dependencies on EM
+ def self.next_tick(pr = nil, &block)
+ EM.next_tick(pr, &block)
+ end
+
+ def self.default_error_callback(error)
+ RFlow.logger.error "Unhandled error on worker thread: #{error.class}: #{error.message}, because: #{error.backtrace}"
+ end
+
+ # Wrapped version of EM.defer that also fixes logging, releases AR
+ # connections, and catches exceptions that would otherwise propagate to the
+ # main thread magically
+ def self.defer(op = nil, callback = nil, errback = nil, &blk)
+ context = RFlow.logger.clone_logging_context
+ EM.defer(nil, callback, errback || method(:default_error_callback)) do
+ begin
+ RFlow.logger.apply_logging_context context
+ (op || blk).call
+ ensure
+ ActiveRecord::Base.connection_pool.release_connection
+ end
+ end
+ end
+
# This ought to be in EM, but we'll put it here instead of monkey-patching
def self.next_tick_and_wait
mutex = Mutex.new
condition = ConditionVariable.new
mutex.synchronize do # while locked...
- EM.next_tick do # schedule a job that will...
+ RFlow.next_tick do # schedule a job that will...
mutex.synchronize do # grab the lock
begin
yield # do its thing...
condition.signal # then wake us up when it's done...
rescue