lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.0 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.3

- old
+ new

@@ -7,10 +7,11 @@ class Beanstalk < TcpSocket attr_accessor :tube, :tube_default attr_accessor :priority, :delay, :ttr def initialize *args + @one_way = true self.scheme_default ||= 'beanstalk' self.host_default ||= '127.0.0.1' self.port_default ||= 11300 self.tube_default ||= 'asir' @priority ||= 0 @@ -59,32 +60,33 @@ # Receives the encoded Message payload String. def _receive_message state additional_data = state.additional_data ||= { } state.in_stream.with_stream! do | stream | begin - match = + match = with_force_stop! do _beanstalk(stream, - RESERVE, - /\ARESERVED (\d+) (\d+)\r\n\Z/) + RESERVE, + /\ARESERVED (\d+) (\d+)\r\n\Z/) + end additional_data[:beanstalk_job_id] = match[1].to_i additional_data[:beanstalk_message_size] = size = match[2].to_i state.message_payload = stream.read(size) _read_line_and_expect! stream, /\A\r\n\Z/ state.result_opaque = stream rescue ::Exception => exc _log { [ :_receive_message, :exception, exc ] } additional_data[:beanstalk_error] = exc - channel.close + state.in_stream.close raise exc end end end # !SLIDE # Sends the encoded Result payload String. - def _send_result state + def _after_invoke_message state # # There is a possibility here the following could happen: # # _receive_message # channel == #<Channel:1> @@ -105,10 +107,11 @@ stream = state.result_opaque job_id = state.message[:beanstalk_job_id] or raise "no beanstalk_job_id" _beanstalk(stream, "delete #{job_id}\r\n", /\ADELETED\r\n\Z/) + # state.in_stream.close # Force close. end # !SLIDE # Receives the encoded Result payload String. def _receive_result state @@ -134,24 +137,23 @@ stream.write message if payload stream.write payload stream.write LINE_TERMINATOR end - stream.flush - if match = _read_line_and_expect!(stream, expect) + if match = _read_line_and_expect!(stream, expect) # , /\A(BAD_FORMAT|UNKNOWN_COMMAND)\r\n\Z/) _log { [ :_beanstalk, :result, match[0] ] } if @verbose >= 3 end match end LINE_TERMINATOR = "\r\n".freeze def _after_connect! stream - if @tube + if t = tube _beanstalk(stream, - "use #{@tube}\r\n", - /\AUSING #{@tube}\r\n\Z/) + "use #{t}\r\n", + /\AUSING #{t}\r\n\Z/) end end # !SLIDE # Beanstalk Server @@ -159,13 +161,13 @@ _log { "_server! #{uri}" } if @verbose >= 1 @server = connect!(:try_max => nil, :try_sleep => 1, :try_sleep_increment => 0.1, :try_sleep_max => 10) do | stream | - if @tube + if t = tube _beanstalk(stream, - "watch #{@tube}\r\n", + "watch #{t}\r\n", /\AWATCHING (\d+)\r\n\Z/) end end self end @@ -186,10 +188,10 @@ false end def _start_conduit! opt = host ? "-l #{host} " : "" - cmd = "beanstalkd #{opt}-p #{port}" + cmd = "beanstalkd #{opt}-p #{port} -z #{1 * 1024 * 1024} #{@conduit_options[:beanstalkd_options]}" $stderr.puts " #{cmd}" if @conduit_options[:verbose] exec(cmd) end end # !SLIDE END