lib/pallets/backends/redis.rb in pallets-0.8.0 vs lib/pallets/backends/redis.rb in pallets-0.9.0
- old
+ new
@@ -7,17 +7,19 @@
RELIABILITY_QUEUE_KEY = 'reliability-queue'
RELIABILITY_SET_KEY = 'reliability-set'
RETRY_SET_KEY = 'retry-set'
GIVEN_UP_SET_KEY = 'given-up-set'
WORKFLOW_QUEUE_KEY = 'workflow-queue:%s'
+ JOBMASKS_KEY = 'jobmasks:%s'
JOBMASK_KEY = 'jobmask:%s'
CONTEXT_KEY = 'context:%s'
REMAINING_KEY = 'remaining:%s'
- def initialize(blocking_timeout:, failed_job_lifespan:, job_timeout:, pool_size:, **options)
+ def initialize(blocking_timeout:, failed_job_lifespan:, failed_job_max_count:, job_timeout:, pool_size:, **options)
@blocking_timeout = blocking_timeout
@failed_job_lifespan = failed_job_lifespan
+ @failed_job_max_count = failed_job_max_count
@job_timeout = job_timeout
@pool = Pallets::Pool.new(pool_size) { ::Redis.new(options) }
register_scripts
end
@@ -44,11 +46,11 @@
def save(wfid, jid, job, context_buffer)
@pool.execute do |client|
client.evalsha(
@scripts['save'],
- [WORKFLOW_QUEUE_KEY % wfid, QUEUE_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY, CONTEXT_KEY % wfid, REMAINING_KEY % wfid, JOBMASK_KEY % jid],
+ [WORKFLOW_QUEUE_KEY % wfid, QUEUE_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY, CONTEXT_KEY % wfid, REMAINING_KEY % wfid, JOBMASK_KEY % jid, JOBMASKS_KEY % wfid],
context_buffer.to_a << job
)
end
end
@@ -60,20 +62,30 @@
[at, job, old_job]
)
end
end
- def give_up(job, old_job)
+ def discard(job)
@pool.execute do |client|
client.evalsha(
- @scripts['give_up'],
+ @scripts['discard'],
[GIVEN_UP_SET_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY],
- [Time.now.to_f, job, old_job, Time.now.to_f - @failed_job_lifespan]
+ [Time.now.to_f, job, Time.now.to_f - @failed_job_lifespan, @failed_job_max_count]
)
end
end
+ def give_up(wfid, job, old_job)
+ @pool.execute do |client|
+ client.evalsha(
+ @scripts['give_up'],
+ [GIVEN_UP_SET_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY, JOBMASKS_KEY % wfid, WORKFLOW_QUEUE_KEY % wfid, REMAINING_KEY % wfid, CONTEXT_KEY % wfid],
+ [Time.now.to_f, job, old_job, Time.now.to_f - @failed_job_lifespan, @failed_job_max_count]
+ )
+ end
+ end
+
def reschedule_all(earlier_than)
@pool.execute do |client|
client.evalsha(
@scripts['reschedule_all'],
[RELIABILITY_SET_KEY, RELIABILITY_QUEUE_KEY, RETRY_SET_KEY, QUEUE_KEY],
@@ -84,9 +96,10 @@
def run_workflow(wfid, jobs, jobmasks, context_buffer)
@pool.execute do |client|
client.multi do
jobmasks.each { |jid, jobmask| client.zadd(JOBMASK_KEY % jid, jobmask) }
+ client.sadd(JOBMASKS_KEY % wfid, jobmasks.map { |jid, _| JOBMASK_KEY % jid }) unless jobmasks.empty?
client.evalsha(
@scripts['run_workflow'],
[WORKFLOW_QUEUE_KEY % wfid, QUEUE_KEY, REMAINING_KEY % wfid],
jobs
)