Sha256: bbad9657d16ede83ec335b0698e7126d99225d316474d29403fe297ec896041a
Contents?: true
Size: 1.56 KB
Versions: 2
Compression:
Stored size: 1.56 KB
Contents
require 'singleton' require 'ffi-rzmq' require 'em-zeromq' module Alondra class MessageQueueClient def self.push(message) instance.send_message(message) end def self.instance if EM.reactor_running? async_instance else sync_instance end end def self.async_instance @async_instance ||= AsyncMessageQueueClient.new end def self.sync_instance @sync_instance ||= SyncMessageQueueClient.new end end class AsyncMessageQueueClient < MessageQueueClient def send_message(message) EM.schedule do begin push_socket.send_msg(message.to_json) rescue Exception => ex Log.error "Exception while sending message to message queue: #{ex.message}" end end end def push_socket @push_socket ||= begin push_socket = context.socket(ZMQ::PUSH) push_socket.connect(Alondra.config.queue_socket) push_socket end end def context @context ||= EM::ZeroMQ::Context.new(1) end end class SyncMessageQueueClient < MessageQueueClient def send_message(message) begin push_socket.send_string(message.to_json) rescue Exception => ex Log.error "Exception while sending message to message queue: #{ex.message}" end end def push_socket @push_socket ||= begin socket = context.socket(ZMQ::PUSH) socket.connect(Alondra.config.queue_socket) socket end end def context @context ||= ZMQ::Context.new(1) end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
alondra-0.1.1 | lib/alondra/message_queue_client.rb |
alondra-0.1.0 | lib/alondra/message_queue_client.rb |