lib/floe/workflow.rb in floe-0.14.0 vs lib/floe/workflow.rb in floe-0.15.0

- old
+ new

@@ -2,13 +2,12 @@ require "securerandom" require "json" module Floe - class Workflow + class Workflow < Floe::WorkflowBase include Logging - include ValidationMixin class << self def load(path_or_io, context = nil, credentials = {}, name = nil) payload = path_or_io.respond_to?(:read) ? path_or_io.read : File.read(path_or_io) # default the name if it is a filename and none was passed in @@ -17,11 +16,11 @@ new(payload, context, credentials, name) end def wait(workflows, timeout: nil, &block) workflows = [workflows] if workflows.kind_of?(self) - logger.info("checking #{workflows.count} workflows...") + logger.info("Checking #{workflows.count} workflows...") run_until = Time.now.utc + timeout if timeout.to_i > 0 ready = [] queue = Queue.new wait_thread = Thread.new do @@ -64,57 +63,39 @@ loop do # Block until an event is raised event, data = queue.pop break if event.nil? - _execution_id, runner_context = data.values_at("execution_id", "runner_context") - - # If the event is for one of our workflows set the updated runner_context - workflows.each do |workflow| - next unless workflow.context.state.dig("RunnerContext", "container_ref") == runner_context["container_ref"] - - workflow.context.state["RunnerContext"] = runner_context - end - - break if queue.empty? + # break out of the loop if the event is for one of our workflows + break if queue.empty? || workflows.detect { |wf| wf.execution_id == data["execution_id"] } end ensure sleep_thread&.kill end - logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready") + logger.info("Checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready ensure wait_thread&.kill end end - attr_reader :context, :payload, :states, :states_by_name, :start_at, :name, :comment + attr_reader :comment, :context def initialize(payload, context = nil, credentials = nil, name = nil) payload = JSON.parse(payload) if payload.kind_of?(String) credentials = JSON.parse(credentials) if credentials.kind_of?(String) context = Context.new(context) unless context.kind_of?(Context) # backwards compatibility # caller should really put credentials into context and not pass that variable context.credentials = credentials if credentials - # NOTE: this is a string, and states use an array - @name = name || "State Machine" - @payload = payload - @context = context - @comment = payload["Comment"] - @start_at = payload["StartAt"] + @context = context + @comment = payload["Comment"] - # NOTE: Everywhere else we include our name (i.e.: parent name) when building the child name. - # When creating the states, we are dropping our name (i.e.: the workflow name) - @states = payload["States"].to_a.map { |state_name, state| State.build!(self, ["States", state_name], state) } - - validate_workflow - - @states_by_name = @states.each_with_object({}) { |state, result| result[state.short_name] = state } + super(payload, name) rescue Floe::Error raise rescue => err raise Floe::InvalidWorkflowError, err.message end @@ -183,24 +164,22 @@ self end # NOTE: Expecting the context to be initialized (via start_workflow) before this def current_state - @states_by_name[context.state_name] + states_by_name[context.state_name] end # backwards compatibility. Caller should access directly from context def credentials @context.credentials end - private - - def validate_workflow - missing_field_error!("States") if @states.empty? - missing_field_error!("StartAt") if @start_at.nil? - invalid_field_error!("StartAt", @start_at, "is not found in \"States\"") unless workflow_state?(@start_at, self) + def execution_id + @context.execution["Id"] end + + private def step! next_state = {"Name" => context.next_state, "Guid" => SecureRandom.uuid, "PreviousStateGuid" => context.state["Guid"]} # if rerunning due to an error (and we are using Retry)