Sha256: c8840e41736a42254aecfeabe22ec2b8271f5f10899621abda5dfb46a01bc2f4

Contents?: true

Size: 1.66 KB

Versions: 2

Compression:

Stored size: 1.66 KB

Contents

require 'resque-retry'

module ResqueBus
  # Resque job that hands one bus event to one subscription of one application
  # by calling ResqueBus.dispatcher_execute with the received attributes.
  # (Original note: "queue'd in each".)
  #
  # Extends Resque::Plugins::ExponentialBackoff (from resque-retry), so the
  # retry behavior for failed jobs comes from that plugin.
  class Rider
    extend Resque::Plugins::ExponentialBackoff

    # Job entry point.
    #
    # Attributes that should be available in the payload:
    #   attributes["bus_event_type"]
    #   attributes["bus_app_key"]
    #   attributes["bus_published_at"]
    #   attributes["bus_driven_at"]
    #
    # @param attributes [Hash, nil] job payload; must contain non-blank
    #   "bus_rider_app_key" and "bus_rider_sub_key" entries. nil is treated
    #   as an empty payload.
    # @raise [RuntimeError] when the application key or subscription key is
    #   missing or blank.
    # @return whatever ResqueBus.dispatcher_execute returns.
    def self.perform(attributes = {})
      # Normalize before the first read so an explicit nil payload reaches the
      # descriptive errors below instead of failing with NoMethodError on nil.
      attributes ||= {}

      sub_key = attributes["bus_rider_sub_key"]
      app_key = attributes["bus_rider_app_key"]
      raise "No application key passed" if app_key.to_s.empty?
      raise "No subcription key passed" if sub_key.to_s.empty?

      ResqueBus.log_worker("Rider received: #{app_key} #{sub_key} #{attributes.inspect}")

      # Allow the real Resque to be used inside the callback while in a worker.
      Resque.redis = ResqueBus.original_redis if ResqueBus.original_redis

      # (now running with the real app that subscribed)
      # The payload is passed on with the execution timestamp (epoch seconds) added.
      ResqueBus.dispatcher_execute(app_key, sub_key, attributes.merge("bus_executed_at" => Time.now.to_i))
    ensure
      # Put the bus connection back if running in the thread. This runs on
      # every exit path, including the validation errors raised above.
      Resque.redis = ResqueBus.redis if ResqueBus.original_redis
    end

    # Queue name reported for this job class. It is only non-nil between the
    # on_failure_aaa and on_failure_zzz hooks below.
    #
    # See @failure_hooks_already_ran on
    # https://github.com/defunkt/resque/tree/1-x-stable (prevents the failure
    # hooks from running twice).
    #
    # @return [Object, nil] the "bus_rider_queue" value of the failed job, or nil.
    def self.queue
      @my_queue
    end

    # First failure hook (note: hooks are sorted alphabetically, hence "aaa").
    # The queue needs to be set for retry to work, so that the queue can be
    # resolved from the job class.
    #
    # @param exception [Exception] the failure being handled (unused here).
    # @param args [Array] the arguments the failed job was performed with; the
    #   first one is expected to be the attributes Hash.
    def self.on_failure_aaa(exception, *args)
      attributes = args.first
      # A job enqueued without arguments (or with nil) has no attributes Hash;
      # do not raise from inside the failure hook in that case.
      @my_queue = attributes.is_a?(Hash) ? attributes["bus_rider_queue"] : nil
    end

    # Last failure hook (note: hooks are sorted alphabetically, hence "zzz").
    # Clears the queue name set by on_failure_aaa.
    #
    # @param exception [Exception] the failure being handled (unused here).
    # @param args [Array] the arguments the failed job was performed with (unused here).
    def self.on_failure_zzz(exception, *args)
      @my_queue = nil
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
resque-bus-0.2.4 lib/resque_bus/rider.rb
resque-bus-0.2.3 lib/resque_bus/rider.rb