lib/gush/client.rb in gush-0.0.1 vs lib/gush/client.rb in gush-0.1
- old
+ new
@@ -4,87 +4,100 @@
def initialize(config = Gush.configuration)
@configuration = config
@sidekiq = build_sidekiq
@redis = build_redis
- load_gushfile
end
def configure
yield configuration
@sidekiq = build_sidekiq
@redis = build_redis
end
def create_workflow(name)
- id = SecureRandom.uuid.split("-").first
-
begin
- workflow = name.constantize.new(id)
+ flow = name.constantize.new
+ flow.save
rescue NameError
raise WorkflowNotFound.new("Workflow with given name doesn't exist")
end
- persist_workflow(workflow)
- workflow
+ flow
end
- def start_workflow(id, jobs = [])
- workflow = find_workflow(id)
- workflow.start!
+ def start_workflow(workflow, job_names = [])
+ workflow.mark_as_started
persist_workflow(workflow)
- jobs = if jobs.empty?
- workflow.next_jobs
+ jobs = if job_names.empty?
+ workflow.initial_jobs
else
- jobs.map {|name| workflow.find_job(name) }
+ job_names.map {|name| workflow.find_job(name) }
end
jobs.each do |job|
- job.enqueue!
- persist_job(workflow.id, job)
enqueue_job(workflow.id, job)
end
end
def stop_workflow(id)
workflow = find_workflow(id)
- workflow.stop!
+ workflow.mark_as_stopped
persist_workflow(workflow)
end
+ def next_free_id
+ id = nil
+ loop do
+ id = SecureRandom.uuid
+ break if !redis.exists("gush.workflow.#{id}")
+ end
+
+ id
+ end
+
def all_workflows
redis.keys("gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
find_workflow(id)
end
end
def find_workflow(id)
data = redis.get("gush.workflows.#{id}")
unless data.nil?
- hash = Yajl::Parser.parse(data, symbolize_keys: true)
+ hash = Gush::JSON.decode(data, symbolize_keys: true)
keys = redis.keys("gush.jobs.#{id}.*")
- nodes = redis.mget(*keys).map { |json| Yajl::Parser.parse(json, symbolize_keys: true) }
+ nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
workflow_from_hash(hash, nodes)
else
raise WorkflowNotFound.new("Workflow with given id doesn't exist")
end
end
def persist_workflow(workflow)
redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
- workflow.nodes.each {|job| persist_job(workflow.id, job) }
+ workflow.jobs.each {|job| persist_job(workflow.id, job) }
+ workflow.mark_as_persisted
+ true
end
def persist_job(workflow_id, job)
redis.set("gush.jobs.#{workflow_id}.#{job.class.to_s}", job.to_json)
end
+ def load_job(workflow_id, job_id)
+ data = redis.get("gush.jobs.#{workflow_id}.#{job_id}")
+ return nil if data.nil?
+ data = Gush::JSON.decode(data, symbolize_keys: true)
+ Gush::Job.from_hash(nil, data)
+ end
+
def destroy_workflow(workflow)
redis.del("gush.workflows.#{workflow.id}")
- workflow.nodes.each {|job| destroy_job(workflow.id, job) }
+ workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end
def destroy_job(workflow_id, job)
redis.del("gush.jobs.#{workflow_id}.#{job.class.to_s}")
end
@@ -95,37 +108,41 @@
def workflow_report(message)
report("gush.workflows.status", message)
end
+ def enqueue_job(workflow_id, job)
+ job.enqueue!
+ persist_job(workflow_id, job)
+
+ sidekiq.push(
+ 'class' => Gush::Worker,
+ 'queue' => configuration.namespace,
+ 'args' => [workflow_id, job.class.to_s, configuration.to_json]
+ )
+ end
+
private
attr_reader :sidekiq, :redis
def workflow_from_hash(hash, nodes = nil)
- flow = hash[:klass].constantize.new(hash[:id], configure: false)
- flow.logger_builder(hash.fetch(:logger_builder, 'Gush::LoggerBuilder').constantize)
+ flow = hash[:klass].constantize.new(false)
flow.stopped = hash.fetch(:stopped, false)
+ flow.id = hash[:id]
(nodes || hash[:nodes]).each do |node|
- flow.nodes << Gush::Job.from_hash(node)
+ flow.jobs << Gush::Job.from_hash(flow, node)
end
flow
end
def report(key, message)
- redis.publish(key, Yajl::Encoder.new.encode(message))
+ redis.publish(key, Gush::JSON.encode(message))
end
- def enqueue_job(workflow_id, job)
- sidekiq.push(
- 'class' => Gush::Worker,
- 'queue' => configuration.namespace,
- 'args' => [workflow_id, job.class.to_s, configuration.to_json]
- )
- end
def build_sidekiq
Sidekiq::Client.new(connection_pool)
end
@@ -133,14 +150,8 @@
Redis.new(url: configuration.redis_url)
end
def connection_pool
ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis }
- end
-
- def load_gushfile
- require configuration.gushfile
- rescue LoadError
- raise Thor::Error, "failed to load #{configuration.gushfile.basename}".colorize(:red)
end
end
end