lib/arc-furnace/pipeline.rb in arc-furnace-0.1.8 vs lib/arc-furnace/pipeline.rb in arc-furnace-0.1.9

- old
+ new

@@ -106,11 +106,11 @@ resolved_params = resolve_parameters(node_id, params) key_parameters = type.instance_method(:initialize).parameters do |param| ALLOWABLE_PARAM_TYPES.include?(param.first) end.map(&:second) # Allow params to be passed that are not in the initializer - instance = type.new(resolved_params.slice(*key_parameters)) + instance = create_instance_with_error_handling(type, resolved_params, key_parameters) instance.params = resolved_params instance end end @@ -151,11 +151,11 @@ def build dsl_class.intermediates_map.each do |key, instance| intermediates_map[key] = instance_exec(&instance) if instance end - @sink_node = exec_with_error_handling(&dsl_class.sink_node) + @sink_node = instance_exec(&dsl_class.sink_node) @sink_source = intermediates_map[dsl_class.sink_source] end def resolve_parameters(node_id, params_to_resolve) params_to_resolve.each_with_object({}) do |(key, value), result| @@ -173,19 +173,13 @@ def resolve_parameter(node_id, key) self.params[key] || self.intermediates_map[key] || (raise "When processing node #{node_id}: Unknown key #{key}!") end - def exec_with_error_handling(&block) - instance_exec(&block) if block_given? + def create_instance_with_error_handling(type, resolved_params, key_parameters) + type.new(resolved_params.slice(*key_parameters)) rescue CSV::MalformedCSVError - params = sink_source.params - raise "File #{find_root_source(params).file.path} cannot be processed." - end - - def find_root_source(params) - source = params[:source] - source = params[:source] while source.params[:source] + raise "File #{resolved_params[:filename]} cannot be processed." end end end end