Sha256: bbad9657d16ede83ec335b0698e7126d99225d316474d29403fe297ec896041a

Contents?: true

Size: 1.56 KB

Versions: 2

Compression:

Stored size: 1.56 KB

Contents

require 'singleton'
require 'ffi-rzmq'
require 'em-zeromq'

module Alondra
  # Entry point for pushing messages onto the message queue.
  #
  # The concrete client is picked at call time: the asynchronous client when
  # the EventMachine reactor is running, the synchronous one otherwise. Each
  # client is built lazily on first use and memoized on the class.
  class MessageQueueClient
    class << self
      # Delivers +message+ through whichever client {instance} selects.
      #
      # @param message [Object] payload handed to the client's +send_message+
      # @return whatever the selected client's +send_message+ returns
      def push(message)
        instance.send_message(message)
      end

      # @return [MessageQueueClient] the async client when the reactor is
      #   running, the sync client otherwise
      def instance
        EM.reactor_running? ? async_instance : sync_instance
      end

      # @return [AsyncMessageQueueClient] the memoized asynchronous client
      def async_instance
        @async_instance ||= AsyncMessageQueueClient.new
      end

      # @return [SyncMessageQueueClient] the memoized synchronous client
      def sync_instance
        @sync_instance ||= SyncMessageQueueClient.new
      end
    end
  end

  # Message queue client that sends through an EM::ZeroMQ PUSH socket.
  # The actual send is handed to +EM.schedule+ rather than performed inline.
  class AsyncMessageQueueClient < MessageQueueClient
    # Schedules +message+ to be serialized to JSON and sent on the push socket.
    #
    # Failures raised while serializing or sending are logged, not propagated.
    # Only StandardError is rescued so that process-level exceptions
    # (Interrupt, SystemExit, NoMemoryError, ...) are not swallowed.
    #
    # @param message [#to_json] payload to send
    # @return whatever +EM.schedule+ returns
    def send_message(message)
      EM.schedule do
        begin
          push_socket.send_msg(message.to_json)
        rescue StandardError => ex
          Log.error "Exception while sending message to message queue: #{ex.message}"
        end
      end
    end

    # Lazily creates, connects and memoizes the PUSH socket.
    # Presumably +Alondra.config.queue_socket+ is the queue endpoint address;
    # verify against the configuration class.
    #
    # @return the connected PUSH socket
    def push_socket
      @push_socket ||= begin
        socket = context.socket(ZMQ::PUSH)
        socket.connect(Alondra.config.queue_socket)
        socket
      end
    end

    # @return [EM::ZeroMQ::Context] memoized context, constructed with 1
    def context
      @context ||= EM::ZeroMQ::Context.new(1)
    end
  end

  # Message queue client that sends inline through a plain ZMQ PUSH socket.
  # MessageQueueClient.instance selects it when the EventMachine reactor is
  # not running.
  class SyncMessageQueueClient < MessageQueueClient
    # Serializes +message+ to JSON and sends it on the push socket.
    #
    # Failures raised while serializing or sending are logged, not propagated.
    # Only StandardError is rescued so that process-level exceptions
    # (Interrupt, SystemExit, NoMemoryError, ...) are not swallowed.
    #
    # @param message [#to_json] payload to send
    # @return the result of +send_string+, or of +Log.error+ on failure
    def send_message(message)
      push_socket.send_string(message.to_json)
    rescue StandardError => ex
      Log.error "Exception while sending message to message queue: #{ex.message}"
    end

    # Lazily creates, connects and memoizes the PUSH socket.
    # Presumably +Alondra.config.queue_socket+ is the queue endpoint address;
    # verify against the configuration class.
    #
    # @return the connected PUSH socket
    def push_socket
      @push_socket ||= begin
        socket = context.socket(ZMQ::PUSH)
        socket.connect(Alondra.config.queue_socket)
        socket
      end
    end

    # @return [ZMQ::Context] memoized context, constructed with 1
    def context
      @context ||= ZMQ::Context.new(1)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
alondra-0.1.1 lib/alondra/message_queue_client.rb
alondra-0.1.0 lib/alondra/message_queue_client.rb