Sha256: 12884db357cb35cb95ddbaa2b8ab7f195772435210050ef8292eb422f7970c4d

Contents?: true

Size: 1.68 KB

Versions: 1

Compression:

Stored size: 1.68 KB

Contents

# Mixin that turns a class into a queue worker.
#
# Including this module adds the instance-level hooks (#ack!, #work) and
# extends the including class with ClassMethods (from_queue, start, stop).
# A fresh instance of the including class is created for every delivered
# message, and its #work result decides whether the message is acked or
# rejected.
module FrenzyBunnies::Worker
  import java.util.concurrent.Executors

  # Truthy value a #work implementation can return to signal success.
  #
  # @return [true]
  def ack!
    true
  end

  # Handles a single message. This default implementation does nothing and
  # returns nil, which the subscription loop in ClassMethods#start treats as
  # "not processed" (the message is rejected). Including classes are expected
  # to override it.
  #
  # The parameter is optional so that both `work` and `work(msg)` are valid
  # calls; ClassMethods#start always passes the message.
  #
  # @param _msg [Object] the delivered message payload (ignored here)
  # @return [nil]
  def work(_msg = nil)
  end

  # Hook run when the module is included: adds the class-level API.
  def self.included(base)
    base.extend ClassMethods
  end

  # Class-level API added to every class that includes Worker.
  module ClassMethods
    # Declares which queue this worker consumes and with which options.
    #
    # Options read by #start: :prefetch (defaults to 10), :durable (defaults
    # to false) and :threads (fixed pool size; a cached pool is used when
    # absent).
    #
    # @param q [String, Symbol] base queue name; #start appends "_<env>"
    # @param opts [Hash] queue options
    def from_queue(q, opts={})
      @queue_name = q
      # Copy the hash: #start writes defaults into it and must not leak those
      # writes into a hash the caller may be sharing.
      @queue_opts = (opts || {}).dup
    end

    # Builds the queue, subscribes to it and starts dispatching messages to
    # new worker instances on a thread pool. The subscription is started with
    # :blocking => false.
    #
    # @param context [Object] must respond to #logger, #env and
    #   #queue_factory (whose #build_queue takes name, prefetch, durable)
    def start(context)
      @logger = context.logger

      queue_name = "#{@queue_name}_#{context.env}"

      @queue_opts[:prefetch] ||= 10
      @queue_opts[:durable] ||= false
      threads = @queue_opts[:threads]

      @thread_pool = if threads
                       Executors.new_fixed_thread_pool(threads)
                     else
                       Executors.new_cached_thread_pool
                     end

      q = context.queue_factory.build_queue(queue_name, @queue_opts[:prefetch], @queue_opts[:durable])
      @s = q.subscribe(:ack => true)

      say "#{threads ? "#{threads} threads " : ''}with #{@queue_opts[:prefetch]} prefetch on <#{queue_name}>."

      @s.each(:blocking => false, :executor => @thread_pool) do |h, msg|
        begin
          # The worker is instantiated inside the begin so that a failing
          # constructor is handled like a failing #work: logged and rejected.
          if new.work(msg)
            h.ack
          else
            error "Cannot process message <#{msg.inspect}>"
            h.reject
          end
        rescue => e
          # Log before rejecting: if the reject itself raises, the original
          # failure has already been recorded.
          error "ERROR #{e}"
          h.reject
        end
      end

      say "workers up."
    end

    # Shuts down the thread pool created by #start. Does nothing when #start
    # has not been called, since there is no pool (or logger) yet.
    def stop
      return unless @thread_pool

      say "stopping"
      @thread_pool.shutdown_now
      say "pool shutdown"
      # @s.cancel  #for some reason when the channel socket is broken, this is holding the process up and we're zombie.
      say "stopped"
    end

    # Logs +text+ at info level, prefixed with the worker class name.
    def say(text)
      @logger.info "[#{self.name}] #{text}"
    end

    # Logs +text+ at error level, prefixed with the worker class name.
    def error(text)
      @logger.error "[#{self.name}] #{text}"
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
frenzy_bunnies-0.0.3 lib/frenzy_bunnies/worker.rb