lib/gush/client.rb in gush-0.0.1 vs lib/gush/client.rb in gush-0.1

- old
+ new

@@ -4,87 +4,100 @@
     def initialize(config = Gush.configuration)
       @configuration = config
       @sidekiq = build_sidekiq
       @redis = build_redis
-      load_gushfile
     end

     def configure
       yield configuration
       @sidekiq = build_sidekiq
       @redis = build_redis
     end

     def create_workflow(name)
-      id = SecureRandom.uuid.split("-").first
-      begin
-        workflow = name.constantize.new(id)
+        flow = name.constantize.new
+        flow.save
      rescue NameError
        raise WorkflowNotFound.new("Workflow with given name doesn't exist")
      end
-      persist_workflow(workflow)
-      workflow
+      flow
    end

-    def start_workflow(id, jobs = [])
-      workflow = find_workflow(id)
-      workflow.start!
+    def start_workflow(workflow, job_names = [])
+      workflow.mark_as_started
      persist_workflow(workflow)

-      jobs = if jobs.empty?
-        workflow.next_jobs
+      jobs = if job_names.empty?
+        workflow.initial_jobs
      else
-        jobs.map {|name| workflow.find_job(name) }
+        job_names.map {|name| workflow.find_job(name) }
      end

      jobs.each do |job|
-        job.enqueue!
-        persist_job(workflow.id, job)
        enqueue_job(workflow.id, job)
      end
    end

    def stop_workflow(id)
      workflow = find_workflow(id)
-      workflow.stop!
+      workflow.mark_as_stopped
      persist_workflow(workflow)
    end

+    def next_free_id
+      id = nil
+      loop do
+        id = SecureRandom.uuid
+        break if !redis.exists("gush.workflow.#{id}")
+      end
+
+      id
+    end
+
    def all_workflows
      redis.keys("gush.workflows.*").map do |key|
        id = key.sub("gush.workflows.", "")
        find_workflow(id)
      end
    end

    def find_workflow(id)
      data = redis.get("gush.workflows.#{id}")
      unless data.nil?
-        hash = Yajl::Parser.parse(data, symbolize_keys: true)
+        hash = Gush::JSON.decode(data, symbolize_keys: true)
        keys = redis.keys("gush.jobs.#{id}.*")
-        nodes = redis.mget(*keys).map { |json| Yajl::Parser.parse(json, symbolize_keys: true) }
+        nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
        workflow_from_hash(hash, nodes)
      else
        raise WorkflowNotFound.new("Workflow with given id doesn't exist")
      end
    end

    def persist_workflow(workflow)
      redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
-      workflow.nodes.each {|job| persist_job(workflow.id, job) }
+      workflow.jobs.each {|job| persist_job(workflow.id, job) }
+      workflow.mark_as_persisted
+      true
    end

    def persist_job(workflow_id, job)
      redis.set("gush.jobs.#{workflow_id}.#{job.class.to_s}", job.to_json)
    end

+    def load_job(workflow_id, job_id)
+      data = redis.get("gush.jobs.#{workflow_id}.#{job_id}")
+      return nil if data.nil?
+      data = Gush::JSON.decode(data, symbolize_keys: true)
+      Gush::Job.from_hash(nil, data)
+    end
+
    def destroy_workflow(workflow)
      redis.del("gush.workflows.#{workflow.id}")
-      workflow.nodes.each {|job| destroy_job(workflow.id, job) }
+      workflow.jobs.each {|job| destroy_job(workflow.id, job) }
    end

    def destroy_job(workflow_id, job)
      redis.del("gush.jobs.#{workflow_id}.#{job.class.to_s}")
    end

@@ -95,37 +108,41 @@
    def workflow_report(message)
      report("gush.workflows.status", message)
    end

+    def enqueue_job(workflow_id, job)
+      job.enqueue!
+      persist_job(workflow_id, job)
+
+      sidekiq.push(
+        'class' => Gush::Worker,
+        'queue' => configuration.namespace,
+        'args' => [workflow_id, job.class.to_s, configuration.to_json]
+      )
+    end
+
    private

    attr_reader :sidekiq, :redis

    def workflow_from_hash(hash, nodes = nil)
-      flow = hash[:klass].constantize.new(hash[:id], configure: false)
-      flow.logger_builder(hash.fetch(:logger_builder, 'Gush::LoggerBuilder').constantize)
+      flow = hash[:klass].constantize.new(false)
      flow.stopped = hash.fetch(:stopped, false)
+      flow.id = hash[:id]

      (nodes || hash[:nodes]).each do |node|
-        flow.nodes << Gush::Job.from_hash(node)
+        flow.jobs << Gush::Job.from_hash(flow, node)
      end
      flow
    end

    def report(key, message)
-      redis.publish(key, Yajl::Encoder.new.encode(message))
+      redis.publish(key, Gush::JSON.encode(message))
    end

-    def enqueue_job(workflow_id, job)
-      sidekiq.push(
-        'class' => Gush::Worker,
-        'queue' => configuration.namespace,
-        'args' => [workflow_id, job.class.to_s, configuration.to_json]
-      )
-    end

    def build_sidekiq
      Sidekiq::Client.new(connection_pool)
    end

@@ -133,14 +150,8 @@
      Redis.new(url: configuration.redis_url)
    end

    def connection_pool
      ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis }
-    end
-
-    def load_gushfile
-      require configuration.gushfile
-    rescue LoadError
-      raise Thor::Error, "failed to load #{configuration.gushfile.basename}".colorize(:red)
    end
  end