lib/asir/transport/beanstalk.rb in asir_beanstalk-1.1.5 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.0

- old
+ new

@@ -32,18 +32,19 @@ "/#{tube}" end # !SLIDE # Sends the encoded Message payload String. - def _send_message message, message_payload + def _send_message state stream.with_stream! do | s | + message = state.message begin match = _beanstalk(s, - "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{message_payload.size}\r\n", + "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{state.message_payload.size}\r\n", /\AINSERTED (\d+)\r\n\Z/, - message_payload) + state.message_payload) job_id = message[:beanstalk_job_id] = match[1].to_i _log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2 rescue ::Exception => exc message[:beanstalk_error] = exc close @@ -54,24 +55,24 @@ RESERVE = "reserve\r\n".freeze # !SLIDE # Receives the encoded Message payload String. - def _receive_message channel, additional_data - channel.with_stream! do | stream | + def _receive_message state + additional_data = state.additional_data ||= { } + state.in_stream.with_stream! do | stream | begin match = _beanstalk(stream, RESERVE, /\ARESERVED (\d+) (\d+)\r\n\Z/) additional_data[:beanstalk_job_id] = match[1].to_i additional_data[:beanstalk_message_size] = size = match[2].to_i - message_payload = stream.read(size) + state.message_payload = stream.read(size) _read_line_and_expect! stream, /\A\r\n\Z/ - # Pass the original stream used to #_send_result below. - [ message_payload, stream ] + state.result_opaque = stream rescue ::Exception => exc _log { [ :_receive_message, :exception, exc ] } additional_data[:beanstalk_error] = exc channel.close raise exc @@ -79,11 +80,11 @@ end end # !SLIDE # Sends the encoded Result payload String. - def _send_result message, result, result_payload, channel, stream + def _send_result state # # There is a possibility here the following could happen: # # _receive_message # channel == #<Channel:1> @@ -99,22 +100,19 @@ # channel.stream == #<TCPSocket:5678> # NEW CONNECTION # stream.write "delete #{job_id}" # ... # # Therefore: _receiver_message passes the original message stream to us. - # We insure that the same stream is still the active one and use it. - channel.with_stream! do | maybe_other_stream | - _log [ :_send_result, "stream lost" ] if maybe_other_stream != stream - job_id = message[:beanstalk_job_id] or raise "no beanstalk_job_id" - _beanstalk(stream, - "delete #{job_id}\r\n", - /\ADELETED\r\n\Z/) - end + stream = state.result_opaque + job_id = state.message[:beanstalk_job_id] or raise "no beanstalk_job_id" + _beanstalk(stream, + "delete #{job_id}\r\n", + /\ADELETED\r\n\Z/) end # !SLIDE # Receives the encoded Result payload String. - def _receive_result message, opaque_result + def _receive_result state nil end # !SLIDE # Sets beanstalk_delay if message.delay was specified.