# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# @private
module Qpid::Proton
  module Handler

    # Adapter to convert raw proton events for the old {Handler::MessagingHandler}
    # used by the Reactor.
    #
    # Options are read from the wrapped handler's +options+ (when it responds
    # to that method) and completed with defaults:
    #
    # - +:prefetch+ (10): credit level that {#add_credit} tops receivers up to.
    # - +:peer_close_is_error+ (false): report a remote close without a
    #   condition as an error when the local end is not closed.
    # - +:auto_accept+ (true): accept a message after +on_message+ unless it
    #   has already been settled.
    # - +:auto_settle+ (true): settle a tracker after its update was delivered.
    # - +:auto_open+ (true): open the local end after +on_xxx_opening+.
    # - +:auto_close+ (true): close the local end after a remote close.
    class ReactorMessagingAdapter < Adapter

      # @param handler the handler that receives the translated events; its
      #   +options+ hash, if any, is used (and filled in with defaults).
      def initialize(handler)
        super
        supplied = handler.respond_to?(:options) ? handler.options : nil
        @opts = supplied || {}
        @opts[:prefetch] ||= 10
        # Flags are only defaulted when the key is absent, so an explicit
        # false/nil supplied by the handler is kept.
        flag_defaults = {
          :peer_close_is_error => false,
          :auto_accept => true,
          :auto_settle => true,
          :auto_open => true,
          :auto_close => true
        }
        flag_defaults.each do |flag, default|
          @opts[flag] = default unless @opts.include?(flag)
        end
      end

      alias dispatch forward

      # Re-label +event+ as +method+ and dispatch it to the handler, falling
      # back to +on_unhandled+ when the dispatch returns a false value.
      def delegate(method, event)
        event.method = method
        event.dispatch(@handler) || dispatch(:on_unhandled, event)
      end

      # Like {#delegate}, but for error events: tries the specific callback,
      # then +on_error+, and only then +on_unhandled+. When nothing took the
      # error and +:auto_close+ is set, the connection is closed with the
      # condition of the event's context.
      def delegate_error(method, event)
        event.method = method
        return if event.dispatch(@handler) || dispatch(:on_error, event)

        dispatch(:on_unhandled, event)
        event.connection.close(event.context.condition) if @opts[:auto_close]
      end

      def on_container_start(container)
        delegate(:on_start, Event.new(nil, nil, container))
      end

      def on_container_stop(container)
        delegate(:on_stop, Event.new(nil, nil, container))
      end

      # Build a module holding the repetitive on_xxx_local/remote_open/close
      # methods for one endpoint type.
      #
      # @param endpoint [Symbol] endpoint name used in the generated method
      #   and callback names, e.g. +:link+ gives +on_link_remote_open+.
      # @return [Module] a module to be included in this class.
      def self.open_close(endpoint)
        opening, opened, closing, closed, failed =
          %w[opening opened closing closed error].map { |suffix| :"on_#{endpoint}_#{suffix}" }

        Module.new do
          define_method(:"on_#{endpoint}_local_open") do |event|
            if event.context.remote_open?
              delegate(opened, event)
            end
          end

          define_method(:"on_#{endpoint}_remote_open") do |event|
            if event.context.local_open?
              delegate(opened, event)
            elsif event.context.local_uninit?
              delegate(opening, event)
              event.context.open if @opts[:auto_open]
            end
          end

          define_method(:"on_#{endpoint}_local_close") do |event|
            if event.context.remote_closed?
              delegate(closed, event)
            end
          end

          define_method(:"on_#{endpoint}_remote_close") do |event|
            if event.context.remote_condition
              delegate_error(failed, event)
            elsif event.context.local_closed?
              delegate(closed, event)
            elsif @opts[:peer_close_is_error]
              # No condition came from the peer: set one so the error
              # callback has something to report.
              Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
              delegate_error(failed, event)
            else
              delegate(closing, event)
            end
            event.context.close if @opts[:auto_close]
          end
        end
      end

      # Generate and include an open_close module for each endpoint type
      [:connection, :session, :link].each do |kind|
        include open_close(kind)
      end

      def on_transport_error(event)
        delegate_error(:on_transport_error, event)
      end

      def on_transport_closed(event)
        delegate(:on_transport_closed, event)
      end

      # Link opening events additionally top up receiver credit; +super+
      # reaches the generated method from the included open_close module.
      def on_link_local_open(event)
        super
        add_credit(event)
      end

      def on_link_remote_open(event)
        super
        add_credit(event)
      end

      # Translate a delivery event into message callbacks (receiver links) or
      # outcome callbacks (sender links).
      def on_delivery(event)
        if event.link.receiver?
          # Incoming message
          incoming = event.delivery
          if incoming.aborted?
            delegate(:on_aborted, event)
            incoming.settle
          elsif incoming.complete?
            if incoming.link.local_closed? && @opts[:auto_accept]
              # The link is already closed locally: release the message
              # instead of handing it to the handler.
              incoming.release
            else
              begin
                delegate(:on_message, event)
                if @opts[:auto_accept]
                  incoming.accept unless incoming.settled?
                end
              rescue Qpid::Proton::Reject
                incoming.reject
              rescue Qpid::Proton::Release
                incoming.release(true)
              end
            end
          end
          delegate(:on_settled, event) if incoming.settled?
          add_credit(event)
        else
          # Outgoing message
          tracker = event.tracker
          if tracker.updated?
            outcome =
              case tracker.state
              when Qpid::Proton::Delivery::ACCEPTED then :on_accepted
              when Qpid::Proton::Delivery::REJECTED then :on_rejected
              when Qpid::Proton::Delivery::RELEASED then :on_released
              when Qpid::Proton::Delivery::MODIFIED then :on_modified
              end
            delegate(outcome, event) if outcome
            delegate(:on_settled, event) if tracker.settled?
            tracker.settle if @opts[:auto_settle]
          end
        end
      end

      # Top up receiver credit, then report +on_sendable+ for an open sender
      # link that has credit.
      def on_link_flow(event)
        add_credit(event)
        link = event.link
        return unless link.sender? && link.open? && link.credit > 0

        delegate(:on_sendable, event)
      end

      # Raise the credit of the event's receiver to the +:prefetch+ level.
      # Does nothing when the event has no receiver, the receiver is not open,
      # its +drained+ count is non-zero, or credit is already at or above the
      # prefetch level.
      def add_credit(event)
        receiver = event.receiver
        window = @opts[:prefetch]
        return unless receiver && receiver.open? && receiver.drained == 0
        return unless window && window > receiver.credit

        receiver.flow(window - receiver.credit)
      end
    end
  end
end