lib/arc-furnace/pipeline.rb in arc-furnace-0.1.7 vs lib/arc-furnace/pipeline.rb in arc-furnace-0.1.8

- old
+ new

@@ -151,11 +151,11 @@ def build dsl_class.intermediates_map.each do |key, instance| intermediates_map[key] = instance_exec(&instance) if instance end - @sink_node = instance_exec(&dsl_class.sink_node) + @sink_node = exec_with_error_handling(&dsl_class.sink_node) @sink_source = intermediates_map[dsl_class.sink_source] end def resolve_parameters(node_id, params_to_resolve) params_to_resolve.each_with_object({}) do |(key, value), result| @@ -173,8 +173,19 @@ def resolve_parameter(node_id, key) self.params[key] || self.intermediates_map[key] || (raise "When processing node #{node_id}: Unknown key #{key}!") end + def exec_with_error_handling(&block) + instance_exec(&block) if block_given? + rescue CSV::MalformedCSVError + params = sink_source.params + raise "File #{find_root_source(params).file.path} cannot be processed." + end + + def find_root_source(params) + source = params[:source] + source = params[:source] while source.params[:source] + end end end end