Sha256: 63011dc00311647de214b390854060bd9d620ebece4c0cfde8b95bd6476d2277

Contents?: true

Size: 1.46 KB

Versions: 1

Compression:

Stored size: 1.46 KB

Contents

require 'bunny'
require 'json'

module EasyBunnyRPC
  class Worker
    def initialize(options={})
      @options = options
    end

    def publish_success(payload)
      publish true, payload
    end

    def publish_failure(payload)
      publish false, payload
    end

    def subscribe
      queue.subscribe(block: true) do |delivery_info, properties, payload|
        @delivery_info, @properties, @payload = delivery_info, properties, payload

        yield JSON.parse(payload).first
      end
    end

    def close
      if defined?(@channel)
        channel.close
        remove_instance_variable :@channel
        remove_instance_variable :@queue
      end

      if defined?(@connection)
        connection.close
        remove_instance_variable :@connection
      end
    end

    private

    def publish(success, payload)
      obj = { 'success' => success, 'payload' => payload }.to_json

      exchange.publish(obj, routing_key: @properties.reply_to, correlation_id: @properties.correlation_id)
    end

    def connection
      return @connection if defined?(@connection)

      @connection = Bunny.new(@options[:bunny])
      @connection.start
      @connection
    end

    def channel
      return @channel if defined?(@channel)

      @channel = connection.create_channel
      @channel.prefetch(1)
      @channel
    end

    def queue
      @queue ||= channel.queue(@options[:queue])
    end

    def exchange
      @exchange ||= channel.default_exchange
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
easy_bunny_rpc-0.1.1 lib/easy_bunny_rpc/worker.rb