Sha256: 80f58e5690c1c0583fb88968f02ae4261427a4ce8ac7591de49005c56c52c946

Contents?: true

Size: 1.03 KB

Versions: 2

Compression:

Stored size: 1.03 KB

Contents

class Datapipes
  # Build your own sink logic in `run` method.
  #
  # Be careful each sinks are executed concurrently.
  # Avoid race condition in multi sinks.
  #
  # This is bad:
  #
  #   $shared = []
  #
  #   class A < Datapipes::Sink
  #     def run(data)
  #       $shared << data
  #     end
  #   end
  #
  #   class B < Datapipes::Sink
  #     def run(data)
  #       $shared << data
  #     end
  #   end
  #
  # On the other hand, a sink is called serially. So you can
  # touch shared object in one sink logic.
  #
  # This is good:
  #
  #   class A < Datapipes::Source
  #     def initialize
  #       @shared = []
  #     end
  #
  #     def run(data)
  #       @shared << data
  #     end
  #   end
  #
  class Sink
    include Composable

    # Override this in sub class
    def run(data)
      data
    end

    # For internal uses.
    def run_all(data)
      @accumulated ||= [self]
      count = Parallel.processor_count
      Parallel.each(@accumulated, in_threads: count) do |sink|
        sink.run(data)
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
datapipes-0.1.5 lib/datapipes/sink.rb
datapipes-0.1.4 lib/datapipes/sink.rb