Sha256: 467f201412601eb6883d996245f0fbe478b8f7f96222773c55d3765109c6a316

Contents?: true

Size: 1.96 KB

Versions: 2

Compression:

Stored size: 1.96 KB

Contents

require 'singleton'
require 'ffi'
require 'em-zeromq'

module Alondra
  # Process-wide (Singleton) endpoint for Alondra's ZeroMQ message queue.
  #
  # It lazily creates one PUSH socket connected to, and one PULL socket bound
  # to, the address in Alondra.config.queue_socket. Payloads read from the
  # queue are JSON-decoded and then either routed as an Event through an
  # EventRouter or delivered to channels as a Message.
  class MessageQueue
    include Singleton

    # Ensures both queue sockets exist. If either socket was already created,
    # a warning is logged and all connections are reset before new ones are
    # opened.
    #
    # @return [MessageQueue] self, so calls can be chained
    def start_listening
      Log.info "Starting message queue"

      if @pull_socket || @push_socket
        Log.warn 'Connections to message queue started twice'
        reset!
      end

      push_socket
      pull_socket

      self
    end

    # Callback that receives a batch of incoming messages and parses each one.
    # A failure in one message is logged and does not stop the rest of the
    # batch from being processed.
    #
    # @param socket the socket the messages arrived on (not used here)
    # @param messages an enumerable of objects responding to #copy_out_string
    def on_readable(socket, messages)
      messages.each do |received|
        begin
          parse received.copy_out_string
        rescue StandardError => ex
          # Only StandardError is rescued: interrupts, SystemExit and other
          # non-standard exceptions must propagate so the process can stop.
          Log.error "Error raised while processing message"
          Log.error "#{ex.class}: #{ex.message}"
          Log.error ex.backtrace.join("\n") if ex.backtrace
        end
      end
    end

    # Decodes a JSON payload and dispatches it according to its keys:
    # a payload with :event becomes an Event passed to #receive, a payload
    # with :message becomes a Message sent to its channels, and anything else
    # is logged as unrecognized.
    #
    # @param received_string [String] raw JSON text taken from the queue
    def parse(received_string)
      received_hash = ActiveSupport::JSON.decode(received_string).symbolize_keys

      if received_hash[:event]
        event = Event.new(received_hash, received_string)
        receive(event)
      elsif received_hash[:message]
        message = Message.new(received_hash[:message], received_hash[:channel_names])
        message.send_to_channels
      else
        Log.warn "Unrecognized message type #{received_string}"
      end
    end

    # Hands an event to the event router for processing.
    #
    # @param event the event object to route
    def receive(event)
      event_router.process(event)
    end

    # Memoized PUSH socket, connected to Alondra.config.queue_socket on first
    # use.
    def push_socket
      @push_socket ||= begin
        push_socket = context.socket(ZMQ::PUSH)
        push_socket.connect(Alondra.config.queue_socket)
        push_socket
      end
    end

    # Memoized PULL socket, bound to Alondra.config.queue_socket on first use.
    # `self` is passed as the second argument to context.socket -- presumably
    # registering this object as the handler that gets #on_readable calls;
    # confirm against the em-zeromq API.
    def pull_socket
      @pull_socket ||= begin
        pull_socket = context.socket(ZMQ::PULL, self)
        pull_socket.bind(Alondra.config.queue_socket)
        pull_socket
      end
    end

    # Closes whichever sockets are currently open and forgets the sockets and
    # the context, so the next access creates fresh ones. Safe to call when
    # only one socket (or none) has been created.
    def reset!
      [@push_socket, @pull_socket].compact.each(&:close)

      @context     = nil
      @push_socket = nil
      @pull_socket = nil
    end

    private

    # Memoized router used to process incoming events.
    def event_router
      @event_router ||= EventRouter.new
    end

    # Memoized ZeroMQ context from which both sockets are created.
    def context
      @context ||= EM::ZeroMQ::Context.new(1)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
alondra-0.1.1 lib/alondra/message_queue.rb
alondra-0.1.0 lib/alondra/message_queue.rb