lib/asir/transport/beanstalk.rb in asir_beanstalk-1.1.4 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.1.5
- old
+ new
@@ -3,30 +3,44 @@
module ASIR
class Transport
# !SLIDE
# Beanstalk Transport
class Beanstalk < TcpSocket
- LINE_TERMINATOR = "\r\n".freeze
+ attr_accessor :tube, :tube_default
+ attr_accessor :priority, :delay, :ttr
- attr_accessor :tube, :priority, :delay, :ttr
-
def initialize *args
- self.port_default ||= 11300
- @tube ||= 'asir'
+ self.scheme_default ||= 'beanstalk'
+ self.host_default ||= '127.0.0.1'
+ self.port_default ||= 11300
+ self.tube_default ||= 'asir'
@priority ||= 0
@delay ||= 0
@ttr ||= 600
super
end
+ def tube
+ @tube ||=
+ @uri && (
+ p = _uri.path.sub(%r{\A/}, '')
+ p = nil if p.empty?
+ p
+ ) || tube_default
+ end
+
+ def path_default
+ "/#{tube}"
+ end
+
# !SLIDE
# Sends the encoded Message payload String.
def _send_message message, message_payload
stream.with_stream! do | s |
begin
- match =
- _beanstalk(s,
+ match =
+ _beanstalk(s,
"put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{message_payload.size}\r\n",
/\AINSERTED (\d+)\r\n\Z/,
message_payload)
job_id = message[:beanstalk_job_id] = match[1].to_i
_log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2
@@ -43,16 +57,16 @@
# !SLIDE
# Receives the encoded Message payload String.
def _receive_message channel, additional_data
channel.with_stream! do | stream |
begin
- match =
+ match =
_beanstalk(stream,
RESERVE,
/\ARESERVED (\d+) (\d+)\r\n\Z/)
additional_data[:beanstalk_job_id] = match[1].to_i
- additional_data[:beanstalk_message_size] =
+ additional_data[:beanstalk_message_size] =
size = match[2].to_i
message_payload = stream.read(size)
_read_line_and_expect! stream, /\A\r\n\Z/
# Pass the original stream used to #_send_result below.
[ message_payload, stream ]
@@ -70,19 +84,19 @@
def _send_result message, result, result_payload, channel, stream
#
# There is a possibility here the following could happen:
#
# _receive_message
- # channel == #<Channel:1>
+ # channel == #<Channel:1>
# channel.stream == #<TCPSocket:1234>
# end
# ...
# ERROR OCCURES:
# channel.stream.close
# channel.stream = nil
# ...
- # _send_result
+ # _send_result
# channel == #<Channel:1>
# channel.stream == #<TCPSocket:5678> # NEW CONNECTION
# stream.write "delete #{job_id}"
# ...
#
@@ -129,10 +143,12 @@
_log { [ :_beanstalk, :result, match[0] ] } if @verbose >= 3
end
match
end
+ LINE_TERMINATOR = "\r\n".freeze
+
def _after_connect! stream
if @tube
_beanstalk(stream,
"use #{@tube}\r\n",
/\AUSING #{@tube}\r\n\Z/)
@@ -146,11 +162,11 @@
@server = connect!(:try_max => nil,
:try_sleep => 1,
:try_sleep_increment => 0.1,
:try_sleep_max => 10) do | stream |
if @tube
- _beanstalk(stream,
+ _beanstalk(stream,
"watch #{@tube}\r\n",
/\AWATCHING (\d+)\r\n\Z/)
end
end
self
@@ -171,11 +187,11 @@
# beanstalk connections are long lived.
false
end
def _start_conduit!
- addr = address ? "-l #{address} " : ""
- cmd = "beanstalkd #{addr}-p #{port}"
+ opt = host ? "-l #{host} " : ""
+ cmd = "beanstalkd #{opt}-p #{port}"
$stderr.puts " #{cmd}" if @conduit_options[:verbose]
exec(cmd)
end
end
# !SLIDE END