# This module is primarily intended for use with ActiveMQ, though
# it should work with any Stomp-speaking broker.
require 'eventbus/common_init'
require_relative 'stomp/slogger'
require 'socket'
require 'stomp'
module StompConnectionDriver
@@StompConnector_Connection = nil
def connection_driver_initialize
end
def get_connection
opts = {
:hosts => [
{
:host => ENV["BROKER_HOST"] || "localhost",
:port => ENV["BROKER_PORT"] || (ENV['EVENTBUS_STOMP_SSL'] == "false" ? 61613 : 61614),
:login => ENV["BROKER_USER"] || "eventbus",
:passcode => ENV["BROKER_PASS"] || "eventbus",
:ssl => ENV['EVENTBUS_STOMP_SSL'] == "false" ? false : true # True unless specified in ENV
}
],
:reliable => ENV['EVENTBUS_STOMP_RELIABLE'] == "false" ? false : true, # True unless specified in ENV
:initial_reconnect_delay => 1,
:max_reconnect_delay => 10,
:use_exponential_back_off => true,
:back_off_multiplier => 2,
:closed_check => ENV['EVENTBUS_STOMP_CLOSED_CHECK'] == "false" ? false : true,
:hbser => true,
:max_hbread_fails => ENV['EVENTBUS_STOMP_MAX_HBREAD_FAILS'] || 5,
:max_hbrlck_fails => ENV['EVENTBUS_STOMP_MAX_HBREADLOCK_FAILS'] || 5,
:connect_timeout => 30,
:connect_headers => {
# :host
#
# This is the STOMP VHOST, NOT a hostname! Make sure your broker is set up right!
# This is a requirement of STOMP 1.2. The broker may (read: probably will)
# drop connections on you if you're sending a value it doesn't recognize. If you're
# getting lots of "connection reset by peer" and connection drops, dude, look at this really hard.
# Behavior highly dependent on the broker, however. Works fine in Apache Apollo by adding this line
# to the apollo.xml file:
#
# eventbus
#
:host => ENV['BROKER_VHOST'] || "eventbus",
# :accept-version and :heart-beat
#
# Used to set up heart-beating to keep connection alive and notice right away when it isn't.
#
# Value is two integer millisecond interval values, comma-delimited
# ,
# Heart-beating can be disabled entirely or from one end by using 0 for
# one or both of these values in the EVENTBUS_STOMP_HEARTBEAT_INTERVALS environment variable:
# 0,0 Disable all heart-beating
# 0,1000 Receive heart-beats from broker every 1000ms, but don't send them.
# 1000,0 Send heart-beats from client every 1000ms, but don't look for them from broker.
#
# This is turned on by default to keep the connection alive in some environments where they can be
# dropped silently. If the heart-beat fails, it will trigger the :reliable re-connect.
# Best to leave it on.
#
# For heart-beats to work properly, you must be using Stomp protocol version 1.2 or higher.
# Tested to work with Apache Apollo.
:"accept-version" => ENV['EVENTBUS_STOMP_PROTOCOL_VERSION'] || "1.2",
:"heart-beat" => ENV['EVENTBUS_STOMP_HEARTBEAT_INTERVALS'] || '1000,1000',
},
:logger => ENV['EVENTBUS_STOMP_DETAILED_LOGGING'] == "true" ? EventBus::Connectors::Stomp::Slogger.new : nil,
}
# If we never opened one, or if it's closed and not reliable, then open a new connection.
if @@StompConnector_Connection.nil? || (@@StompConnector_Connection.closed? && ! @@StompConnector_Connction.reliable?)
EventBus.logger.info("Connecting to STOMP with options: #{opts.to_s}")
@@StompConnector_Connection = Stomp::Client.new(opts)
end
# I'm not sure whether we should sleep here if the connection is closed?() but is reliable?()
# Assuming that the connection will do the right thing and punting until I find out different.
return @@StompConnector_Connection
end
# opts basically just gets passed to the Stomp client.send, so it can be things
# like :persistent => false, etc. :persistent is true by default.
def send_raw(content, opts = {})
queue_name = opts.delete(:queue_name)
queue_name = "/queue/#{queue_name}"
opts[:persistent] = true if opts[:persistent].nil?
# "Fix" an ActiveMQ "bug" where it assumes that any message with a content-length
# header is a bytes-message instead of a text message. This behavior prevents the
# message conent from showing up in the "Message Details" section when you try
# to view the message in ActiveMQ's admin interface.
#
# Note that the *correct* behavior is the opposite of this: All Stomp messages
# are recommended to include content-length. However, this is the *less annoying*
# and *more useful* behavior.
opts[:suppress_content_length] = true if opts[:suppress_content_length].nil?
EventBus.logger.info "Stomp sending message to: #{queue_name}"
client = get_connection
client.publish(queue_name, content, opts)
end
def watch_queue(listen_queue)
client = get_connection
queue_name = listen_queue.match('^/') ? listen_queue : "/queue/#{listen_queue}"
# OK, so some assumption that we're using ActiveMQ here. We'll work that out eventually. The point
# is, though, that we want to allow for maximum parallelism -- not just by thread but also across
# processes and machines. Therefore, don't lock the queue exclusively (i.e. don't kill other clients!)
# and only take blocks of one message rather than claiming large blocks. These assumptions should become
# configurable at some point, but for now it's not really a concern.
client.subscribe(queue_name, {"activemq.prefetchSize" => 1, "activemq.exclusive" => false}) do |msg|
yield msg.body
end
end
end