lib/arc-furnace/pipeline.rb in arc-furnace-0.1.8 vs lib/arc-furnace/pipeline.rb in arc-furnace-0.1.9
- old
+ new
@@ -106,11 +106,11 @@
resolved_params = resolve_parameters(node_id, params)
key_parameters = type.instance_method(:initialize).parameters do |param|
ALLOWABLE_PARAM_TYPES.include?(param.first)
end.map(&:second)
# Allow params to be passed that are not in the initializer
- instance = type.new(resolved_params.slice(*key_parameters))
+ instance = create_instance_with_error_handling(type, resolved_params, key_parameters)
instance.params = resolved_params
instance
end
end
@@ -151,11 +151,11 @@
def build
dsl_class.intermediates_map.each do |key, instance|
intermediates_map[key] = instance_exec(&instance) if instance
end
- @sink_node = exec_with_error_handling(&dsl_class.sink_node)
+ @sink_node = instance_exec(&dsl_class.sink_node)
@sink_source = intermediates_map[dsl_class.sink_source]
end
def resolve_parameters(node_id, params_to_resolve)
params_to_resolve.each_with_object({}) do |(key, value), result|
@@ -173,19 +173,13 @@
def resolve_parameter(node_id, key)
self.params[key] || self.intermediates_map[key] || (raise "When processing node #{node_id}: Unknown key #{key}!")
end
- def exec_with_error_handling(&block)
- instance_exec(&block) if block_given?
+ def create_instance_with_error_handling(type, resolved_params, key_parameters)
+ type.new(resolved_params.slice(*key_parameters))
rescue CSV::MalformedCSVError
- params = sink_source.params
- raise "File #{find_root_source(params).file.path} cannot be processed."
- end
-
- def find_root_source(params)
- source = params[:source]
- source = params[:source] while source.params[:source]
+ raise "File #{resolved_params[:filename]} cannot be processed."
end
end
end
end