lib/gush/workflow.rb in gush-0.0.1 vs lib/gush/workflow.rb in gush-0.1

- old
+ new

@@ -1,50 +1,65 @@ require 'securerandom' -require 'gush/metadata' module Gush class Workflow - include Gush::Metadata + attr_accessor :id, :jobs, :stopped, :persisted - attr_accessor :id, :nodes, :stopped - - def initialize(id, options = {}) + def initialize(should_run_configure = true) @id = id - @nodes = [] + @jobs = [] @dependencies = [] - @logger_builder = default_logger_builder + @persisted = false @stopped = false - unless options[:configure] == false + if should_run_configure configure create_dependencies end end - def default_logger_builder - LoggerBuilder + def self.find(id) + Gush::Client.new.find_workflow(id) end + def self.create(*args) + flow = new(*args) + flow.save + flow + end + + def save + if @id.nil? + assign_id + end + + client.persist_workflow(self) + end + def configure end - def stop! + def mark_as_stopped @stopped = true end def start! - @stopped = false + client.start_workflow(self) end - def logger_builder(klass) - @logger_builder = klass + def persist! + client.persist_workflow(self) end - def build_logger_for_job(job, jid) - @logger_builder.new(self, job, jid).build + def mark_as_persisted + @persisted = true end + def mark_as_started + @stopped = false + end + def create_dependencies @dependencies.each do |dependency| from = find_job(dependency[:from]) to = find_job(dependency[:to]) @@ -52,32 +67,32 @@ from.outgoing << dependency[:to] end end def find_job(name) - @nodes.find { |node| node.name == name.to_s || node.class.to_s == name.to_s } + @jobs.find { |node| node.name == name.to_s || node.class.to_s == name.to_s } end def finished? - nodes.all?(&:finished) + jobs.all?(&:finished?) end def running? - nodes.any? {|j| j.enqueued? || j.running? } && !stopped? + !stopped? && jobs.any? {|j| j.enqueued? || j.running? } end def failed? - nodes.any?(&:failed) + jobs.any?(&:failed?) end def stopped? stopped end def run(klass, deps = {}) - node = klass.new(name: klass.to_s) - @nodes << node + node = klass.new(self, name: klass.to_s) + @jobs << node deps_after = [*deps[:after]] deps_after.each do |dep| @dependencies << {from: dep.to_s, to: klass.to_s } end @@ -86,22 +101,30 @@ deps_before.each do |dep| @dependencies << {from: klass.to_s, to: dep.to_s } end end + def reload + self.class.find(@id) + end + + def initial_jobs + jobs.select(&:has_no_dependencies?) + end + def status case when failed? - "Failed" + :failed when running? - "Running" + :running when finished? - "Finished" + :finished when stopped? - "Stopped" + :stopped else - "Pending" + :pending end end def started_at first_job ? first_job.started_at : nil @@ -114,41 +137,43 @@ def to_hash name = self.class.to_s { name: name, id: @id, - total: @nodes.count, - finished: @nodes.count(&:finished?), + total: @jobs.count, + finished: @jobs.count(&:finished?), klass: name, - nodes: @nodes.map(&:as_json), + jobs: @jobs.map(&:as_json), status: status, stopped: stopped, started_at: started_at, - finished_at: finished_at, - logger_builder: @logger_builder.to_s + finished_at: finished_at } end def to_json(options = {}) - JSON.dump(to_hash) + Gush::JSON.encode(to_hash) end - def next_jobs - @nodes.select do |job| - job.can_be_started?(self) - end - end - def self.descendants ObjectSpace.each_object(Class).select { |klass| klass < self } end private + + def assign_id + @id = client.next_free_id + end + + def client + @client ||= Client.new + end + def first_job - nodes.min_by{ |n| n.started_at || Time.now.to_i } + jobs.min_by{ |n| n.started_at || Time.now.to_i } end def last_job - nodes.max_by{ |n| n.finished_at || 0 } if nodes.all?(&:finished?) + jobs.max_by{ |n| n.finished_at || 0 } if jobs.all?(&:finished?) end end end