lib/pallets/backends/redis.rb in pallets-0.2.0 vs lib/pallets/backends/redis.rb in pallets-0.3.0
- old
+ new
@@ -1,58 +1,55 @@
require 'redis'
module Pallets
module Backends
class Redis < Base
- def initialize(namespace:, blocking_timeout:, job_timeout:, pool_size:, **options)
+ def initialize(namespace:, blocking_timeout:, failed_job_lifespan:, job_timeout:, pool_size:, **options)
@namespace = namespace
@blocking_timeout = blocking_timeout
+ @failed_job_lifespan = failed_job_lifespan
@job_timeout = job_timeout
@pool = Pallets::Pool.new(pool_size) { ::Redis.new(options) }
@queue_key = "#{namespace}:queue"
@reliability_queue_key = "#{namespace}:reliability-queue"
@reliability_set_key = "#{namespace}:reliability-set"
@retry_set_key = "#{namespace}:retry-set"
- @fail_set_key = "#{namespace}:fail-set"
+ @given_up_set_key = "#{namespace}:given-up-set"
@workflow_key = "#{namespace}:workflows:%s"
+ @context_key = "#{namespace}:contexts:%s"
+ @eta_key = "#{namespace}:etas:%s"
register_scripts
end
def pick
- job = @pool.execute do |client|
- client.brpoplpush(@queue_key, @reliability_queue_key, timeout: @blocking_timeout)
- end
- if job
- # We store the job's timeout so we know when to retry jobs that are
- # still on the reliability queue. We do this separately since there is
- # no other way to atomically BRPOPLPUSH from the main queue to a
- # sorted set
- @pool.execute do |client|
+ @pool.execute do |client|
+ job = client.brpoplpush(@queue_key, @reliability_queue_key, timeout: @blocking_timeout)
+ if job
+ # We store the job's timeout so we know when to retry jobs that are
+ # still on the reliability queue. We do this separately since there is
+ # no other way to atomically BRPOPLPUSH from the main queue to a
+ # sorted set
client.zadd(@reliability_set_key, Time.now.to_f + @job_timeout, job)
end
+ job
end
- job
end
- def save(workflow_id, job)
+ def get_context(workflow_id)
@pool.execute do |client|
- client.eval(
- @scripts['save'],
- [@workflow_key % workflow_id, @queue_key, @reliability_queue_key, @reliability_set_key],
- [job]
- )
+ client.hgetall(@context_key % workflow_id)
end
end
- def discard(job)
+ def save(workflow_id, job, context_buffer)
@pool.execute do |client|
client.eval(
- @scripts['discard'],
- [@reliability_queue_key, @reliability_set_key],
- [job]
+ @scripts['save'],
+ [@workflow_key % workflow_id, @queue_key, @reliability_queue_key, @reliability_set_key, @context_key % workflow_id, @eta_key % workflow_id],
+ context_buffer.to_a << job
)
end
end
def retry(job, old_job, at)
@@ -63,16 +60,16 @@
[at, job, old_job]
)
end
end
- def give_up(job, old_job, at)
+ def give_up(job, old_job)
@pool.execute do |client|
client.eval(
@scripts['give_up'],
- [@fail_set_key, @reliability_queue_key, @reliability_set_key],
- [at, job, old_job]
+ [@given_up_set_key, @reliability_queue_key, @reliability_set_key],
+ [Time.now.to_f, job, old_job, Time.now.to_f - @failed_job_lifespan]
)
end
end
def reschedule_all(earlier_than)
@@ -83,16 +80,19 @@
[earlier_than]
)
end
end
- def run_workflow(workflow_id, jobs_with_order)
+ def run_workflow(workflow_id, jobs_with_order, context)
@pool.execute do |client|
- client.eval(
- @scripts['run_workflow'],
- [@workflow_key % workflow_id, @queue_key],
- jobs_with_order
- )
+ client.multi do
+ client.eval(
+ @scripts['run_workflow'],
+ [@workflow_key % workflow_id, @queue_key, @eta_key % workflow_id],
+ jobs_with_order
+ )
+ client.hmset(@context_key % workflow_id, *context.to_a) unless context.empty?
+ end
end
end
private