lib/asir/transport/resque.rb in asir-1.0.1 vs lib/asir/transport/resque.rb in asir-1.0.4

- old
+ new

@@ -14,14 +14,10 @@ def initialize *args @port_default = 6379 @scheme_default = 'redis'.freeze super self.one_way = true - # Reraise exception, let Resque::Worker handle it. - @on_exeception ||= lambda do | trans, exc, type, message | - raise exc, exc.backtrace - end end # !SLIDE # Resque client. def _client_connect! @@ -32,10 +28,11 @@ end # !SLIDE # Resque server (worker). def _server! + # $stderr.puts " #{$$} #{self} _server!" resque_connect! resque_worker rescue ::Exception => exc raise exc.class, "#{self.class} #{uri}: #{exc.message}", exc.backtrace end @@ -50,11 +47,11 @@ super end def _send_message message, message_payload stream.with_stream! do | io | # Force connect - $stderr.puts " #{self} _send_message #{message_payload.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2 + $stderr.puts " #{$$} #{self} _send_message #{message_payload.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2 ::Resque.enqueue_to(queue, self.class, message_payload) end end def queues @@ -105,13 +102,13 @@ def serve_stream_message! in_stream, out_stream # ignored save = Thread.current[:asir_transport_resque_instance] Thread.current[:asir_transport_resque_instance] = self poll_throttle throttle do - # $stderr.puts " #{self} resque_worker = #{resque_worker} on queues #{resque_worker.queues}" + $stderr.puts " #{$$} #{self} serve_stream_message!: resque_worker = #{resque_worker} on queues #{resque_worker.queues}" if @verbose >= 3 if job = resque_worker.reserve - $stderr.puts " #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2 + $stderr.puts " #{$$} #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2 resque_worker.process(job) end job end self @@ -126,11 +123,11 @@ # Pass payload as in_stream; _receive_message will return it. t.serve_message! payload, nil end def _receive_message payload, additional_data # is actual payload - # $stderr.puts " #{self} _receive_message payload=#{payload.inspect}" + # $stderr.puts " #{$$} #{self} _receive_message payload=#{payload.inspect}" [ payload, nil ] end #################################### @@ -143,16 +140,17 @@ _uri ) end def resque_connect! + @redis_config = { + :host => host, + :port => port, + :thread_safe => true, + } @redis = - ::Redis.new({ - :host => address || '127.0.0.1', - :port => port, - :thread_safe => true, - }) + ::Redis.new(@redis_config) if namespace_ ::Resque.redis = @redis = ::Redis::Namespace.new(namespace_, :redis => @redis) ::Resque.redis.namespace = namespace_ @@ -186,9 +184,13 @@ $0 = @save_progname if @save_progname if worker = @resque_worker worker.unregister_worker end self + rescue Redis::CannotConnectError + # This error is not actionable since server + # is stopping. + nil end ######################################### def _start_conduit!