require 'test_helper' module Rworkflow class SidekiqFlowTest < ActiveSupport::TestCase def setup super RedisRds::Object.flushdb end def test_lethal_workflow lifecycle = Lifecycle.new do |lc| lc.state('Rworkflow::SidekiqFlowTest::Floating', cardinality: 10) do |state| state.transition :rescued, 'Rworkflow::SidekiqFlowTest::Lifeboat' state.transition :drowned, Rworkflow::Flow::STATE_FAILED end lc.state('Rworkflow::SidekiqFlowTest::Lifeboat', cardinality: 2) do |state| state.transition :landed, 'Rworkflow::SidekiqFlowTest::Land' state.transition :starved, Rworkflow::Flow::STATE_FAILED end lc.state('Rworkflow::SidekiqFlowTest::Land') do |state| state.transition :rescued, Rworkflow::Flow::STATE_SUCCESSFUL state.transition :died, Rworkflow::Flow::STATE_FAILED end lc.initial = 'Rworkflow::SidekiqFlowTest::Floating' end initial_objects = (0...20).to_a workflow = SidekiqFlow.create(lifecycle, 'Lethal Rworkflow 2: Lethaler', {}) workflow.start(initial_objects) assert workflow.finished? counters = workflow.counters assert_equal 19, counters[Rworkflow::Flow::STATE_FAILED] assert_equal 1, counters[Rworkflow::Flow::STATE_SUCCESSFUL] assert 2, RedisRds::String.new('Rworkflow::SidekiqFlowTest::Floating').get.to_i assert 6, RedisRds::String.new('Rworkflow::SidekiqFlowTest::Lifeboat').get.to_i assert 2, RedisRds::String.new('Rworkflow::SidekiqFlowTest::Land').get.to_i end def test_pause_continue lifecycle = Lifecycle.new do |lc| lc.state('Rworkflow::SidekiqFlowTest::Floating', cardinality: 10) do |state| state.transition :rescued, 'Rworkflow::SidekiqFlowTest::Lifeboat' state.transition :drowned, Rworkflow::Flow::STATE_FAILED end lc.state('Rworkflow::SidekiqFlowTest::Lifeboat', cardinality: 2) do |state| state.transition :landed, Rworkflow::Flow::STATE_SUCCESSFUL state.transition :starved, Rworkflow::Flow::STATE_FAILED end lc.initial = 'Rworkflow::SidekiqFlowTest::Floating' end initial_objects = (0...20).to_a workflow = SidekiqFlow.create(lifecycle, 'Lethal Workflow 4: Lethalerest', {}) workflow.pause workflow.start(initial_objects) assert !workflow.finished? workflow.pause workflow.continue assert !workflow.finished? workflow.continue assert workflow.finished? counters = workflow.counters assert_equal 18, counters[Rworkflow::Flow::STATE_FAILED] assert_equal 2, counters[Rworkflow::Flow::STATE_SUCCESSFUL] assert 2, RedisRds::String.new('Rworkflow::SidekiqFlowTest::Floating').get.to_i assert 6, RedisRds::String.new('Rworkflow::SidekiqFlowTest::Lifeboat').get.to_i end def test_collector_state_workflow lifecycle = Lifecycle.new do |lc| lc.state('Rworkflow::SidekiqFlowTest::PostcardSend', cardinality: 1) do |state| state.transition :sent, 'Rworkflow::SidekiqFlowTest::PostcardCollector' end lc.state('Rworkflow::SidekiqFlowTest::PostcardCollector', cardinality: Lifecycle::CARDINALITY_ALL_STARTED, policy: State::STATE_POLICY_WAIT) do |state| state.transition :received, Rworkflow::Flow::STATE_SUCCESSFUL end lc.initial = 'Rworkflow::SidekiqFlowTest::PostcardSend' end initial_objects = (0...20).to_a workflow = SidekiqFlow.create(lifecycle, 'CollectorWorkflow', {}) workflow.start(initial_objects) assert workflow.finished?, 'Rworkflow finish successfully' assert_equal 20, RedisRds::String.new('Rworkflow::SidekiqFlowTest::PostcardSend').get.to_i, 'All initial objects should be processed by the first state one by one' assert_equal 1, RedisRds::String.new('Rworkflow::SidekiqFlowTest::PostcardSend_card').get.to_i, 'All initial objects should be processed by the first state one by one' assert_equal 1, RedisRds::String.new('Rworkflow::SidekiqFlowTest::PostcardCollector').get.to_i, 'All initial objects should be processed by the collector state all at once' assert_equal initial_objects.size, RedisRds::String.new('Rworkflow::SidekiqFlowTest::PostcardCollector_card').get.to_i, 'All initial objects should be processed by the collector state all at once' end def test_gated lifecycle = Lifecycle.new do |lc| lc.state('Rworkflow::SidekiqFlowTest::Floating', cardinality: 10) do |state| state.transition :rescued, 'Rworkflow::SidekiqFlowTest::Lifeboat' state.transition :drowned, Rworkflow::Flow::STATE_FAILED end lc.state('Rworkflow::SidekiqFlowTest::Lifeboat', cardinality: 2, policy: SidekiqFlow::STATE_POLICY_GATED) do |state| state.transition :landed, Rworkflow::Flow::STATE_SUCCESSFUL state.transition :starved, Rworkflow::Flow::STATE_FAILED end lc.initial = 'Rworkflow::SidekiqFlowTest::Floating' end initial_objects = (0...20).to_a workflow = SidekiqFlow.create(lifecycle, 'Lethal Workflow 4: Lethalerest', {}) workflow.start(initial_objects) assert !workflow.finished? workflow.open_gate('Rworkflow::SidekiqFlowTest::Lifeboat') assert workflow.finished? counters = workflow.counters assert_equal 18, counters[Rworkflow::Flow::STATE_FAILED] assert_equal 2, counters[Rworkflow::Flow::STATE_SUCCESSFUL] assert 2, RedisRds::String.new('Rworkflow::SidekiqFlowTest::Floating').get.to_i assert 6, RedisRds::String.new('Rworkflow::SidekiqFlowTest::Lifeboat').get.to_i end class Floating < Worker def process(objects) rescued, drowned = objects.partition(&:even?) transition(:rescued, rescued) transition(:drowned, drowned) RedisRds::String.new(self.class.name).incr end end class Lifeboat < Worker def process(objects) landed, starved = objects.partition { |object| object < 4 } transition(:landed, landed) transition(:starved, starved) RedisRds::String.new(self.class.name).incr end end class Land < Worker def process(objects) rescued, died = objects.partition { |object| object == 0 } transition(:rescued, rescued) transition(:died, died) RedisRds::String.new(self.class.name).incr end end class PostcardSend < Worker def process(objects) transition(:sent, objects) RedisRds::String.new(self.class.name).incr RedisRds::String.new("#{self.class.name}_card").set(objects.size) end end class PostcardCollector < Worker def process(objects) transition(:received, objects) RedisRds::String.new(self.class.name).incr RedisRds::String.new("#{self.class.name}_card").set(objects.size) end end end end