Sha256: b2c391ef90039630e3c91a31a3d182d9c31bcf0b2fed8c2f4814964c25c622b8

Contents?: true

Size: 1.81 KB

Versions: 1

Compression:

Stored size: 1.81 KB

Contents

require 'celluloid/current'
module GBDispatch
  class Queue
    include Celluloid

    # @return [String] queue name
    attr_reader :name

    # @param name [String] queue name, should be the same as is register in Celluloid
    # @param thread_pool [Celluloid::Pool] pool of runners for executing code.
    def initialize(name, thread_pool)
      @name = name
      @thread_pool = thread_pool
      @executing = false
    end

    # Perform given block
    #
    # If used with rails it will wrap block with connection pool.
    # @param block [Proc]
    # @yield if there is no block given it yield without param.
    # @return [Object, Exception] returns value of executed block or exception if block execution failed.
    def perform(block=nil)
      Thread.current[:name] ||= name
      if defined?(Rails) && defined?(ActiveRecord::Base)
        thread_block = ->() do
          begin
            ActiveRecord::Base.connection_pool.with_connection do
              block ? block.call : yield
            end
          ensure
            ActiveRecord::Base.clear_active_connections!
          end
        end
      else
        thread_block = block ? block : ->() { yield }
      end
      while @executing
        sleep(0.0001)
      end
      #exclusive do
        begin
          @executing = true
          @thread_pool.execute thread_block, name: name
        rescue Exception => e
          return e
        ensure
          @executing = false
        end
      #end
    end

    # Perform block after given period
    # @param time [Fixnum]
    # @param block [Proc]
    # @yield if there is no block given it yield without param.
    def perform_after(time, block=nil)
      after(time) do
        block = ->(){ yield } unless block
        self.async.perform block
      end
    end

    def to_s
      self.name.to_s
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
gb_dispatch-0.0.4 lib/gb_dispatch/queue.rb