Sha256: 1e3cc922f62b2ca021899e26cc5d379515ffdb789a6853d03b2994bf2dc53f42

Contents?: true

Size: 1.12 KB

Versions: 1

Compression:

Stored size: 1.12 KB

Contents

require 'json'

class Skein::Client::Worker < Skein::Connected
  # == Instance Methods =====================================================

  def initialize(queue_name, connection: nil, context: nil)
    super(connection: connection, context: context)

    lock do
      queue = self.channel.queue(queue_name, durable: true)

      @handler = Skein::Handler.for(self)

      @subscriber = Skein::Client::Subscriber.new(queue_name, connection: self.connection)

      @thread = Thread.new do
        @subscriber.listen do |payload, delivery_tag, reply_to|
          @handler.handle(payload) do |reply_json|
            channel.acknowledge(delivery_tag, true)

            if (reply_to)
              channel.default_exchange.publish(
                reply_json,
                routing_key: reply_to
              )
            end
          end
        end
      end
    end
  end

  def close
    @thread.kill
    @thread.join

    super
  end

  def join
    @thread and @thread.join
  end

  def async?
    # Define this method as `true` in any subclass that requires async
    # callback-style delegation.
    false
  end

protected
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
skein-0.3.0 lib/skein/client/worker.rb