lib/asir/transport/beanstalk.rb in asir_beanstalk-1.1.4 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.1.5

- old
+ new

@@ -3,30 +3,44 @@ module ASIR class Transport # !SLIDE # Beanstalk Transport class Beanstalk < TcpSocket - LINE_TERMINATOR = "\r\n".freeze + attr_accessor :tube, :tube_default + attr_accessor :priority, :delay, :ttr - attr_accessor :tube, :priority, :delay, :ttr - def initialize *args - self.port_default ||= 11300 - @tube ||= 'asir' + self.scheme_default ||= 'beanstalk' + self.host_default ||= '127.0.0.1' + self.port_default ||= 11300 + self.tube_default ||= 'asir' @priority ||= 0 @delay ||= 0 @ttr ||= 600 super end + def tube + @tube ||= + @uri && ( + p = _uri.path.sub(%r{\A/}, '') + p = nil if p.empty? + p + ) || tube_default + end + + def path_default + "/#{tube}" + end + # !SLIDE # Sends the encoded Message payload String. def _send_message message, message_payload stream.with_stream! do | s | begin - match = - _beanstalk(s, + match = + _beanstalk(s, "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{message_payload.size}\r\n", /\AINSERTED (\d+)\r\n\Z/, message_payload) job_id = message[:beanstalk_job_id] = match[1].to_i _log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2 @@ -43,16 +57,16 @@ # !SLIDE # Receives the encoded Message payload String. def _receive_message channel, additional_data channel.with_stream! do | stream | begin - match = + match = _beanstalk(stream, RESERVE, /\ARESERVED (\d+) (\d+)\r\n\Z/) additional_data[:beanstalk_job_id] = match[1].to_i - additional_data[:beanstalk_message_size] = + additional_data[:beanstalk_message_size] = size = match[2].to_i message_payload = stream.read(size) _read_line_and_expect! stream, /\A\r\n\Z/ # Pass the original stream used to #_send_result below. [ message_payload, stream ] @@ -70,19 +84,19 @@ def _send_result message, result, result_payload, channel, stream # # There is a possibility here the following could happen: # # _receive_message - # channel == #<Channel:1> + # channel == #<Channel:1> # channel.stream == #<TCPSocket:1234> # end # ... # ERROR OCCURES: # channel.stream.close # channel.stream = nil # ... - # _send_result + # _send_result # channel == #<Channel:1> # channel.stream == #<TCPSocket:5678> # NEW CONNECTION # stream.write "delete #{job_id}" # ... # @@ -129,10 +143,12 @@ _log { [ :_beanstalk, :result, match[0] ] } if @verbose >= 3 end match end + LINE_TERMINATOR = "\r\n".freeze + def _after_connect! stream if @tube _beanstalk(stream, "use #{@tube}\r\n", /\AUSING #{@tube}\r\n\Z/) @@ -146,11 +162,11 @@ @server = connect!(:try_max => nil, :try_sleep => 1, :try_sleep_increment => 0.1, :try_sleep_max => 10) do | stream | if @tube - _beanstalk(stream, + _beanstalk(stream, "watch #{@tube}\r\n", /\AWATCHING (\d+)\r\n\Z/) end end self @@ -171,11 +187,11 @@ # beanstalk connections are long lived. false end def _start_conduit! - addr = address ? "-l #{address} " : "" - cmd = "beanstalkd #{addr}-p #{port}" + opt = host ? "-l #{host} " : "" + cmd = "beanstalkd #{opt}-p #{port}" $stderr.puts " #{cmd}" if @conduit_options[:verbose] exec(cmd) end end # !SLIDE END