lib/floe/workflow.rb in floe-0.14.0 vs lib/floe/workflow.rb in floe-0.15.0
- old
+ new
@@ -2,13 +2,12 @@
require "securerandom"
require "json"
module Floe
- class Workflow
+ class Workflow < Floe::WorkflowBase
include Logging
- include ValidationMixin
class << self
def load(path_or_io, context = nil, credentials = {}, name = nil)
payload = path_or_io.respond_to?(:read) ? path_or_io.read : File.read(path_or_io)
# default the name if it is a filename and none was passed in
@@ -17,11 +16,11 @@
new(payload, context, credentials, name)
end
def wait(workflows, timeout: nil, &block)
workflows = [workflows] if workflows.kind_of?(self)
- logger.info("checking #{workflows.count} workflows...")
+ logger.info("Checking #{workflows.count} workflows...")
run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_thread = Thread.new do
@@ -64,57 +63,39 @@
loop do
# Block until an event is raised
event, data = queue.pop
break if event.nil?
- _execution_id, runner_context = data.values_at("execution_id", "runner_context")
-
- # If the event is for one of our workflows set the updated runner_context
- workflows.each do |workflow|
- next unless workflow.context.state.dig("RunnerContext", "container_ref") == runner_context["container_ref"]
-
- workflow.context.state["RunnerContext"] = runner_context
- end
-
- break if queue.empty?
+ # break out of the loop if the event is for one of our workflows
+ break if queue.empty? || workflows.detect { |wf| wf.execution_id == data["execution_id"] }
end
ensure
sleep_thread&.kill
end
- logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
+ logger.info("Checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
ensure
wait_thread&.kill
end
end
- attr_reader :context, :payload, :states, :states_by_name, :start_at, :name, :comment
+ attr_reader :comment, :context
def initialize(payload, context = nil, credentials = nil, name = nil)
payload = JSON.parse(payload) if payload.kind_of?(String)
credentials = JSON.parse(credentials) if credentials.kind_of?(String)
context = Context.new(context) unless context.kind_of?(Context)
# backwards compatibility
# caller should really put credentials into context and not pass that variable
context.credentials = credentials if credentials
- # NOTE: this is a string, and states use an array
- @name = name || "State Machine"
- @payload = payload
- @context = context
- @comment = payload["Comment"]
- @start_at = payload["StartAt"]
+ @context = context
+ @comment = payload["Comment"]
- # NOTE: Everywhere else we include our name (i.e.: parent name) when building the child name.
- # When creating the states, we are dropping our name (i.e.: the workflow name)
- @states = payload["States"].to_a.map { |state_name, state| State.build!(self, ["States", state_name], state) }
-
- validate_workflow
-
- @states_by_name = @states.each_with_object({}) { |state, result| result[state.short_name] = state }
+ super(payload, name)
rescue Floe::Error
raise
rescue => err
raise Floe::InvalidWorkflowError, err.message
end
@@ -183,24 +164,22 @@
self
end
# NOTE: Expecting the context to be initialized (via start_workflow) before this
def current_state
- @states_by_name[context.state_name]
+ states_by_name[context.state_name]
end
# backwards compatibility. Caller should access directly from context
def credentials
@context.credentials
end
- private
-
- def validate_workflow
- missing_field_error!("States") if @states.empty?
- missing_field_error!("StartAt") if @start_at.nil?
- invalid_field_error!("StartAt", @start_at, "is not found in \"States\"") unless workflow_state?(@start_at, self)
+ def execution_id
+ @context.execution["Id"]
end
+
+ private
def step!
next_state = {"Name" => context.next_state, "Guid" => SecureRandom.uuid, "PreviousStateGuid" => context.state["Guid"]}
# if rerunning due to an error (and we are using Retry)