lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.5 vs lib/asir/transport/beanstalk.rb in asir_beanstalk-1.2.6
- old
+ new
@@ -1,6 +1,7 @@
require 'asir/transport/tcp_socket'
+require 'yaml'
module ASIR
class Transport
# !SLIDE
# Beanstalk Transport
@@ -127,9 +128,54 @@
delay
end
# !SLIDE
# Beanstalk protocol support
+
+ def status
+ {
+ :beanstalkd => {
+ :stats => stats,
+ :stats_tube => tube && stats_tube,
+ }
+ }
+ end
+
+ def stats
+ _beanstalk_stats_yaml! "stats\r\n"
+ end
+
+ def stats_job job_id
+ _beanstalk_stats_yaml! "stats-job #{job_id}\r\n"
+ end
+
+ def stats_tube tube = nil
+ tube ||= self.tube
+ _beanstalk_stats_yaml! "stats-tube #{tube}\r\n"
+ end
+
+ def _beanstalk_stats_yaml! message, expect = nil
+ expect ||= /\AOK (\d+)\r\n\Z/
+ x = _beanstalk_return_data message, expect
+ x && ::YAML.load(x)
+ rescue ASIR::Transport::PayloadIO::UnexpectedResponse => exc
+ return :NOT_FOUND if exc.received == NOT_FOUND
+ raise exc
+ end
+ NOT_FOUND = "NOT_FOUND\r\n".freeze
+
+ def _beanstalk_return_data message, expect, data_size_field = nil, payload = nil
+ (@server || stream).with_stream! do | stream |
+ if match = _beanstalk(stream, message, expect, payload)
+ size = match[data_size_field ||= 1].to_i
+ data = stream.read(size)
+ _read_line_and_expect! stream, /\A\r\n\Z/
+ data
+ else
+ raise "unexpected result"
+ end
+ end
+ end
# Send "something ...\r\n".
# Expect /\ASOMETHING (\d+)...\r\n".
def _beanstalk stream, message, expect, payload = nil
_log { [ :_beanstalk, :message, message ] } if @verbose >= 3