Sha256: e142f5563cc685b06dbefdbd6e60594d773395ee18d804e016c8bf2acbbb8931

Contents?: true

Size: 1.93 KB

Versions: 7

Compression:

Stored size: 1.93 KB

Contents

module Resque
  module Integration
    # Continuous job can re-enqueue self with respect to resque-lock and resque-meta.
    #
    # @example
    #   class ResqueJob
    #     include Resque::Integration
    #
    #     unique
    #     continuous
    #
    #     def self.execute(id)
    #       chunk = Company.find(id).products.limit(1000)
    #
    #       if chunk.size > 0
    #         heavy_work
    #         continue # it will re-enqueue the job with the same arguments. Avoid infinite loops!
    #       end
    #     end
    #   end
    module Continuous
      # Remove any locks if needed and re-enqueue job with the same arguments
      def continue(*args)
        @continued = args
      end
      private :continue # one should not call it from outside

      # This callback resets Meta's finish flags
      def after_perform_reset_meta(*)
        if should_reset_meta?
          meta_obj = meta

          meta_obj.data.delete('succeeded')
          meta_obj.data.delete('finished_at')

          meta_obj.save
        end
      end

      # Just to ensure that previous jobs won't affect current
      def before_perform_continue(*)
        @continued = nil
      end

      # `after` callbacks are executed after `around` callbacks
      # so here we can re-enqueue the job, because lock (from resque-lock) already released
      def after_perform_continue(*args)
        if continued?
          args = if @continued.any?
            if unique?
              meta_id = args.first # we should keep meta_id as first argument
              [meta_id] + @continued
            else
              @continued
            end
          else
            args
          end

          ::Resque.enqueue(self, *args)
        end
      end

      private

      def continued?
        !@continued.nil?
      end

      def should_reset_meta?
        continued? && unique? && meta
      end
    end # module Continuous
  end # module Integration
end # module Resque

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
resque-integration-3.8.2 lib/resque/integration/continuous.rb
resque-integration-3.8.1 lib/resque/integration/continuous.rb
resque-integration-3.8.0 lib/resque/integration/continuous.rb
resque-integration-3.7.1 lib/resque/integration/continuous.rb
resque-integration-3.5.1 lib/resque/integration/continuous.rb
resque-integration-3.5.0 lib/resque/integration/continuous.rb
resque-integration-3.4.1 lib/resque/integration/continuous.rb