lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.5 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.6

- old
+ new

@@ -1,6 +1,7 @@ require 'asir/transport/tcp_socket' +require 'yaml' module ASIR class Transport # !SLIDE # Beanstalk Transport @@ -127,9 +128,54 @@ delay end # !SLIDE # Beanstalk protocol support + + def status + { + :beanstalkd => { + :stats => stats, + :stats_tube => tube && stats_tube, + } + } + end + + def stats + _beanstalk_stats_yaml! "stats\r\n" + end + + def stats_job job_id + _beanstalk_stats_yaml! "stats-job #{job_id}\r\n" + end + + def stats_tube tube = nil + tube ||= self.tube + _beanstalk_stats_yaml! "stats-tube #{tube}\r\n" + end + + def _beanstalk_stats_yaml! message, expect = nil + expect ||= /\AOK (\d+)\r\n\Z/ + x = _beanstalk_return_data message, expect + x && ::YAML.load(x) + rescue ASIR::Transport::PayloadIO::UnexpectedResponse => exc + return :NOT_FOUND if exc.received == NOT_FOUND + raise exc + end + NOT_FOUND = "NOT_FOUND\r\n".freeze + + def _beanstalk_return_data message, expect, data_size_field = nil, payload = nil + (@server || stream).with_stream! do | stream | + if match = _beanstalk(stream, message, expect, payload) + size = match[data_size_field ||= 1].to_i + data = stream.read(size) + _read_line_and_expect! stream, /\A\r\n\Z/ + data + else + raise "unexpected result" + end + end + end # Send "something ...\r\n". # Expect /\ASOMETHING (\d+)...\r\n". def _beanstalk stream, message, expect, payload = nil _log { [ :_beanstalk, :message, message ] } if @verbose >= 3