Sha256: d6bc98e44d177ea44f1ca9b91d0555aa2296a89e40006f8ac0b9663f4b6e462f
Contents?: true
Size: 1.94 KB
Versions: 3
Compression:
Stored size: 1.94 KB
Contents
require 'concurrent'

module GBDispatch
  # A named queue that hands work to Runner and, through the
  # Concurrent::Async mixin, can also be driven via +async+.
  class Queue
    include Concurrent::Async

    # @return [String] queue name
    attr_reader :name

    # @param name [String] queue name; it should match the name under which
    #   the queue is registered in Celluloid
    def initialize(name)
      # Explicit empty-argument super: +name+ is not forwarded up the chain.
      super()
      @name = name
    end

    # Runs the given work through Runner on behalf of this queue.
    #
    # When both Rails and ActiveRecord::Base are defined, the work is wrapped
    # so that it runs inside +connection_pool.force_new_connection+ (see
    # #with_fresh_connection for the per-Rails-version details).
    #
    # @param block [Proc] work to execute
    # @yield called without arguments when no +block+ argument is given
    # @return [Object, Exception] the value of the executed work, or the
    #   exception object itself if running it raised
    def perform_now(block = nil)
      # Label the current thread with the queue name unless it already has one.
      Thread.current[:name] ||= name

      work = block || -> { yield }
      job =
        if defined?(Rails) && defined?(ActiveRecord::Base)
          # Loaded lazily so the patch is only pulled in under Rails.
          require 'gb_dispatch/active_record_patch'
          with_fresh_connection(work)
        else
          work
        end

      begin
        Runner.execute(job, name: name)
      rescue Exception => error
        # Deliberately broad: whatever was raised is handed back to the
        # caller as a return value instead of propagating.
        error
      end
    end

    # Schedules the given work to run on this queue after a delay.
    #
    # @param time [Numeric] delay handed unchanged to
    #   Concurrent::ScheduledTask.new (presumably seconds - confirm against
    #   the concurrent-ruby documentation)
    # @param block [Proc] work to execute
    # @yield called without arguments when no +block+ argument is given
    # @return [Concurrent::ScheduledTask] the task, already started
    def perform_after(time, block = nil)
      scheduled = Concurrent::ScheduledTask.new(time) do
        async.perform_now(block || -> { yield })
      end
      scheduled.tap(&:execute)
    end

    # @return [String] the queue name as a string
    def to_s
      name.to_s
    end

    private

    # Builds a lambda that runs +work+ inside
    # +ActiveRecord::Base.connection_pool.force_new_connection+.
    #
    # The Rails major version is checked when the lambda is called:
    # on Rails 5 and newer the call is wrapped in
    # +Rails.application.executor.wrap+; on older versions active connections
    # are cleared afterwards, even if the work raised.
    #
    # @param work [#call] the work to run
    # @return [Proc] lambda taking no arguments
    def with_fresh_connection(work)
      lambda do
        if Rails::VERSION::MAJOR >= 5
          Rails.application.executor.wrap do
            ActiveRecord::Base.connection_pool.force_new_connection { work.call }
          end
        else
          begin
            ActiveRecord::Base.connection_pool.force_new_connection { work.call }
          ensure
            ActiveRecord::Base.clear_active_connections!
          end
        end
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
gb_dispatch-0.1.2 | lib/gb_dispatch/queue.rb |
gb_dispatch-0.1.1 | lib/gb_dispatch/queue.rb |
gb_dispatch-0.1.0 | lib/gb_dispatch/queue.rb |