lib/qpush/server/jobs.rb in qpush-0.1.1 vs lib/qpush/server/jobs.rb in qpush-0.1.2

- old
+ new

@@ -42,34 +42,20 @@ def retry_at Time.now.to_i + ((@total_fail**4) + 15 + (rand(30) * (@total_fail + 1))) end end - class Job - include QPush::Job::Base + class Job < QPush::Job::Base include QPush::Server::JobHelpers include ObjectValidator::Validate - attr_accessor :klass, :id, :priority, :created_at, :start_at, - :cron, :retry_max, :total_success, :total_fail, - :run_time - attr_reader :args, :api + attr_reader :api - def initialize(options = {}) - options = defaults.merge(options) - options.each { |key, value| send("#{key}=", value) } + def initialize(options) + super @api = JobApi.new(self) end - - def args=(args) - @args = - if args.is_a?(String) then JSON.parse(args) - else args - end - rescue JSON::ParserError - @args = nil - end end class JobValidator include ObjectValidator::Validator @@ -89,51 +75,53 @@ end class JobApi def initialize(job) @job = job - @config = QPush.config end - def delay - QPush.redis.with do |conn| - conn.incr("#{@config.stats_namespace}:delayed") - conn.zadd(@config.delay_namespace, @job.delay_until, @job.to_json) - end - end - def queue QPush.redis.with do |conn| - conn.incr("#{@config.stats_namespace}:queued") - conn.lpush("#{@config.queue_namespace}", @job.to_json) + conn.incr("#{QPush.config.stats_namespace}:queued") + conn.lpush("#{QPush.config.queue_namespace}", @job.to_json) end end def execute execute = Execute.new(@job) execute.call end def perform QPush.redis.with do |conn| - conn.incr("#{@config.stats_namespace}:performed") - conn.lpush("#{@config.perform_namespace}:#{@job.priority}", @job.to_json) + conn.incr("#{QPush.config.stats_namespace}:performed") + conn.lpush("#{QPush.config.perform_namespace}:#{@job.priority}", @job.to_json) end end + def delay + send_to_delay('delayed', @job.delay_until) + end + def retry - QPush.redis.with do |conn| - conn.incr("#{@config.stats_namespace}:retries") - conn.zadd(@config.delay_namespace, @job.retry_at, @job.to_json) - end + send_to_delay('retries', @job.retry_at) end def setup fail unless @job.valid? perform if @job.perform_job? delay if @job.delay_job? rescue raise ServerError, 'Invalid job: ' + @job.errors.full_messages.join(' ') + end + + private + + def send_to_delay(stat, time) + QPush.redis.with do |conn| + conn.incr("#{QPush.config.stats_namespace}:#{stat}") + conn.zadd(QPush.config.delay_namespace, time, @job.to_json) + end end end end end