lib/asir/transport/resque.rb in asir-1.0.1 vs lib/asir/transport/resque.rb in asir-1.0.4
- old
+ new
@@ -14,14 +14,10 @@
def initialize *args
@port_default = 6379
@scheme_default = 'redis'.freeze
super
self.one_way = true
- # Reraise exception, let Resque::Worker handle it.
- @on_exeception ||= lambda do | trans, exc, type, message |
- raise exc, exc.backtrace
- end
end
# !SLIDE
# Resque client.
def _client_connect!
@@ -32,10 +28,11 @@
end
# !SLIDE
# Resque server (worker).
def _server!
+ # $stderr.puts " #{$$} #{self} _server!"
resque_connect!
resque_worker
rescue ::Exception => exc
raise exc.class, "#{self.class} #{uri}: #{exc.message}", exc.backtrace
end
@@ -50,11 +47,11 @@
super
end
def _send_message message, message_payload
stream.with_stream! do | io | # Force connect
- $stderr.puts " #{self} _send_message #{message_payload.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2
+ $stderr.puts " #{$$} #{self} _send_message #{message_payload.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2
::Resque.enqueue_to(queue, self.class, message_payload)
end
end
def queues
@@ -105,13 +102,13 @@
def serve_stream_message! in_stream, out_stream # ignored
save = Thread.current[:asir_transport_resque_instance]
Thread.current[:asir_transport_resque_instance] = self
poll_throttle throttle do
- # $stderr.puts " #{self} resque_worker = #{resque_worker} on queues #{resque_worker.queues}"
+ $stderr.puts " #{$$} #{self} serve_stream_message!: resque_worker = #{resque_worker} on queues #{resque_worker.queues}" if @verbose >= 3
if job = resque_worker.reserve
- $stderr.puts " #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2
+ $stderr.puts " #{$$} #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2
resque_worker.process(job)
end
job
end
self
@@ -126,11 +123,11 @@
# Pass payload as in_stream; _receive_message will return it.
t.serve_message! payload, nil
end
def _receive_message payload, additional_data # is actual payload
- # $stderr.puts " #{self} _receive_message payload=#{payload.inspect}"
+ # $stderr.puts " #{$$} #{self} _receive_message payload=#{payload.inspect}"
[ payload, nil ]
end
####################################
@@ -143,16 +140,17 @@
_uri
)
end
def resque_connect!
+ @redis_config = {
+ :host => host,
+ :port => port,
+ :thread_safe => true,
+ }
@redis =
- ::Redis.new({
- :host => address || '127.0.0.1',
- :port => port,
- :thread_safe => true,
- })
+ ::Redis.new(@redis_config)
if namespace_
::Resque.redis =
@redis =
::Redis::Namespace.new(namespace_, :redis => @redis)
::Resque.redis.namespace = namespace_
@@ -186,9 +184,13 @@
$0 = @save_progname if @save_progname
if worker = @resque_worker
worker.unregister_worker
end
self
+ rescue Redis::CannotConnectError
+ # This error is not actionable since server
+ # is stopping.
+ nil
end
#########################################
def _start_conduit!