Sha256: b0598a7bae6d84b0578a326e667caaeab95d76c987129257b40ecd2539ae3567

Contents?: true

Size: 1.49 KB

Versions: 1

Compression:

Stored size: 1.49 KB

Contents

require 'singleton'
require 'ffi-rzmq'
require 'em-zeromq'

module Alondra
  # Entry point for pushing messages onto the message queue.
  #
  # Picks an asynchronous or a synchronous client depending on whether the
  # EventMachine reactor is running at call time. One client of each flavour
  # is created lazily and memoized on the receiving class.
  class MessageQueueClient
    class << self
      # Sends +message+ through the client chosen by +instance+ and returns
      # whatever that client's +send_message+ returns.
      def push(message)
        instance.send_message(message)
      end

      # Returns the async client while the reactor is running, otherwise the
      # sync client.
      def instance
        EM.reactor_running? ? async_instance : sync_instance
      end

      # Lazily built, memoized AsyncMessageQueueClient.
      def async_instance
        @async_instance ||= AsyncMessageQueueClient.new
      end

      # Lazily built, memoized SyncMessageQueueClient.
      def sync_instance
        @sync_instance ||= SyncMessageQueueClient.new
      end
    end
  end

  # Message queue client that sends from inside an +EM.schedule+ block, using
  # a PUB socket obtained from an EM::ZeroMQ context.
  class AsyncMessageQueueClient < MessageQueueClient
    # Serializes +message+ with +to_json+ and sends it on the PUB socket.
    #
    # The send is best-effort: ordinary errors are logged through
    # +Rails.logger+ instead of being raised to the caller.
    def send_message(message)
      EM.schedule do
        begin
          push_socket.send_msg(message.to_json)
        # Rescue StandardError only, so that signals, SystemExit and
        # NoMemoryError still propagate instead of becoming a log line.
        rescue StandardError => ex
          Rails.logger.error "Exception while sending message to message queue: #{ex.message}"
        end
      end
    end

    # Lazily connected, memoized PUB socket pointed at
    # +Alondra.config.queue_socket+.
    def push_socket
      @push_socket ||= context.connect(ZMQ::PUB, Alondra.config.queue_socket)
    end

    # Lazily created, memoized EM::ZeroMQ context.
    def context
      @context ||= EM::ZeroMQ::Context.new(1)
    end
  end

  # Message queue client that sends directly on a plain ZMQ PUB socket,
  # without going through EventMachine.
  class SyncMessageQueueClient < MessageQueueClient
    # Serializes +message+ with +to_json+ and sends it on the PUB socket.
    #
    # The send is best-effort: ordinary errors are logged through
    # +Rails.logger+ instead of being raised to the caller.
    def send_message(message)
      push_socket.send_string(message.to_json)
    # Rescue StandardError only, so that signals, SystemExit and
    # NoMemoryError still propagate instead of becoming a log line.
    rescue StandardError => ex
      Rails.logger.error "Exception while sending message to message queue: #{ex.message}"
    end

    # Lazily created, memoized PUB socket connected to
    # +Alondra.config.queue_socket+.
    def push_socket
      @push_socket ||= begin
        socket = context.socket(ZMQ::PUB)
        socket.connect(Alondra.config.queue_socket)
        socket
      end
    end

    # Lazily created, memoized ZMQ context.
    def context
      @context ||= ZMQ::Context.new(1)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
alondra-0.0.3 lib/alondra/message_queue_client.rb