Sha256: 66385e3325dbd26f673e154e98ebe4a8a1fc7034d4f47d6ce8b06db37a9c9314

Contents?: true

Size: 1.77 KB

Versions: 2

Compression:

Stored size: 1.77 KB

Contents

module RCelery
  class Task
    # Status strings a Runner records for a task run and includes in the
    # published result payload.
    module States
      SUCCESS = 'SUCCESS'.freeze
      RETRY   = 'RETRY'.freeze
      FAILURE = 'FAILURE'.freeze
    end

    # Executes one task invocation described by a decoded message hash
    # (string keys: 'task', 'id', 'args', 'kwargs', 'retries', 'eager') and
    # publishes the outcome unless the task ignores results or the run is
    # eager.
    class Runner
      include States

      attr_reader :task, :result, :status

      # @param message [Hash] task message with string keys. 'task' is looked
      #   up in Task.all_tasks, 'id' becomes the task id, 'args'/'kwargs' are
      #   the call arguments, 'retries' defaults to 0, and 'eager' defaults
      #   to false.
      # NOTE(review): an unknown task name leaves @task nil and fails below
      # with NoMethodError on `request` — confirm callers validate the name.
      def initialize(message)
        @task = Task.all_tasks[message['task']]
        @task_id = message['id']
        @eager = message['eager'] || false

        # Drop a missing args/kwargs container first, then flatten exactly one
        # level. A recursive flatten would splat array-valued arguments, and
        # compacting after flattening would silently drop nil arguments.
        @args = [message['args'], message['kwargs']].compact.flatten(1)
        # An empty trailing hash (typically empty kwargs) is not passed on.
        @args.pop if @args.last.is_a?(Hash) && @args.last.empty?

        # NOTE(review): @queue is never read in this class; presumably the
        # call matters for its side effect on the result queue — confirm.
        @queue = Task.result_queue(@task_id) unless eager_mode?
        @task.request.update(
          :task_id => @task_id,
          :retries => message['retries'] || 0,
          :args => message['args'],
          :kwargs => message['kwargs']
        )
      end

      # Calls the task's method with the prepared arguments and records the
      # outcome in #result and #status.
      #
      # * Normal return: status SUCCESS, result is the return value; published.
      # * RetryError: status RETRY, result is the error; nothing is published.
      # * Any other exception: status FAILURE, result is the exception;
      #   published together with its backtrace.
      #
      # The task's request context is cleared in every case.
      def execute
        result = @task.method.call(*@args)
        @status = SUCCESS
        @result = result
        publish_result if publish_result?
      rescue RetryError => raised
        @result = raised
        @status = RETRY
      rescue Exception => raised
        # NOTE(review): rescuing Exception also captures SystemExit and
        # signal exceptions raised while the task runs. Kept so that execute
        # never raises for task errors; confirm this is intended.
        @result = raised
        @status = FAILURE
        publish_result if publish_result?
      ensure
        @task.request.clear
      end

      private

      # Publishes the recorded result and status, routed by the task id with
      # dashes removed. The traceback is only populated for failures.
      def publish_result
        traceback = @status == FAILURE ? (@result.backtrace || []) : []

        RCelery.publish(:result, {
            :result => @result,
            :status => @status,
            :task_id => @task_id,
            :traceback => traceback },
          :routing_key => @task_id.delete('-'),
          :persistent => true)
      end

      # True only when the message's 'eager' value was exactly true.
      def eager_mode?
        @eager == true
      end

      # Results are published only when the task explicitly reports that it
      # does not ignore results (ignore_result? returns false) and the run is
      # not eager.
      def publish_result?
        @task.ignore_result? == false && eager_mode? == false
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rcelery-1.0.1 lib/rcelery/task/runner.rb
rcelery-1.0.0 lib/rcelery/task/runner.rb