Sha256: 6757006bd2f8691c623f889af8397b6fbd85f052dc54d7910499f8bc462f0e1a

Contents?: true

Size: 1.84 KB

Versions: 17

Compression:

Stored size: 1.84 KB

Contents

module Dynflow
  module Connectors
    class Direct < Abstract

      class Core < Actor

        def initialize(connector)
          @connector = connector
          @worlds = {}
          @executor_round_robin = RoundRobin.new
        end

        def start_listening(world)
          @worlds[world.id] = world
          @executor_round_robin.add(world) if world.executor
        end

        def stop_receiving_new_work(world)
          @executor_round_robin.delete(world)
        end

        def stop_listening(world)
          @worlds.delete(world.id)
          @executor_round_robin.delete(world) if world.executor
          reference.tell(:terminate!) if @worlds.empty?
        end

        def handle_envelope(envelope)
          if world = find_receiver(envelope)
            @connector.receive(world, envelope)
          else
            log(Logger::ERROR, "Receiver for envelope #{ envelope } not found")
          end
        end

        private

        def find_receiver(envelope)
          receiver = if Dispatcher::AnyExecutor === envelope.receiver_id
                       @executor_round_robin.next
                     else
                       @worlds[envelope.receiver_id]
                     end
          raise Dynflow::Error, "No executor available" unless receiver
          return receiver
        end
      end

      def initialize(world = nil)
        @core  = Core.spawn('connector-direct-core', self)
        start_listening(world) if world
      end

      def start_listening(world)
        @core.ask([:start_listening, world])
      end

      def stop_receiving_new_work(world)
        @core.ask([:stop_receiving_new_work, world]).wait
      end

      def stop_listening(world)
        @core.ask([:stop_listening, world]).wait
      end

      def send(envelope)
        @core.ask([:handle_envelope, envelope])
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 1 rubygems

Version Path
dynflow-0.8.16 lib/dynflow/connectors/direct.rb
dynflow-0.8.15 lib/dynflow/connectors/direct.rb
dynflow-0.8.14 lib/dynflow/connectors/direct.rb
dynflow-0.8.13 lib/dynflow/connectors/direct.rb
dynflow-0.8.12 lib/dynflow/connectors/direct.rb
dynflow-0.8.11 lib/dynflow/connectors/direct.rb
dynflow-0.8.10 lib/dynflow/connectors/direct.rb
dynflow-0.8.9 lib/dynflow/connectors/direct.rb
dynflow-0.8.8 lib/dynflow/connectors/direct.rb
dynflow-0.8.7 lib/dynflow/connectors/direct.rb
dynflow-0.8.6 lib/dynflow/connectors/direct.rb
dynflow-0.8.5 lib/dynflow/connectors/direct.rb
dynflow-0.8.4 lib/dynflow/connectors/direct.rb
dynflow-0.8.3 lib/dynflow/connectors/direct.rb
dynflow-0.8.2 lib/dynflow/connectors/direct.rb
dynflow-0.8.1 lib/dynflow/connectors/direct.rb
dynflow-0.8.0 lib/dynflow/connectors/direct.rb