Sha256: 80f58e5690c1c0583fb88968f02ae4261427a4ce8ac7591de49005c56c52c946
Contents?: true
Size: 1.03 KB
Versions: 2
Compression:
Stored size: 1.03 KB
Contents
class Datapipes
  # Build your own sink logic in the `run` method.
  #
  # Be careful: sinks are executed concurrently with each other.
  # Avoid race conditions between multiple sinks.
  #
  # This is bad:
  #
  #   $shared = []
  #
  #   class A < Datapipes::Sink
  #     def run(data)
  #       $shared << data
  #     end
  #   end
  #
  #   class B < Datapipes::Sink
  #     def run(data)
  #       $shared << data
  #     end
  #   end
  #
  # On the other hand, a single sink is called serially, so you can
  # touch a shared object within one sink's logic.
  #
  # This is good:
  #
  #   class A < Datapipes::Sink
  #     def initialize
  #       @shared = []
  #     end
  #
  #     def run(data)
  #       @shared << data
  #     end
  #   end
  #
  class Sink
    include Composable

    # Override this in a subclass.
    def run(data)
      data
    end

    # For internal use.
    def run_all(data)
      @accumulated ||= [self]
      count = Parallel.processor_count
      Parallel.each(@accumulated, in_threads: count) do |sink|
        sink.run(data)
      end
    end
  end
end
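The comment above warns that different sinks run concurrently, so state shared between them is a race hazard. If two sinks really do need to write to the same object, one option is to guard it with a lock. The sketch below is not part of the gem: `SharedLog`, `SinkA`, `SinkB`, and `demo` are hypothetical names, and plain `Thread`s stand in for the `Parallel.each(..., in_threads: count)` call in `run_all`, so the example runs without any dependencies.

```ruby
# Hypothetical thread-safe container shared by several sinks.
class SharedLog
  def initialize
    @items = []
    @lock = Mutex.new
  end

  # Append under the lock so concurrent sinks cannot interleave writes.
  def <<(item)
    @lock.synchronize { @items << item }
    self
  end

  # Return a snapshot copy taken under the lock.
  def to_a
    @lock.synchronize { @items.dup }
  end
end

# Two illustrative sinks (not Datapipes::Sink subclasses) writing to one log.
class SinkA
  def initialize(log)
    @log = log
  end

  def run(data)
    @log << [:a, data]
  end
end

class SinkB
  def initialize(log)
    @log = log
  end

  def run(data)
    @log << [:b, data]
  end
end

# Stand-in for run_all: run every sink on the same data, one thread per sink.
def demo(rounds)
  log = SharedLog.new
  sinks = [SinkA.new(log), SinkB.new(log)]
  rounds.times do |i|
    sinks.map { |sink| Thread.new { sink.run(i) } }.each(&:join)
  end
  log.to_a
end
```

Calling `demo(50)` returns an array with one entry per sink per round. Keeping state inside a single sink, as the "good" example in the comment does, remains the simpler choice when sharing is not actually required.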
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
datapipes-0.1.5 | lib/datapipes/sink.rb |
datapipes-0.1.4 | lib/datapipes/sink.rb |