lib/rflow.rb in rflow-1.2.0 vs lib/rflow.rb in rflow-1.3.0a1

- old
+ new

@@ -59,16 +59,40 @@ @master = Master.new(configuration) master.daemonize! if @daemonize master.run! # blocks until EventMachine stops end + # Nice pretty wrapper method to help reduce direct dependencies on EM + def self.next_tick(pr = nil, &block) + EM.next_tick(pr, &block) + end + + def self.default_error_callback(error) + RFlow.logger.error "Unhandled error on worker thread: #{error.class}: #{error.message}, because: #{error.backtrace}" + end + + # Wrapped version of EM.defer that also fixes logging, releases AR + # connections, and catches exceptions that would otherwise propagate to the + # main thread magically + def self.defer(op = nil, callback = nil, errback = nil, &blk) + context = RFlow.logger.clone_logging_context + EM.defer(nil, callback, errback || method(:default_error_callback)) do + begin + RFlow.logger.apply_logging_context context + (op || blk).call + ensure + ActiveRecord::Base.connection_pool.release_connection + end + end + end + # This ought to be in EM, but we'll put it here instead of monkey-patching def self.next_tick_and_wait mutex = Mutex.new condition = ConditionVariable.new mutex.synchronize do # while locked... - EM.next_tick do # schedule a job that will... + RFlow.next_tick do # schedule a job that will... mutex.synchronize do # grab the lock begin yield # do its thing... condition.signal # then wake us up when it's done... rescue