Sha256: 83cb985c91ecc85a92e04b2683ee7e067523dbe2d24e13bf7d739d1fd1078298
Contents?: true
Size: 1.21 KB
Versions: 2
Compression:
Stored size: 1.21 KB
Contents
module Afterparty
  class Worker
    include QueueHelpers

    def initialize options = {}
      @options = options
      @options[:adapter] ||= :redis
      @options[:namespace] ||= :default
      @options[:sleep] ||= 10
      @options[:logger] ||= Logger.new($stderr)
      self
    end

    def consume
      @stopped = false
      # puts "starting worker with namespace [#{@options[:namespace]}]."
      @thread = Thread.new {
        consume_sync
      }
      @thread
    end

    def consume_sync
      while !@stopped
        job = next_valid_job
        if job
          async_redis_call do
            @temp_namespace = "completed"
            redis_call :zadd, Time.now.to_i, Marshal.dump(job)
            redis_call :zrem, job
          end
          run job
        else
          sleep(@options[:sleep])
        end
      end
    end

    def stop
      @stopped = true
      @thread.join(0)
    end

    def run(job)
      fork do
        Marshal.load(job).run
      end
    rescue Exception => exception
      handle_exception job, exception
    end

    def handle_exception(job, exception)
      @options[:logger].error "Job Error: #{job.inspect}\n#{exception.message}\n#{exception.backtrace.join("\n")}"
    end
  end
end
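
The worker above follows a poll/run/sleep loop guarded by a stop flag. As a rough illustration of that pattern only, here is a minimal sketch that can run without Redis or `fork`. It is not part of the gem: `PollingWorker`, `DoubleJob`, the `interval:` option, and the in-memory `Queue` are all hypothetical stand-ins, and jobs run in-process rather than in a child process.

```ruby
require 'logger'

# Hypothetical stand-in for the worker's loop: same consume/stop shape,
# but jobs come from an in-memory Queue and run in-process (no Redis, no fork).
class PollingWorker
  def initialize(jobs, interval: 0.01, logger: Logger.new($stderr))
    @jobs = jobs          # a Queue of objects that respond to #run
    @interval = interval  # seconds to sleep when no job is waiting
    @logger = logger
    @stopped = true
  end

  # Start polling on a background thread and return that thread.
  def consume
    @stopped = false
    @thread = Thread.new { consume_sync }
  end

  # Poll until a stop is requested; sleep whenever the queue is empty.
  def consume_sync
    until @stopped
      job = next_job
      job ? run(job) : sleep(@interval)
    end
  end

  # Ask the loop to exit, then wait for the current iteration to finish.
  def stop
    @stopped = true
    @thread&.join
  end

  private

  # Non-blocking pop: returns nil instead of waiting when the queue is empty.
  def next_job
    @jobs.pop(true)
  rescue ThreadError
    nil
  end

  # Run one job and log any error so a failing job does not kill the loop.
  def run(job)
    job.run
  rescue StandardError => e
    @logger.error("Job Error: #{job.inspect}\n#{e.message}")
  end
end

# Hypothetical job type: pushes twice its value onto a results queue.
DoubleJob = Struct.new(:value, :sink) do
  def run
    sink << value * 2
  end
end

jobs = Queue.new
results = Queue.new
jobs << DoubleJob.new(21, results)

worker = PollingWorker.new(jobs)
worker.consume
sleep 0.01 until jobs.empty?
worker.stop
```

Unlike the original `stop`, which calls `@thread.join(0)` and returns immediately, this sketch joins without a timeout so that an in-flight job completes before `stop` returns. That is a design choice of the sketch, not a description of the gem's behavior.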
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
afterparty-0.0.4 | lib/afterparty/worker.rb |
afterparty-0.0.3 | lib/afterparty/worker.rb |