lib/gush/client.rb in gush-0.3 vs lib/gush/client.rb in gush-0.3.1

- old
+ new

@@ -14,16 +14,14 @@ @redis = build_redis end def create_workflow(name) begin - flow = name.constantize.new - flow.save + name.constantize.create rescue NameError raise WorkflowNotFound.new("Workflow with given name doesn't exist") end - flow end def start_workflow(workflow, job_names = []) workflow.mark_as_started @@ -44,11 +42,22 @@ workflow = find_workflow(id) workflow.mark_as_stopped persist_workflow(workflow) end - def next_free_id + def next_free_job_id(workflow_id,job_klass) + job_identifier = nil + loop do + id = SecureRandom.uuid + job_identifier = "#{job_klass}-#{id}" + break if !redis.exists("gush.jobs.#{workflow_id}.#{job_identifier}") + end + + job_identifier + end + + def next_free_workflow_id id = nil loop do id = SecureRandom.uuid break if !redis.exists("gush.workflow.#{id}") end @@ -81,28 +90,35 @@ workflow.mark_as_persisted true end def persist_job(workflow_id, job) - redis.set("gush.jobs.#{workflow_id}.#{job.class.to_s}", job.to_json) + redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json) end def load_job(workflow_id, job_id) workflow = find_workflow(workflow_id) - data = redis.get("gush.jobs.#{workflow_id}.#{job_id}") + job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id) + hypen = '-' if job_name_match.nil? + + keys = redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*") + return nil if keys.nil? + + data = redis.get(keys.first) return nil if data.nil? + data = Gush::JSON.decode(data, symbolize_keys: true) Gush::Job.from_hash(workflow, data) end def destroy_workflow(workflow) redis.del("gush.workflows.#{workflow.id}") workflow.jobs.each {|job| destroy_job(workflow.id, job) } end def destroy_job(workflow_id, job) - redis.del("gush.jobs.#{workflow_id}.#{job.class.to_s}") + redis.del("gush.jobs.#{workflow_id}.#{job.name}") end def worker_report(message) report("gush.workers.status", message) end @@ -116,19 +132,20 @@ persist_job(workflow_id, job) sidekiq.push( 'class' => Gush::Worker, 'queue' => configuration.namespace, - 'args' => [workflow_id, job.class.to_s] + 'args' => [workflow_id, job.name] ) end private attr_reader :sidekiq, :redis def workflow_from_hash(hash, nodes = nil) flow = hash[:klass].constantize.new + flow.jobs = [] flow.stopped = hash.fetch(:stopped, false) flow.id = hash[:id] (nodes || hash[:nodes]).each do |node| flow.jobs << Gush::Job.from_hash(flow, node)