# This module is primarily intended for use with ActiveMQ, though
# it should work with any Stomp-speaking broker.

require 'eventbus/common_init'
require_relative 'stomp/slogger'
require 'socket'
require 'stomp'

# Mixin providing a cached STOMP client plus helpers for publishing to and
# subscribing to broker queues.
#
# Configuration is read from the environment:
#   BROKER_HOST, BROKER_PORT, BROKER_USER, BROKER_PASS
#   EVENTBUS_STOMP_SSL, EVENTBUS_STOMP_RELIABLE, EVENTBUS_STOMP_CLOSED_CHECK
#   EVENTBUS_STOMP_MAX_HBREAD_FAILS, EVENTBUS_STOMP_MAX_HBREADLOCK_FAILS
#   EVENTBUS_STOMP_PROTOCOL_VERSION, EVENTBUS_STOMP_HEARTBEAT_INTERVALS
#   EVENTBUS_STOMP_DETAILED_LOGGING
module StompConnectionDriver
  # Cached client. Being a class variable, it is shared rather than held
  # per instance.
  @@StompConnector_Connection = nil

  # No-op: this driver does nothing at initialization time.
  def connection_driver_initialize
  end

  # Returns the cached Stomp::Client, opening a new one when none exists yet
  # or when the cached one is closed and reliable mode is switched off.
  #
  # The reliable setting is read from EVENTBUS_STOMP_RELIABLE (the same
  # setting used to build the connection options) instead of being queried
  # from the client object.
  def get_connection
    current = @@StompConnector_Connection

    # If we never opened one, or if it's closed and not reliable, then open a new connection.
    if current.nil? || (current.closed? && !stomp_env_enabled?('EVENTBUS_STOMP_RELIABLE'))
      opts = stomp_connection_options
      EventBus.logger.info("Connecting to STOMP with options: #{stomp_loggable_options(opts)}")
      @@StompConnector_Connection = Stomp::Client.new(opts)
    end

    # I'm not sure whether we should sleep here if the connection is closed?() but is reliable?()
    # Assuming that the connection will do the right thing and punting until I find out different.
    @@StompConnector_Connection
  end

  # Publishes +content+ to the queue named by opts[:queue_name].
  #
  # opts basically just gets passed to the Stomp client's publish, so it can be
  # things like :persistent => false, etc. :persistent is true by default.
  # The caller's hash is left untouched; a copy is modified and forwarded.
  def send_raw(content, opts = {})
    # Work on a copy so a hash reused by the caller keeps its :queue_name and
    # does not silently pick up our defaults.
    opts = opts.dup
    queue_name = "/queue/#{opts.delete(:queue_name)}"

    opts[:persistent] = true if opts[:persistent].nil?

    # "Fix" an ActiveMQ "bug" where it assumes that any message with a content-length
    # header is a bytes-message instead of a text message. This behavior prevents the
    # message content from showing up in the "Message Details" section when you try
    # to view the message in ActiveMQ's admin interface.
    #
    # Note that the *correct* behavior is the opposite of this: All Stomp messages
    # are recommended to include content-length. However, this is the *less annoying*
    # and *more useful* behavior.
    opts[:suppress_content_length] = true if opts[:suppress_content_length].nil?

    EventBus.logger.info "Stomp sending message to: #{queue_name}"

    get_connection.publish(queue_name, content, opts)
  end

  # Subscribes to +listen_queue+ and yields the body of every message received.
  #
  # A name beginning with "/" is used verbatim as the destination; anything
  # else is prefixed with "/queue/".
  def watch_queue(listen_queue)
    client = get_connection

    queue_name = listen_queue.to_s.start_with?('/') ? listen_queue : "/queue/#{listen_queue}"

    # OK, so some assumption that we're using ActiveMQ here. We'll work that out eventually. The point
    # is, though, that we want to allow for maximum parallelism -- not just by thread but also across
    # processes and machines. Therefore, don't lock the queue exclusively (i.e. don't kill other clients!)
    # and only take blocks of one message rather than claiming large blocks. These assumptions should become
    # configurable at some point, but for now it's not really a concern.
    client.subscribe(queue_name, {"activemq.prefetchSize" => 1, "activemq.exclusive" => false}) do |msg|
      yield msg.body
    end
  end

  private

  # True unless the named environment variable is exactly "false".
  def stomp_env_enabled?(name)
    ENV[name] != "false"
  end

  # Reads an integer setting from the environment. ENV values are always
  # Strings, so the value is parsed as base 10; +default+ is returned when the
  # variable is unset. Raises ArgumentError if the value is not a valid integer.
  def stomp_env_integer(name, default)
    raw = ENV[name]
    raw.nil? ? default : Integer(raw, 10)
  end

  # Builds the options hash handed to Stomp::Client.new.
  def stomp_connection_options
    use_ssl = stomp_env_enabled?('EVENTBUS_STOMP_SSL') # True unless specified in ENV

    {
      :hosts => [
        {
          :host => ENV["BROKER_HOST"] || "localhost",
          :port => ENV["BROKER_PORT"] || (use_ssl ? 61614 : 61613),
          :login => ENV["BROKER_USER"] || "eventbus",
          :passcode => ENV["BROKER_PASS"] || "eventbus",
          :ssl => use_ssl
        }
      ],
      :reliable => stomp_env_enabled?('EVENTBUS_STOMP_RELIABLE'), # True unless specified in ENV
      :initial_reconnect_delay => 1,
      :max_reconnect_delay => 10,
      :use_exponential_back_off => true,
      :back_off_multiplier => 2,
      :closed_check => stomp_env_enabled?('EVENTBUS_STOMP_CLOSED_CHECK'),
      :hbser => true,
      :max_hbread_fails => stomp_env_integer('EVENTBUS_STOMP_MAX_HBREAD_FAILS', 5),
      :max_hbrlck_fails => stomp_env_integer('EVENTBUS_STOMP_MAX_HBREADLOCK_FAILS', 5),
      :connect_timeout => 30,
      :connect_headers => {
        # Set up heartbeating to keep connection alive and notice right away when it isn't.
        #
        # Value is two integer millisecond interval values, comma-delimited:
        #   <send_ms>,<receive_ms>
        # Heartbeating can be disabled entirely or from one end by using 0 for
        # one or both of these values in the EVENTBUS_STOMP_HEARTBEAT_INTERVALS environment variable:
        #   0,0     Disable all heartbeating
        #   0,1000  Receive heartbeats from broker every 1000ms, but don't send them.
        #   1000,0  Send heartbeats from client every 1000ms, but don't look for them from broker.
        #
        # This is turned on by default to keep the connection alive in some environments where they can be
        # dropped silently. If the heartbeat fails, it will trigger the :reliable re-connect.
        # Best to leave it on.
        #
        # For heartbeats to work properly, you must be using Stomp protocol version 1.2 or higher.
        # Tested to work with Apache Apollo.
        :host => Socket.gethostname,
        :"accept-version" => ENV['EVENTBUS_STOMP_PROTOCOL_VERSION'] || "1.2",
        :"heart-beat" => ENV['EVENTBUS_STOMP_HEARTBEAT_INTERVALS'] || '1000,1000',
      },
      :logger => ENV['EVENTBUS_STOMP_DETAILED_LOGGING'] == "true" ? EventBus::Connectors::Stomp::Slogger.new : nil,
    }
  end

  # String form of +opts+ for logging, with each host's :passcode masked so
  # broker credentials are not written to the log.
  def stomp_loggable_options(opts)
    masked_hosts = opts[:hosts].map { |host| host.merge(:passcode => '[FILTERED]') }
    opts.merge(:hosts => masked_hosts).to_s
  end
end