lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.0 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.3
- old
+ new
@@ -7,10 +7,11 @@
class Beanstalk < TcpSocket
attr_accessor :tube, :tube_default
attr_accessor :priority, :delay, :ttr
def initialize *args
+ @one_way = true
self.scheme_default ||= 'beanstalk'
self.host_default ||= '127.0.0.1'
self.port_default ||= 11300
self.tube_default ||= 'asir'
@priority ||= 0
@@ -59,32 +60,33 @@
# Receives the encoded Message payload String.
def _receive_message state
additional_data = state.additional_data ||= { }
state.in_stream.with_stream! do | stream |
begin
- match =
+ match = with_force_stop! do
_beanstalk(stream,
- RESERVE,
- /\ARESERVED (\d+) (\d+)\r\n\Z/)
+ RESERVE,
+ /\ARESERVED (\d+) (\d+)\r\n\Z/)
+ end
additional_data[:beanstalk_job_id] = match[1].to_i
additional_data[:beanstalk_message_size] =
size = match[2].to_i
state.message_payload = stream.read(size)
_read_line_and_expect! stream, /\A\r\n\Z/
state.result_opaque = stream
rescue ::Exception => exc
_log { [ :_receive_message, :exception, exc ] }
additional_data[:beanstalk_error] = exc
- channel.close
+ state.in_stream.close
raise exc
end
end
end
# !SLIDE
# Sends the encoded Result payload String.
- def _send_result state
+ def _after_invoke_message state
#
# There is a possibility here the following could happen:
#
# _receive_message
# channel == #<Channel:1>
@@ -105,10 +107,11 @@
stream = state.result_opaque
job_id = state.message[:beanstalk_job_id] or raise "no beanstalk_job_id"
_beanstalk(stream,
"delete #{job_id}\r\n",
/\ADELETED\r\n\Z/)
+ # state.in_stream.close # Force close.
end
# !SLIDE
# Receives the encoded Result payload String.
def _receive_result state
@@ -134,24 +137,23 @@
stream.write message
if payload
stream.write payload
stream.write LINE_TERMINATOR
end
- stream.flush
- if match = _read_line_and_expect!(stream, expect)
+ if match = _read_line_and_expect!(stream, expect) # , /\A(BAD_FORMAT|UNKNOWN_COMMAND)\r\n\Z/)
_log { [ :_beanstalk, :result, match[0] ] } if @verbose >= 3
end
match
end
LINE_TERMINATOR = "\r\n".freeze
def _after_connect! stream
- if @tube
+ if t = tube
_beanstalk(stream,
- "use #{@tube}\r\n",
- /\AUSING #{@tube}\r\n\Z/)
+ "use #{t}\r\n",
+ /\AUSING #{t}\r\n\Z/)
end
end
# !SLIDE
# Beanstalk Server
@@ -159,13 +161,13 @@
_log { "_server! #{uri}" } if @verbose >= 1
@server = connect!(:try_max => nil,
:try_sleep => 1,
:try_sleep_increment => 0.1,
:try_sleep_max => 10) do | stream |
- if @tube
+ if t = tube
_beanstalk(stream,
- "watch #{@tube}\r\n",
+ "watch #{t}\r\n",
/\AWATCHING (\d+)\r\n\Z/)
end
end
self
end
@@ -186,10 +188,10 @@
false
end
def _start_conduit!
opt = host ? "-l #{host} " : ""
- cmd = "beanstalkd #{opt}-p #{port}"
+ cmd = "beanstalkd #{opt}-p #{port} -z #{1 * 1024 * 1024} #{@conduit_options[:beanstalkd_options]}"
$stderr.puts " #{cmd}" if @conduit_options[:verbose]
exec(cmd)
end
end
# !SLIDE END