Sha256: 49f89a03e5888ec6d305a6a6afffa435c336f4ad7942d98841d3c52bac874a65

Contents?: true

Size: 1.11 KB

Versions: 1

Compression:

Stored size: 1.11 KB

Contents

require 'celluloid'
module GBDispatch
  # A named queue that includes Celluloid and forwards submitted work to the
  # thread pool it was constructed with.
  class Queue
    include Celluloid

    # @return [String] name of this queue
    attr_reader :name

    # @param name [String] queue name; should match the name the queue is registered under in Celluloid
    # @param thread_pool [Celluloid::Pool] pool of runners used to execute the submitted code
    def initialize(name, thread_pool)
      @thread_pool = thread_pool
      @name = name
    end

    # Hands a unit of work to the thread pool from inside an +exclusive+ section.
    #
    # When both Rails and ActiveRecord::Base are defined, the work is wrapped so
    # that it runs inside +connection_pool.with_connection+ and active
    # connections are cleared afterwards, whether or not the work raised.
    #
    # @param block [Proc, nil] code to execute; when omitted the method's own block is yielded to
    # @yield called without arguments when +block+ is not given
    # @return [Object, Exception] the value of the +exclusive+ call, or the
    #   exception raised while handing the work to the pool (returned, not raised)
    def perform(block=nil)
      Thread.current[:name] ||= name

      thread_block =
        if defined?(Rails) && defined?(ActiveRecord::Base)
          lambda do
            begin
              ActiveRecord::Base.connection_pool.with_connection do
                if block
                  block.call
                else
                  yield
                end
              end
            ensure
              ActiveRecord::Base.clear_active_connections!
            end
          end
        else
          block || lambda { yield }
        end

      exclusive do
        begin
          @thread_pool.execute(thread_block, name: name)
        rescue Exception => error
          # Every exception class is caught here on purpose: the failure is
          # given back to the caller as a return value instead of propagating.
          return error
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
gb_dispatch-0.0.1 lib/gb_dispatch/queue.rb