lib/transflow/transaction.rb in transflow-0.2.0 vs lib/transflow/transaction.rb in transflow-0.3.0

- old
+ new

@@ -1,23 +1,9 @@ -require 'transproc' +require 'dry-pipeline' +require 'transflow/errors' module Transflow - class TransactionFailedError < StandardError - attr_reader :transaction - - attr_reader :original_error - - def initialize(transaction, original_error) - @transaction = transaction - @original_error = original_error - - super("#{transaction} failed [#{original_error.class}: #{original_error.message}]") - - set_backtrace(original_error.backtrace) - end - end - # Transaction encapsulates calling individual steps registered within a transflow # constructor. # # It's responsible for calling steps in the right order and optionally currying # arguments for specific steps. @@ -25,15 +11,24 @@ # Furthermore you can subscribe event listeners to individual steps within a # transaction. # # @api public class Transaction - # Internal function factory using Transproc extension + # Step wrapper object which adds `>>` operator # # @api private - module Registry - extend Transproc::Registry + class Step + include Dry::Pipeline::Mixin + + # @api private + def self.[](op) + if op.respond_to?(:>>) + op + else + Step.new(op) + end + end end # @attr_reader [Hash<Symbol => Proc,#call>] steps The step map # # @api private @@ -75,11 +70,15 @@ # # @return [self] # # @api public def subscribe(listeners) - listeners.each { |step, listener| steps[step].subscribe(listener) } + if listeners.is_a?(Hash) + listeners.each { |step, listener| steps[step].subscribe(listener) } + else + steps.each { |(_, step)| step.subscribe(listeners) } + end self end # Call the transaction # @@ -106,14 +105,14 @@ # # @raises TransactionFailedError # # @api public def call(input, options = {}) - handler = handler_steps(options).map(&method(:fn)).reduce(:>>) + handler = handler_steps(options).map(&method(:step)).reduce(:>>) handler.call(input) - rescue Transproc::MalformedInputError => err - raise TransactionFailedError.new(self, err.original_error) + rescue StepError => err + raise TransactionFailedError.new(self, err) end alias_method :[], :call # Coerce a transaction into string representation # @@ -157,14 +156,10 @@ # Wrap a proc into composable transproc function # # @param [#call] # # @api private - def fn(obj) - if obj.respond_to?(:>>) - obj - else - Registry[obj] - end + def step(obj) + Step[obj] end end end