lib/metacrunch/job.rb in metacrunch-4.1.0 vs lib/metacrunch/job.rb in metacrunch-4.1.1

- old
+ new

@@ -1,151 +1,149 @@ -module Metacrunch - class Job - require_relative "job/dsl" - require_relative "job/buffer" +class Metacrunch::Job # http://valve.github.io/blog/2013/10/26/constant-resolution-in-ruby/ + require_relative "job/dsl" + require_relative "job/buffer" - attr_reader :dsl + attr_reader :dsl - class << self - def define(file_content = nil, &block) - self.new(file_content, &block) - end + class << self + def define(file_content = nil, &block) + self.new(file_content, &block) end + end - def initialize(file_content = nil, &block) - @dsl = Dsl.new(self) + def initialize(file_content = nil, &block) + @dsl = Dsl.new(self) - @deprecator = ActiveSupport::Deprecation.new("5.0.0", "metacrunch") + @deprecator = ActiveSupport::Deprecation.new("5.0.0", "metacrunch") - if file_content - @dsl.instance_eval(file_content, "Check your metacrunch Job at Line") - elsif block_given? - @dsl.instance_eval(&block) - end + if file_content + @dsl.instance_eval(file_content, "Check your metacrunch Job at Line") + elsif block_given? + @dsl.instance_eval(&block) end + end - def source - @source - end + def source + @source + end - def source=(source) - ensure_source!(source) - @source = source - end + def source=(source) + ensure_source!(source) + @source = source + end - def destination - @destination - end + def destination + @destination + end - def destination=(destination) - ensure_destination!(destination) - @destination = destination - end + def destination=(destination) + ensure_destination!(destination) + @destination = destination + end - def pre_process - @pre_process - end + def pre_process + @pre_process + end - def pre_process=(callable) - ensure_callable!(callable) - @pre_process = callable - end + def pre_process=(callable) + ensure_callable!(callable) + @pre_process = callable + end - def post_process - @post_process - end + def post_process + @post_process + end - def post_process=(callable) - ensure_callable!(callable) - @post_process = callable - end + def post_process=(callable) + ensure_callable!(callable) + @post_process = callable + end - def transformations - @transformations ||= [] - end + def transformations + @transformations ||= [] + end - def add_transformation(callable, buffer_size: nil, buffer: nil) - ensure_callable!(callable) + def add_transformation(callable, buffer_size: nil, buffer: nil) + ensure_callable!(callable) - if buffer_size && buffer_size.is_a?(Numeric) - @deprecator.deprecation_warning(:buffer_size, :buffer) - buffer = buffer_size - end + if buffer_size && buffer_size.is_a?(Numeric) + @deprecator.deprecation_warning(:buffer_size, :buffer) + buffer = buffer_size + end - if buffer - transformations << Metacrunch::Job::Buffer.new(buffer) - end - - transformations << callable + if buffer + transformations << Metacrunch::Job::Buffer.new(buffer) end - def run - run_pre_process + transformations << callable + end - if source - # Run transformation for each data object available in source - source.each do |data| - data = run_transformations(data) - write_destination(data) - end + def run + run_pre_process - # Run all transformations a last time to flush existing buffers - data = run_transformations(nil, flush_buffers: true) + if source + # Run transformation for each data object available in source + source.each do |data| + data = run_transformations(data) write_destination(data) - - # Close destination - destination.close if destination end - run_post_process + # Run all transformations a last time to flush existing buffers + data = run_transformations(nil, flush_buffers: true) + write_destination(data) - self + # Close destination + destination.close if destination end - private + run_post_process - def ensure_source!(object) - raise ArgumentError, "#{object} doesn't respond to #each." unless object.respond_to?(:each) - end + self + end - def ensure_destination!(object) - raise ArgumentError, "#{object} doesn't respond to #write." unless object.respond_to?(:write) - raise ArgumentError, "#{object} doesn't respond to #close." unless object.respond_to?(:close) - end +private - def ensure_callable!(object) - raise ArgumentError, "#{object} doesn't respond to #call." unless object.respond_to?(:call) - end + def ensure_source!(object) + raise ArgumentError, "#{object} doesn't respond to #each." unless object.respond_to?(:each) + end - def run_pre_process - pre_process.call if pre_process - end + def ensure_destination!(object) + raise ArgumentError, "#{object} doesn't respond to #write." unless object.respond_to?(:write) + raise ArgumentError, "#{object} doesn't respond to #close." unless object.respond_to?(:close) + end - def run_post_process - post_process.call if post_process - end + def ensure_callable!(object) + raise ArgumentError, "#{object} doesn't respond to #call." unless object.respond_to?(:call) + end - def run_transformations(data, flush_buffers: false) - transformations.each do |transformation| - if transformation.is_a?(Buffer) - buffer = transformation + def run_pre_process + pre_process.call if pre_process + end - if data - data = buffer.buffer(data) - data = buffer.flush if flush_buffers - else - data = buffer.flush - end + def run_post_process + post_process.call if post_process + end + + def run_transformations(data, flush_buffers: false) + transformations.each do |transformation| + if transformation.is_a?(Buffer) + buffer = transformation + + if data + data = buffer.buffer(data) + data = buffer.flush if flush_buffers else - data = transformation.call(data) if data + data = buffer.flush end + else + data = transformation.call(data) if data end - - data end - def write_destination(data) - destination.write(data) if destination && data - end + data + end + def write_destination(data) + destination.write(data) if destination && data end + end