lib/asir/transport/beanstalk.rb in asir_beanstalk-1.1.5 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.0
- old
+ new
@@ -32,18 +32,19 @@
"/#{tube}"
end
# !SLIDE
# Sends the encoded Message payload String.
- def _send_message message, message_payload
+ def _send_message state
stream.with_stream! do | s |
+ message = state.message
begin
match =
_beanstalk(s,
- "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{message_payload.size}\r\n",
+ "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{state.message_payload.size}\r\n",
/\AINSERTED (\d+)\r\n\Z/,
- message_payload)
+ state.message_payload)
job_id = message[:beanstalk_job_id] = match[1].to_i
_log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2
rescue ::Exception => exc
message[:beanstalk_error] = exc
close
@@ -54,24 +55,24 @@
RESERVE = "reserve\r\n".freeze
# !SLIDE
# Receives the encoded Message payload String.
- def _receive_message channel, additional_data
- channel.with_stream! do | stream |
+ def _receive_message state
+ additional_data = state.additional_data ||= { }
+ state.in_stream.with_stream! do | stream |
begin
match =
_beanstalk(stream,
RESERVE,
/\ARESERVED (\d+) (\d+)\r\n\Z/)
additional_data[:beanstalk_job_id] = match[1].to_i
additional_data[:beanstalk_message_size] =
size = match[2].to_i
- message_payload = stream.read(size)
+ state.message_payload = stream.read(size)
_read_line_and_expect! stream, /\A\r\n\Z/
- # Pass the original stream used to #_send_result below.
- [ message_payload, stream ]
+ state.result_opaque = stream
rescue ::Exception => exc
_log { [ :_receive_message, :exception, exc ] }
additional_data[:beanstalk_error] = exc
channel.close
raise exc
@@ -79,11 +80,11 @@
end
end
# !SLIDE
# Sends the encoded Result payload String.
- def _send_result message, result, result_payload, channel, stream
+ def _send_result state
#
# There is a possibility here the following could happen:
#
# _receive_message
# channel == #<Channel:1>
@@ -99,22 +100,19 @@
# channel.stream == #<TCPSocket:5678> # NEW CONNECTION
# stream.write "delete #{job_id}"
# ...
#
# Therefore: _receiver_message passes the original message stream to us.
- # We insure that the same stream is still the active one and use it.
- channel.with_stream! do | maybe_other_stream |
- _log [ :_send_result, "stream lost" ] if maybe_other_stream != stream
- job_id = message[:beanstalk_job_id] or raise "no beanstalk_job_id"
- _beanstalk(stream,
- "delete #{job_id}\r\n",
- /\ADELETED\r\n\Z/)
- end
+ stream = state.result_opaque
+ job_id = state.message[:beanstalk_job_id] or raise "no beanstalk_job_id"
+ _beanstalk(stream,
+ "delete #{job_id}\r\n",
+ /\ADELETED\r\n\Z/)
end
# !SLIDE
# Receives the encoded Result payload String.
- def _receive_result message, opaque_result
+ def _receive_result state
nil
end
# !SLIDE
# Sets beanstalk_delay if message.delay was specified.