Sha256: 89ce13a824fb85aa7567b58b65e7b9ee63fcf3f84cba5f859ded44cc345b6e9a

Contents?: true

Size: 1.24 KB

Versions: 2

Compression:

Stored size: 1.24 KB

Contents

require 'drb'

module Rodimus

  class Transformation
    include Observable
    include Observing # Transformations observe themselves for run hooks
    include RuntimeLogging

    attr_reader :drb_server, :pids, :steps

    # User-data accessible across all running steps.
    attr_reader :shared_data

    def initialize
      @steps = []
      @pids = []
      @shared_data = {} # TODO: This needs to be thread safe
      observers << self
    end

    def run
      notify(self, :before_run)
      @drb_server = DRb.start_service(nil, shared_data)
      pids.clear
      prepare

      steps.each do |step|
        pids << fork do
          DRb.start_service # the parent DRb thread dies across the fork
          step.shared_data = DRbObject.new_with_uri(drb_server.uri)
          step.run
        end
        step.close_descriptors
      end
    ensure
      Process.waitall
      drb_server.stop_service
      notify(self, :after_run)
    end

    def to_s
      "#{self.class} with #{steps.length} steps"
    end

    private

    def prepare
      # [1, 2, 3, 4] => [1, 2], [2, 3], [3, 4]
      steps.inject do |first, second|
        read, write = IO.pipe
        first.outgoing = write
        second.incoming = read
        second
      end
    end
  end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rodimus-1.1.0 lib/rodimus/transformation.rb
rodimus-1.0.0 lib/rodimus/transformation.rb