Sha256: d43ea0172637bd7d2054df096717b458f9874e4f18a564cf260934e6a5c7638e

Contents?: true

Size: 1.04 KB

Versions: 1

Compression:

Stored size: 1.04 KB

Contents

require "beetle_joevandyk_extensions/version"
require 'beetle'
require 'timeout'

module BeetleJoevandykExtensions
  class BeetleError < RuntimeError; end

  DEFAULT_TIMEOUT = 10

  def rpc name, message, options={}
    client = Beetle::Client.new
    client.register_message name
    Timeout.timeout(options[:timeout] || DEFAULT_TIMEOUT, BeetleError) do
      status, result = client.rpc(name, message.to_json)
      raise BeetleError.new(result.to_s) if status != "OK"
      JSON.parse(result)
    end
  end

  def listen name, klass=nil, &block
    client = Beetle::Client.new
    client.register_queue name
    client.register_handler name, klass do |input|
      json = JSON.parse(input.data)
      if klass
        klass.call(json).to_json
      else
        block.call(json).to_json
      end
    end
    client.listen do
      puts "Started #{ name } at #{ Time.now }"
    end
  end

  def publish name, message
    client = Beetle::Client.new
    client.register_message name
    client.publish name, message
  end
end

Beetle.extend(BeetleJoevandykExtensions)

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
beetle_joevandyk_extensions-0.0.6 lib/beetle_joevandyk_extensions.rb