Sha256: a4ae5d7dae98f7e72e6fa6bbc32c0d408cd0291f90875066623e93368df25e91

Contents?: true

Size: 1.84 KB

Versions: 28

Compression:

Stored size: 1.84 KB

Contents

module Dynflow
  module Connectors
    class Direct < Abstract

      class Core < Actor

        def initialize(connector)
          @connector = connector
          @worlds = {}
          @executor_round_robin = RoundRobin.new
        end

        def start_listening(world)
          @worlds[world.id] = world
          @executor_round_robin.add(world) if world.executor
        end

        def stop_receiving_new_work(world)
          @executor_round_robin.delete(world)
        end

        def stop_listening(world)
          @worlds.delete(world.id)
          @executor_round_robin.delete(world) if world.executor
          reference.tell(:terminate!) if @worlds.empty?
        end

        def handle_envelope(envelope)
          if world = find_receiver(envelope)
            @connector.receive(world, envelope)
          else
            log(Logger::ERROR, "Receiver for envelope #{ envelope } not found")
          end
        end

        private

        def find_receiver(envelope)
          receiver = if Dispatcher::AnyExecutor === envelope.receiver_id
                       @executor_round_robin.next
                     else
                       @worlds[envelope.receiver_id]
                     end
          raise Dynflow::Error, "No executor available" unless receiver
          return receiver
        end
      end

      def initialize(world = nil)
        @core = Core.spawn('connector-direct-core', self)
        start_listening(world) if world
      end

      def start_listening(world)
        @core.ask([:start_listening, world])
      end

      def stop_receiving_new_work(world)
        @core.ask([:stop_receiving_new_work, world]).wait
      end

      def stop_listening(world)
        @core.ask([:stop_listening, world]).wait
      end

      def send(envelope)
        @core.ask([:handle_envelope, envelope])
      end
    end
  end
end

Version data entries

28 entries across 28 versions & 1 rubygems

Version Path
dynflow-1.1.0 lib/dynflow/connectors/direct.rb
dynflow-1.0.5 lib/dynflow/connectors/direct.rb
dynflow-1.0.4 lib/dynflow/connectors/direct.rb
dynflow-1.0.3 lib/dynflow/connectors/direct.rb
dynflow-1.0.2 lib/dynflow/connectors/direct.rb
dynflow-1.0.1 lib/dynflow/connectors/direct.rb
dynflow-1.0.0 lib/dynflow/connectors/direct.rb
dynflow-0.8.37 lib/dynflow/connectors/direct.rb
dynflow-0.8.36 lib/dynflow/connectors/direct.rb
dynflow-0.8.35 lib/dynflow/connectors/direct.rb
dynflow-0.8.34 lib/dynflow/connectors/direct.rb
dynflow-0.8.33 lib/dynflow/connectors/direct.rb
dynflow-0.8.32 lib/dynflow/connectors/direct.rb
dynflow-0.8.31 lib/dynflow/connectors/direct.rb
dynflow-0.8.30 lib/dynflow/connectors/direct.rb
dynflow-0.8.29 lib/dynflow/connectors/direct.rb
dynflow-0.8.28 lib/dynflow/connectors/direct.rb
dynflow-0.8.27 lib/dynflow/connectors/direct.rb
dynflow-0.8.26 lib/dynflow/connectors/direct.rb
dynflow-0.8.25 lib/dynflow/connectors/direct.rb