module Zookeeper # @private # sigh, slightly different than the userland callbacks, the continuation # provides sync call semantics around an async api class Continuation include Constants include Logger OPERATION_TIMEOUT = 30 # seconds # for keeping track of which continuations are pending, and which ones have # been submitted and are awaiting a repsonse # # `state_check` are high-priority checks that query the connection about # its current state, they always run before other continuations # class Registry < Struct.new(:pending, :state_check, :in_flight) extend Forwardable def_delegators :@mutex, :lock, :unlock def initialize super([], [], {}) @mutex = Mutex.new end def synchronize @mutex.lock begin yield self ensure @mutex.unlock rescue nil end end # does not lock the mutex, returns true if there are pending jobs def anything_to_do? (pending.length + state_check.length) > 0 end # returns the pending continuations, resetting the list # this method is synchronized def next_batch() @mutex.lock begin state_check.slice!(0, state_check.length) + pending.slice!(0,pending.length) ensure @mutex.unlock rescue nil end end end # Registry # *sigh* what is the index in the *args array of the 'callback' param CALLBACK_ARG_IDX = { :get => 2, :set => 3, :exists => 2, :create => 3, :delete => 3, :get_acl => 2, :set_acl => 3, :get_children => 2, :state => 0, :add_auth => 2 } # maps the method name to the async return hash keys it should use to # deliver the results METH_TO_ASYNC_RESULT_KEYS = { :get => [:rc, :data, :stat], :set => [:rc, :stat], :exists => [:rc, :stat], :create => [:rc, :string], :delete => [:rc], :get_acl => [:rc, :acl, :stat], :set_acl => [:rc], :get_children => [:rc, :strings, :stat], :add_auth => [:rc] } attr_accessor :meth, :block, :rval attr_reader :args def initialize(meth, *args) @meth = meth @args = args.freeze @mutex = Monitor.new @cond = @mutex.new_cond @rval = nil # make this error reporting more robust if necessary, right now, just set to state @error = nil end # the caller calls this method and receives the response from the async loop # this method has a hard-coded 30 second timeout as a safety feature. No # call should take more than 20s (as the session timeout is set to 20s) # so if any call takes longer than that, something has gone horribly wrong. # # @raise [ContinuationTimeoutError] if a response is not received within 30s # def value time_to_stop = Time.now + OPERATION_TIMEOUT now = nil @mutex.synchronize do while true now = Time.now break if @rval or @error or (now > time_to_stop) deadline = time_to_stop.to_f - now.to_f @cond.wait(deadline) end if (now > time_to_stop) and !@rval and !@error raise Exceptions::ContinuationTimeoutError, "response for meth: #{meth.inspect}, args: #{@args.inspect}, not received within #{OPERATION_TIMEOUT} seconds" end case @error when nil # ok, nothing to see here, carry on when :shutdown raise Exceptions::NotConnected, "the connection is shutting down" when ZOO_EXPIRED_SESSION_STATE raise Exceptions::SessionExpired, "connection has expired" else raise Exceptions::NotConnected, "connection state is #{STATE_NAMES[@error]}" end case @rval.length when 1 return @rval.first else return @rval end end end # receive the response from the server, set @rval, notify caller def call(hash) logger.debug { "continuation req_id #{req_id}, got hash: #{hash.inspect}" } @rval = hash.values_at(*METH_TO_ASYNC_RESULT_KEYS.fetch(meth)) logger.debug { "delivering result #{@rval.inspect}" } deliver! end def user_callback? !!@args.at(callback_arg_idx) end # this method is called by the event thread to submit the request # passed the CZookeeper instance, makes the async call and deals with the results # # BTW: in case you were wondering this is a completely stupid # implementation, but it's more important to get *something* working and # passing specs, then refactor to make everything sane # # def submit(czk) state = czk.zkrb_state # check the state of the connection if @meth == :state # if the method is a state call @rval = [state] # we're done, no error return deliver! elsif state != ZOO_CONNECTED_STATE # otherwise, we must be connected @error = state # so set the error return deliver! # and we're out end rc, *_ = czk.__send__(:"zkrb_#{@meth}", *async_args) if user_callback? or (rc != ZOK) # async call, or we failed to submit it @rval = [rc] # create the repsonse deliver! # wake the caller and we're out end end def req_id @args.first end def state_call? @meth == :state end # interrupt the sleeping thread with a NotConnected error def shutdown! @mutex.synchronize do return if @rval or @error @error = :shutdown @cond.broadcast end end protected # an args array with the only difference being that if there's a user # callback provided, we don't handle delivering the end result def async_args return [] if @meth == :state # special-case :P ary = @args.dup logger.debug { "async_args, meth: #{meth} ary: #{ary.inspect}, #{callback_arg_idx}" } ary[callback_arg_idx] ||= self ary end def callback_arg_idx CALLBACK_ARG_IDX.fetch(meth) { raise ArgumentError, "unknown method #{meth.inspect}" } end def deliver! @mutex.synchronize do @cond.signal end end end # Base end