# Copyright: Copyright (c) 2004 Nicolas Pouillard. All rights reserved. # Author: Nicolas Pouillard . # License: Gnu General Public License. # $LastChangedBy$ # $Id$ # # Efficients control flows for synchronisation (and mutual exclusion) in # parallel systems. # require 'thread' require 'set' # # The +SynFlowFactory+ define the common part for a set of flows. # # A SynFlow is like an automaton (finite state machine): # * states # * transitions # * an alphabet # * an initial state # * a set of final states # # With this automaton (diagram, graph...) you can easily describe important # steps of your program control flow. # # Your automaton describe a model of execution and constraint every execution # to it. # # When a thread want to change the state of its flow it proceed like that: # flow << :a_symbol_of_the_alphabet # # If a transition between the _current_ state and a state _destination_ is # labeled by _symbol_, then the state is updated to _destination_. # Otherwise the thread is constraint to wait the modification of the _current_ # state. # # See examples below to know how to use these class. # class SynFlowFactory class Transition attr_reader :src, :dest, :label def initialize ( src, label, dest ) @src, @label, @dest = src, label, dest end def to_a [@src, @label, @dest] end def == ( rhs ) rhs.is_a? self.class and @label == rhs.label and @dest == rhs.dest and @src == rhs.src end end # class Transition class TransitionSet def initialize @val = {} end def include? ( *a ) case a.size when 1 t = a[0] case t when Array return include?(*t) else return delta(t.src, t.label) == t.dest end when 2 state, label = a return ! delta(state, label).nil? when 3 state, label, dest = a return delta(state, label) == dest else raise ArgumentError, 'bad transition' end end def add ( src, label, dest ) @val[src] ||= {} @val[src][label] = dest end def add_transition ( t ) add(t.src, t.label, t.dest) end def delta ( src, label ) @val[src] ||= {} @val[src][label] end def << ( transition ) case transition when Array add(*transition) when Hash transition.each do |src, x| unless x.is_a? Hash raise TypeError, "bad transtion: #{transition.inspect}" end x.each do |label, dest| add(src, label, dest) end end when Transition add_transition(transition) else raise TypeError, "bad transtion: #{transition.inspect}" end self end end # class TransitionSet attr_accessor :initial, :transitions def initialize @transitions = TransitionSet.new @initial = nil end def << ( transition ) @transitions << transition self end def include? ( *transition ) @transitions.include?(*transition) end def delta ( src, label ) @transitions.delta(src, label) end def new_flow SynFlow.new(self, @initial) end def initial? ( state ) state == @initial end end # class SynFlowFactory class SynFlow class Error < Exception end attr_reader :state attr_reader :i @@i = -1 def initialize ( factory, initial_state ) @i = (@@i += 1) super() @mutex = Mutex.new @condition = ConditionVariable.new @factory = factory @state = initial_state end def feed ( label ) D "#@i: Want read label #{label} (#@state)" @mutex.synchronize do while not @factory.include?(@state, label) begin @condition.wait(@mutex) rescue ThreadError raise Error, 'Cannot wait for change state because only one thread is running' end end dest = @factory.delta(@state, label) D "#@i: Ok change state with label #{label} (#@state -> #{dest})" @state = dest @condition.broadcast end end alias advance feed alias << feed def try_feed ( label ) D "#@i: Try read label #{label} (#@state)" if @mutex.try_lock begin if @factory.include?(@state, label) dest = @factory.delta(@state, label) D "#@i: Ok change state with label #{label} (#@state -> #{dest})" @state = dest @condition.broadcast return true end ensure @mutex.unlock end end D "#@i: Ko you cannot change state" return false end alias try_advance try_feed def destroy @i = @mutex = @condition = @factory = @state = nil end @@out = [] def self.debug_out @@out end def D ( msg ) # @@out << msg # STDERR.puts msg end private :D end # class SynFlow if $0 == __FILE__ require 'test/unit' require 'timeout' class SynFlowTest < Test::Unit::TestCase T = SynFlowFactory::Transition TSet = SynFlowFactory::TransitionSet def test_aa_simple_transition assert_equal(T.new(:a, :b, :c), T.new(:a, :b, :c)) assert_not_equal(T.new(:b, :a, :c), T.new(:a, :b, :d)) end def test_ab_simple_transition_set s = TSet.new s << T.new(:a, :b, :c) << T.new(:b, :a, :d) << [:e, :f, :g] assert(s.include?(:a, :b)) assert(s.include?(T.new(:a, :b, :c))) assert(s.include?(:b, :a, :d)) assert(s.include?([:e, :f])) end def test_ac_bad_transition_set s = TSet.new s << T.new(:a, :b, :c) << T.new(:b, :a, :d) assert(! s.include?(T.new(:b, :b, :c))) assert(! s.include?(:a, :d, :c)) assert(! s.include?([:c, :b])) end # # g # +---------------------------+ # | | # v a b c d | # 1 ---> 2 ---> 3 ---> 4 ---> 5 # | ^ # | e f | # +----> 6 -----+ # def make_simple_factory f = SynFlowFactory.new f << [1, :a, 2] \ << [6, :f, 4] \ << SynFlowFactory::Transition.new(3, :c, 4) f << { 2 => { :e => 6, :b => 3 }, 4 => { :d => 5 }, 5 => { :g => 1 } } f.initial = 1 return f end def test_ad_simple_factory f = nil assert_nothing_raised do f = make_simple_factory end transitions = [[1, :a, 2], [6, :f, 4], [3, :c, 4], [2, :e, 6], [2, :b, 3], [4, :d, 5], [5, :g, 1]] transitions.each do |s, l, d| assert(f.transitions.include?(s, l, d)) end assert(! f.transitions.include?(1, :a, 3)) assert(! f.transitions.include?(2, :a)) assert(! f.transitions.include?(1, :a, 1)) assert(f.initial?(1)) assert(! f.initial?(2)) end def test_ba_simple_flow f = make_simple_factory m = nil assert_nothing_raised { m = f.new_flow } assert_nothing_raised do Timeout.timeout(2) do assert_equal(1, m.state) m << :a assert_equal(2, m.state) m << :b assert_equal(3, m.state) m << :c assert_equal(4, m.state) m << :d assert_equal(5, m.state) m << :g assert_equal(1, m.state) end end end def test_bb_simple_loop f = make_simple_factory f << [6, :e, 6] m = f.new_flow assert_nothing_raised do Timeout.timeout(2) do assert_equal(1, m.state) m << :a assert_equal(2, m.state) m << :e assert_equal(6, m.state) m << :e assert_equal(6, m.state) m << :f assert_equal(4, m.state) end end end def test_bc_simple_try_advance f = make_simple_factory m = f.new_flow assert_nothing_raised do Timeout.timeout(2) do assert(m.try_advance(:a)) assert_equal(2, m.state) m << :b assert_equal(3, m.state) assert(! m.try_advance(:e)) assert_equal(3, m.state) m << :c assert_equal(4, m.state) end end end def test_c_avanced_flow f = make_simple_factory m = f.new_flow th = Thread.new { sleep ; m << :e } assert_raise(Timeout::Error) do Timeout.timeout(0.2) do th.wakeup th.join end end assert_nothing_raised do Timeout.timeout(2) do m << :a sleep 0.1 assert_equal(6, m.state) end end assert_nothing_raised do Timeout.timeout(2) do m << :f assert_equal(4, m.state) m << :d assert_equal(5, m.state) m << :g assert_equal(1, m.state) end end end def test_d_multi_threads f = make_simple_factory m = f.new_flow assert_nothing_raised do Timeout.timeout(5) do t = [] t << Thread.new { m << :d } t << Thread.new { m << :c } t << Thread.new { m << :a } t << Thread.new { m << :b } t.each { |t| t.join } end end assert_equal(5, m.state) end class E < Exception end def test_e_with_block f = make_simple_factory m = f.new_flow assert_nothing_raised do Timeout.timeout(5) do t1 = Thread.new do sleep m << :a sleep end t2 = Thread.new do m << :e t1.raise E end t1.wakeup t2.join assert_raise(E) { t1.join } end end assert_equal(6, m.state) end def test_f_dumpable f = make_simple_factory m = f.new_flow assert_nothing_raised do Marshal.load(Marshal.dump(m)) end assert_nothing_raised do Marshal.load(Marshal.dump(f)) end end end # class SynFlowTest end