lib/qpush/server/jobs.rb in qpush-0.1.1 vs lib/qpush/server/jobs.rb in qpush-0.1.2
- old
+ new
@@ -42,34 +42,20 @@
def retry_at
Time.now.to_i + ((@total_fail**4) + 15 + (rand(30) * (@total_fail + 1)))
end
end
- class Job
- include QPush::Job::Base
+ class Job < QPush::Job::Base
include QPush::Server::JobHelpers
include ObjectValidator::Validate
- attr_accessor :klass, :id, :priority, :created_at, :start_at,
- :cron, :retry_max, :total_success, :total_fail,
- :run_time
- attr_reader :args, :api
+ attr_reader :api
- def initialize(options = {})
- options = defaults.merge(options)
- options.each { |key, value| send("#{key}=", value) }
+ def initialize(options)
+ super
@api = JobApi.new(self)
end
-
- def args=(args)
- @args =
- if args.is_a?(String) then JSON.parse(args)
- else args
- end
- rescue JSON::ParserError
- @args = nil
- end
end
class JobValidator
include ObjectValidator::Validator
@@ -89,51 +75,53 @@
end
class JobApi
def initialize(job)
@job = job
- @config = QPush.config
end
- def delay
- QPush.redis.with do |conn|
- conn.incr("#{@config.stats_namespace}:delayed")
- conn.zadd(@config.delay_namespace, @job.delay_until, @job.to_json)
- end
- end
-
def queue
QPush.redis.with do |conn|
- conn.incr("#{@config.stats_namespace}:queued")
- conn.lpush("#{@config.queue_namespace}", @job.to_json)
+ conn.incr("#{QPush.config.stats_namespace}:queued")
+ conn.lpush("#{QPush.config.queue_namespace}", @job.to_json)
end
end
def execute
execute = Execute.new(@job)
execute.call
end
def perform
QPush.redis.with do |conn|
- conn.incr("#{@config.stats_namespace}:performed")
- conn.lpush("#{@config.perform_namespace}:#{@job.priority}", @job.to_json)
+ conn.incr("#{QPush.config.stats_namespace}:performed")
+ conn.lpush("#{QPush.config.perform_namespace}:#{@job.priority}", @job.to_json)
end
end
+ def delay
+ send_to_delay('delayed', @job.delay_until)
+ end
+
def retry
- QPush.redis.with do |conn|
- conn.incr("#{@config.stats_namespace}:retries")
- conn.zadd(@config.delay_namespace, @job.retry_at, @job.to_json)
- end
+ send_to_delay('retries', @job.retry_at)
end
def setup
fail unless @job.valid?
perform if @job.perform_job?
delay if @job.delay_job?
rescue
raise ServerError, 'Invalid job: ' + @job.errors.full_messages.join(' ')
+ end
+
+ private
+
+ def send_to_delay(stat, time)
+ QPush.redis.with do |conn|
+ conn.incr("#{QPush.config.stats_namespace}:#{stat}")
+ conn.zadd(QPush.config.delay_namespace, time, @job.to_json)
+ end
end
end
end
end