lib/gush/client.rb in gush-1.0.0 vs lib/gush/client.rb in gush-1.1.0

- old
+ new

@@ -71,11 +71,11 @@ id end def all_workflows connection_pool.with do |redis| - redis.keys("gush.workflows.*").map do |key| + redis.scan_each(match: "gush.workflows.*").map do |key| id = key.sub("gush.workflows.", "") find_workflow(id) end end end @@ -84,11 +84,11 @@ connection_pool.with do |redis| data = redis.get("gush.workflows.#{id}") unless data.nil? hash = Gush::JSON.decode(data, symbolize_keys: true) - keys = redis.keys("gush.jobs.#{id}.*") + keys = redis.scan_each(match: "gush.jobs.#{id}.*") nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } workflow_from_hash(hash, nodes) else raise WorkflowNotFound.new("Workflow with given id doesn't exist") end @@ -114,11 +114,11 @@ def find_job(workflow_id, job_id) job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id) hypen = '-' if job_name_match.nil? keys = connection_pool.with do |redis| - redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*") + redis.scan_each(match: "gush.jobs.#{workflow_id}.#{job_id}#{hypen}*").to_a end return nil if keys.nil? data = connection_pool.with do |redis| @@ -142,26 +142,41 @@ connection_pool.with do |redis| redis.del("gush.jobs.#{workflow_id}.#{job.name}") end end + def expire_workflow(workflow, ttl=nil) + ttl = ttl || configuration.ttl + connection_pool.with do |redis| + redis.expire("gush.workflows.#{workflow.id}", ttl) + end + workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) } + end + + def expire_job(workflow_id, job, ttl=nil) + ttl = ttl || configuration.ttl + connection_pool.with do |redis| + redis.expire("gush.jobs.#{workflow_id}.#{job.name}", ttl) + end + end + def enqueue_job(workflow_id, job) job.enqueue! persist_job(workflow_id, job) Gush::Worker.set(queue: configuration.namespace).perform_later(*[workflow_id, job.name]) end private - def workflow_from_hash(hash, nodes = nil) + def workflow_from_hash(hash, nodes = []) flow = hash[:klass].constantize.new(*hash[:arguments]) flow.jobs = [] flow.stopped = hash.fetch(:stopped, false) flow.id = hash[:id] - (nodes || hash[:nodes]).each do |node| - flow.jobs << Gush::Job.from_hash(node) + flow.jobs = nodes.map do |node| + Gush::Job.from_hash(node) end flow end