lib/gush/client.rb in gush-1.0.0 vs lib/gush/client.rb in gush-1.1.0
- old
+ new
@@ -71,11 +71,11 @@
id
end
def all_workflows
connection_pool.with do |redis|
- redis.keys("gush.workflows.*").map do |key|
+ redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
find_workflow(id)
end
end
end
@@ -84,11 +84,11 @@
connection_pool.with do |redis|
data = redis.get("gush.workflows.#{id}")
unless data.nil?
hash = Gush::JSON.decode(data, symbolize_keys: true)
- keys = redis.keys("gush.jobs.#{id}.*")
+ keys = redis.scan_each(match: "gush.jobs.#{id}.*")
nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
workflow_from_hash(hash, nodes)
else
raise WorkflowNotFound.new("Workflow with given id doesn't exist")
end
@@ -114,11 +114,11 @@
def find_job(workflow_id, job_id)
job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id)
hypen = '-' if job_name_match.nil?
keys = connection_pool.with do |redis|
- redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*")
+ redis.scan_each(match: "gush.jobs.#{workflow_id}.#{job_id}#{hypen}*").to_a
end
return nil if keys.nil?
data = connection_pool.with do |redis|
@@ -142,26 +142,41 @@
connection_pool.with do |redis|
redis.del("gush.jobs.#{workflow_id}.#{job.name}")
end
end
+ def expire_workflow(workflow, ttl=nil)
+ ttl = ttl || configuration.ttl
+ connection_pool.with do |redis|
+ redis.expire("gush.workflows.#{workflow.id}", ttl)
+ end
+ workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) }
+ end
+
+ def expire_job(workflow_id, job, ttl=nil)
+ ttl = ttl || configuration.ttl
+ connection_pool.with do |redis|
+ redis.expire("gush.jobs.#{workflow_id}.#{job.name}", ttl)
+ end
+ end
+
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
Gush::Worker.set(queue: configuration.namespace).perform_later(*[workflow_id, job.name])
end
private
- def workflow_from_hash(hash, nodes = nil)
+ def workflow_from_hash(hash, nodes = [])
flow = hash[:klass].constantize.new(*hash[:arguments])
flow.jobs = []
flow.stopped = hash.fetch(:stopped, false)
flow.id = hash[:id]
- (nodes || hash[:nodes]).each do |node|
- flow.jobs << Gush::Job.from_hash(node)
+ flow.jobs = nodes.map do |node|
+ Gush::Job.from_hash(node)
end
flow
end