lib/transflow/transaction.rb in transflow-0.2.0 vs lib/transflow/transaction.rb in transflow-0.3.0
- old
+ new
@@ -1,23 +1,9 @@
-require 'transproc'
+require 'dry-pipeline'
+require 'transflow/errors'
module Transflow
- class TransactionFailedError < StandardError
- attr_reader :transaction
-
- attr_reader :original_error
-
- def initialize(transaction, original_error)
- @transaction = transaction
- @original_error = original_error
-
- super("#{transaction} failed [#{original_error.class}: #{original_error.message}]")
-
- set_backtrace(original_error.backtrace)
- end
- end
-
# Transaction encapsulates calling individual steps registered within a transflow
# constructor.
#
# It's responsible for calling steps in the right order and optionally currying
# arguments for specific steps.
@@ -25,15 +11,24 @@
# Furthermore you can subscribe event listeners to individual steps within a
# transaction.
#
# @api public
class Transaction
- # Internal function factory using Transproc extension
+ # Step wrapper object which adds `>>` operator
#
# @api private
- module Registry
- extend Transproc::Registry
+ class Step
+ include Dry::Pipeline::Mixin
+
+ # @api private
+ def self.[](op)
+ if op.respond_to?(:>>)
+ op
+ else
+ Step.new(op)
+ end
+ end
end
# @attr_reader [Hash<Symbol => Proc,#call>] steps The step map
#
# @api private
@@ -75,11 +70,15 @@
#
# @return [self]
#
# @api public
def subscribe(listeners)
- listeners.each { |step, listener| steps[step].subscribe(listener) }
+ if listeners.is_a?(Hash)
+ listeners.each { |step, listener| steps[step].subscribe(listener) }
+ else
+ steps.each { |(_, step)| step.subscribe(listeners) }
+ end
self
end
# Call the transaction
#
@@ -106,14 +105,14 @@
#
# @raises TransactionFailedError
#
# @api public
def call(input, options = {})
- handler = handler_steps(options).map(&method(:fn)).reduce(:>>)
+ handler = handler_steps(options).map(&method(:step)).reduce(:>>)
handler.call(input)
- rescue Transproc::MalformedInputError => err
- raise TransactionFailedError.new(self, err.original_error)
+ rescue StepError => err
+ raise TransactionFailedError.new(self, err)
end
alias_method :[], :call
# Coerce a transaction into string representation
#
@@ -157,14 +156,10 @@
# Wrap a proc into composable transproc function
#
# @param [#call]
#
# @api private
- def fn(obj)
- if obj.respond_to?(:>>)
- obj
- else
- Registry[obj]
- end
+ def step(obj)
+ Step[obj]
end
end
end