lib/gush/client.rb in gush-0.3 vs lib/gush/client.rb in gush-0.3.1
- old
+ new
@@ -14,16 +14,14 @@
@redis = build_redis
end
def create_workflow(name)
begin
- flow = name.constantize.new
- flow.save
+ name.constantize.create
rescue NameError
raise WorkflowNotFound.new("Workflow with given name doesn't exist")
end
-
flow
end
def start_workflow(workflow, job_names = [])
workflow.mark_as_started
@@ -44,11 +42,22 @@
workflow = find_workflow(id)
workflow.mark_as_stopped
persist_workflow(workflow)
end
- def next_free_id
+ def next_free_job_id(workflow_id,job_klass)
+ job_identifier = nil
+ loop do
+ id = SecureRandom.uuid
+ job_identifier = "#{job_klass}-#{id}"
+ break if !redis.exists("gush.jobs.#{workflow_id}.#{job_identifier}")
+ end
+
+ job_identifier
+ end
+
+ def next_free_workflow_id
id = nil
loop do
id = SecureRandom.uuid
break if !redis.exists("gush.workflow.#{id}")
end
@@ -81,28 +90,35 @@
workflow.mark_as_persisted
true
end
def persist_job(workflow_id, job)
- redis.set("gush.jobs.#{workflow_id}.#{job.class.to_s}", job.to_json)
+ redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json)
end
def load_job(workflow_id, job_id)
workflow = find_workflow(workflow_id)
- data = redis.get("gush.jobs.#{workflow_id}.#{job_id}")
+ job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id)
+ hypen = '-' if job_name_match.nil?
+
+ keys = redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*")
+ return nil if keys.nil?
+
+ data = redis.get(keys.first)
return nil if data.nil?
+
data = Gush::JSON.decode(data, symbolize_keys: true)
Gush::Job.from_hash(workflow, data)
end
def destroy_workflow(workflow)
redis.del("gush.workflows.#{workflow.id}")
workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end
def destroy_job(workflow_id, job)
- redis.del("gush.jobs.#{workflow_id}.#{job.class.to_s}")
+ redis.del("gush.jobs.#{workflow_id}.#{job.name}")
end
def worker_report(message)
report("gush.workers.status", message)
end
@@ -116,19 +132,20 @@
persist_job(workflow_id, job)
sidekiq.push(
'class' => Gush::Worker,
'queue' => configuration.namespace,
- 'args' => [workflow_id, job.class.to_s]
+ 'args' => [workflow_id, job.name]
)
end
private
attr_reader :sidekiq, :redis
def workflow_from_hash(hash, nodes = nil)
flow = hash[:klass].constantize.new
+ flow.jobs = []
flow.stopped = hash.fetch(:stopped, false)
flow.id = hash[:id]
(nodes || hash[:nodes]).each do |node|
flow.jobs << Gush::Job.from_hash(flow, node)