lib/stomp.rb in stomp-1.0.1 vs lib/stomp.rb in stomp-1.0.2
- old
+ new
@@ -1,57 +1,103 @@
+# Copyright 2005-2006 Brian McCallister
+# Copyright 2006 LogicBlaze Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
require 'io/wait'
require 'socket'
require 'thread'
module Stomp
# Low level connection which maps commands and supports
# synchronous receives
class Connection
- def Connection.open(login = "", passcode = "", host='localhost', port=61613)
- Connection.new login, passcode, host, port
+ def Connection.open(login = "", passcode = "", host='localhost', port=61613, reliable=FALSE)
+ Connection.new login, passcode, host, port, reliable
end
# Create a connection, requires a login and passcode.
# Can accept a host (default is localhost), and port
# (default is 61613) to connect to
- def initialize(login, passcode, host='localhost', port=61613)
+ def initialize(login, passcode, host='localhost', port=61613, reliable=FALSE)
+ @host = host
+ @port = port
+ @login = login
+ @passcode = passcode
@transmit_semaphore = Mutex.new
@read_semaphore = Mutex.new
-
- @socket = TCPSocket.open host, port
- transmit "CONNECT", {:login => login, :passcode => passcode}
- @started = true
- @connect = receive()
+ @socket_semaphore = Mutex.new
+ @reliable = reliable
+ @reconnectDelay = 5
+ @closed = FALSE
+ @subscriptions = {}
+ @failure = NIL
+ socket
end
-
+
+ def socket
+ # Need to look into why the following synchronize does not work.
+ #@read_semaphore.synchronize do
+ s = @socket;
+ while s == NIL or @failure != NIL
+ @failure = NIL
+ begin
+ s = TCPSocket.open @host, @port
+ _transmit(s, "CONNECT", {:login => @login, :passcode => @passcode})
+ @connect = _receive(s)
+ # replay any subscriptions.
+ @subscriptions.each { |k,v| _transmit(s, "SUBSCRIBE", v) }
+ rescue
+ @failure = $!;
+ s=NIL;
+ raise unless @reliable
+ $stderr.print "connect failed: " + $! +" will retry in #{@reconnectDelay}\n";
+ sleep(@reconnectDelay);
+ end
+ end
+ @socket = s
+ return s;
+ #end
+ end
+
# Is this connection open?
def open?
- !@socket.closed?
+ !@closed
end
-
+
# Is this connection closed?
def closed?
- !open?
+ @closed
end
-
+
# Begin a transaction, requires a name for the transaction
def begin name, headers={}
headers[:transaction] = name
transmit "BEGIN", headers
end
-
+
# Acknowledge a message, used then a subscription has specified
# client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client'g
#
# Accepts a transaction header ( :transaction => 'some_transaction_id' )
def ack message_id, headers={}
headers['message-id'] = message_id
transmit "ACK", headers
end
-
+
# Commit a transaction by name
def commit name, headers={}
headers[:transaction] = name
transmit "COMMIT", headers
end
@@ -59,95 +105,139 @@
# Abort a transaction by name
def abort name, headers={}
headers[:transaction] = name
transmit "ABORT", headers
end
-
+
# Subscribe to a destination, must specify a name
- def subscribe(name, headers = {})
+ def subscribe(name, headers = {}, subId=NIL)
headers[:destination] = name
transmit "SUBSCRIBE", headers
+
+ # Store the sub so that we can replay if we reconnect.
+ if @reliable
+ subId = name if subId==NIL
+ @subscriptions[subId]=headers
+ end
end
-
+
# Unsubscribe from a destination, must specify a name
- def unsubscribe(name, headers = {})
+ def unsubscribe(name, headers = {}, subId=NIL)
headers[:destination] = name
transmit "UNSUBSCRIBE", headers
+ if @reliable
+ subId = name if subId==NIL
+ @h.delete(subId)
+ end
end
-
+
# Send message to destination
#
# Accepts a transaction header ( :transaction => 'some_transaction_id' )
def send(destination, message, headers={})
headers[:destination] = destination
transmit "SEND", headers, message
end
-
+
# Close this connection
def disconnect(headers = {})
transmit "DISCONNECT", headers
end
-
+
# Return a pending message if one is available, otherwise
# return nil
def poll
@read_semaphore.synchronize do
- return nil unless @socket.ready?
+ return nil if @socket==NIL or !@socket.ready?
return receive
end
end
-
+
# Receive a frame, block until the frame is received
def receive
+ # The recive my fail so we may need to retry.
+ while TRUE
+ begin
+ s = socket
+ return _receive(s)
+ rescue
+ @failure = $!;
+ raise unless @reliable
+ $stderr.print "receive failed: " + $!;
+ end
+ end
+ end
+
+ private
+ def _receive( s )
line = ' '
@read_semaphore.synchronize do
- line = @socket.gets.chomp while line =~ /\A\s*\Z/
+ line = s.gets
+ return NIL if line == NIL
Message.new do |m|
m.command = line.chomp
m.headers = {}
- until (line = @socket.gets.chomp) == ''
+ until (line = s.gets.chomp) == ''
k = (line.strip[0, line.strip.index(':')]).strip
v = (line.strip[line.strip.index(':') + 1, line.strip.length]).strip
m.headers[k] = v
end
+
if (m.headers['content-length'])
- m.body = @socket.read m.headers['content-length'].to_i
- c = @socket.getc
+ m.body = s.read m.headers['content-length'].to_i
+ c = s.getc
raise "Invalid content length received" unless c == 0
else
m.body = ''
- until (c = @socket.getc) == 0
+ until (c = s.getc) == 0
m.body << c.chr
end
end
+ c = s.getc
+ raise "Invalid frame termination received" unless c == 10
end
end
- rescue
- raise "Closed!"
end
private
def transmit(command, headers={}, body='')
+ # The transmit my fail so we may need to retry.
+ while TRUE
+ begin
+ s = socket
+ _transmit(s, command, headers, body)
+ return
+ rescue
+ @failure = $!;
+ raise unless @reliable
+ $stderr.print "transmit failed: " + $!+"\n";
+ end
+ end
+ end
+
+ private
+ def _transmit(s, command, headers={}, body='')
@transmit_semaphore.synchronize do
- data = String.new
- data << command << "\n"
- headers.each {|k,v| data << "#{k}:#{v}\n" }
- data << "content-length: #{body.length}\n"
- data << "content-type: text/plain; charset=UTF-8\n\n"
- data << body << "\0"
- @socket.write data
+ s.puts command
+ headers.each {|k,v| s.puts "#{k}:#{v}" }
+ s.puts "content-length: #{body.length}"
+ s.puts "content-type: text/plain; charset=UTF-8"
+ s.puts
+ s.write body
+ s.write "\0"
end
end
end
# Container class for frames, misnamed technically
class Message
attr_accessor :headers, :body, :command
+
def initialize
yield(self) if block_given?
end
-
+
def to_s
"<Stomp::Message headers=#{headers.inspect} body='#{body}' command='#{command}' >"
end
end
@@ -158,33 +248,42 @@
# in that thread if you have much message volume.
class Client
# Accepts a username (default ""), password (default ""),
# host (default localhost), and port (default 61613)
- def initialize user="", pass="", host="localhost", port=61613
+ def initialize user="", pass="", host="localhost", port=61613, reliable=FALSE
@id_mutex = Mutex.new
@ids = 1
- @connection = Connection.open user, pass, host, port
+ @connection = Connection.open user, pass, host, port, reliable
@listeners = {}
@receipt_listeners = {}
@running = true
+ @replay_messages_by_txn = Hash.new
@listener_thread = Thread.start do
while @running
message = @connection.receive
case
+ when message == NIL:
+ break
when message.command == 'MESSAGE':
- if listener = @listeners[message.headers['destination']]
- listener.call(message)
- end
+ if listener = @listeners[message.headers['destination']]
+ listener.call(message)
+ end
when message.command == 'RECEIPT':
- if listener = @receipt_listeners[message.headers['receipt-id']]
- listener.call(message)
- end
+ if listener = @receipt_listeners[message.headers['receipt-id']]
+ listener.call(message)
+ end
end
end
end
end
+
+ # Join the listener thread for this client,
+ # generally used to wait for a quit signal
+ def join
+ @listener_thread.join
+ end
# Accepts a username (default ""), password (default ""),
# host (default localhost), and port (default 61613)
def self.open user="", pass="", host="localhost", port=61613
Client.new user, pass, host, port
@@ -196,14 +295,26 @@
end
# Abort a transaction by name
def abort name, headers={}
@connection.abort name, headers
+
+ # lets replay any ack'd messages in this transaction
+ replay_list = @replay_messages_by_txn[name]
+ if replay_list
+ replay_list.each do |message|
+ if listener = @listeners[message.headers['destination']]
+ listener.call(message)
+ end
+ end
+ end
end
# Commit a transaction by name
def commit name, headers={}
+ txn_id = headers[:transaction]
+ @replay_messages_by_txn.delete(txn_id)
@connection.commit name, headers
end
# Subscribe to a destination, must be passed a block
# which will be used as a callback listener
@@ -224,10 +335,20 @@
# Acknowledge a message, used then a subscription has specified
# client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client'g
#
# Accepts a transaction header ( :transaction => 'some_transaction_id' )
def acknowledge message, headers={}
+ txn_id = headers[:transaction]
+ if txn_id
+ # lets keep around messages ack'd in this transaction in case we rollback
+ replay_list = @replay_messages_by_txn[txn_id]
+ if replay_list == nil
+ replay_list = []
+ @replay_messages_by_txn[txn_id] = replay_list
+ end
+ replay_list << message
+ end
if block_given?
headers['receipt'] = register_receipt_listener lambda {|r| yield r}
end
@connection.ack message.headers['message-id'], headers
end
@@ -264,7 +385,8 @@
@ids = @ids.succ
end
@receipt_listeners[id] = listener
id
end
+
end
end