lib/pallets/backends/redis.rb in pallets-0.7.0 vs lib/pallets/backends/redis.rb in pallets-0.8.0
- old
+ new
@@ -7,10 +7,11 @@
      RELIABILITY_QUEUE_KEY = 'reliability-queue'
      RELIABILITY_SET_KEY = 'reliability-set'
      RETRY_SET_KEY = 'retry-set'
      GIVEN_UP_SET_KEY = 'given-up-set'
      WORKFLOW_QUEUE_KEY = 'workflow-queue:%s'
+     JOBMASK_KEY = 'jobmask:%s'
      CONTEXT_KEY = 'context:%s'
      REMAINING_KEY = 'remaining:%s'

      def initialize(blocking_timeout:, failed_job_lifespan:, job_timeout:, pool_size:, **options)
        @blocking_timeout = blocking_timeout
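The new JOBMASK_KEY follows the same convention as the other per-entity keys: a format template expanded with an identifier. A minimal sketch of the expansion, using made-up ids:

# Sketch only; jid and wfid are made-up identifiers.
JOBMASK_KEY = 'jobmask:%s'
CONTEXT_KEY = 'context:%s'

jid  = 'J4F5GQ'
wfid = 'V5N1XT'

JOBMASK_KEY % jid   # => "jobmask:J4F5GQ"
CONTEXT_KEY % wfid  # => "context:V5N1XT"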
@@ -39,15 +40,15 @@
        @pool.execute do |client|
          client.hgetall(CONTEXT_KEY % wfid)
        end
      end

-     def save(wfid, job, context_buffer)
+     def save(wfid, jid, job, context_buffer)
        @pool.execute do |client|
          client.evalsha(
            @scripts['save'],
-           [WORKFLOW_QUEUE_KEY % wfid, QUEUE_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY, CONTEXT_KEY % wfid, REMAINING_KEY % wfid],
+           [WORKFLOW_QUEUE_KEY % wfid, QUEUE_KEY, RELIABILITY_QUEUE_KEY, RELIABILITY_SET_KEY, CONTEXT_KEY % wfid, REMAINING_KEY % wfid, JOBMASK_KEY % jid],
            context_buffer.to_a << job
          )
        end
      end

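For orientation, a hedged sketch of a call to the new save signature. The backend construction reuses the keyword arguments from the initialize signature above; the ids, the serialized payload, and the context values are made up, and a plain Hash stands in for whatever context buffer object the worker actually passes:

require 'pallets'

# Sketch only; the keyword names come from the initialize signature above,
# the values are illustrative defaults.
backend = Pallets::Backends::Redis.new(
  blocking_timeout: 5,
  failed_job_lifespan: 7_776_000,
  job_timeout: 1_800,
  pool_size: 5
)

wfid = 'V5N1XT'                        # workflow id (made up)
jid  = 'J4F5GQ'                        # id of the job that just finished (made up)
job  = '{"task_class":"SendEmail"}'    # serialized job payload (shape assumed)
context_buffer = { 'user_id' => '42' } # plain Hash standing in for the worker's buffer

# 0.8.0 also takes the job id, so the 'save' script receives
# JOBMASK_KEY % jid ("jobmask:J4F5GQ") as an additional KEYS entry.
backend.save(wfid, jid, job, context_buffer)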
@@ -79,16 +80,17 @@
            [earlier_than]
          )
        end
      end
-     def run_workflow(wfid, jobs_with_order, context_buffer)
+     def run_workflow(wfid, jobs, jobmasks, context_buffer)
        @pool.execute do |client|
          client.multi do
+           jobmasks.each { |jid, jobmask| client.zadd(JOBMASK_KEY % jid, jobmask) }
            client.evalsha(
              @scripts['run_workflow'],
              [WORKFLOW_QUEUE_KEY % wfid, QUEUE_KEY, REMAINING_KEY % wfid],
-             jobs_with_order
+             jobs
            )
            client.hmset(CONTEXT_KEY % wfid, *context_buffer.to_a) unless context_buffer.empty?
          end
        end
      end
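Finally, a hedged sketch of the new run_workflow call, reusing the backend from the previous sketch. The shapes of jobs and jobmasks are inferred only from how this method uses them (jobs is forwarded to the Lua script, each jobmask is handed to ZADD), so they are assumptions rather than the gem's actual serialization:

# Sketch only, reusing `backend` from the previous example.
wfid = 'V5N1XT'

# Forwarded untouched as the Lua script's arguments; the real encoding is
# whatever the 'run_workflow' script expects, these are placeholders.
jobs = ['{"task_class":"Prepare"}', '{"task_class":"Publish"}']

# Iterated as (jid, jobmask) pairs; each jobmask goes to ZADD, so
# [score, member] pairs are a plausible (assumed) shape.
jobmasks = {
  'J4F5GQ' => [[0, 'member-a']],
  'K9P2RD' => [[0, 'member-a'], [1, 'member-b']]
}

context_buffer = { 'user_id' => '42' }

# The ZADDs, the script call, and the context HMSET all run in one MULTI,
# so the jobmask sets appear together with the queued workflow.
backend.run_workflow(wfid, jobs, jobmasks, context_buffer)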