Sha256: 03d1ed641daddd49083979d060e711f66e701bc675a4fca8141b7901f76c1c1b
Contents?: true
Size: 1.91 KB
Versions: 1
Compression:
Stored size: 1.91 KB
Contents
require 'wisper'
require 'kleisli'
require 'transflow/errors'

module Transflow
  # Wraps a callable operation and broadcasts its outcome to subscribed
  # listeners through Wisper.
  #
  # Two events are emitted, both derived from the publisher's name:
  # `:"#{name}_success"` (with the result) and `:"#{name}_failure"`
  # (with the original arguments followed by the error).
  class Publisher
    include Wisper::Publisher

    attr_reader :name
    attr_reader :op

    # Builds a publisher for the given operation.
    #
    # @param name [Symbol, String] prefix used for the broadcast event names
    # @param op [#call] the wrapped operation
    # @param options [Hash] when `options[:monadic]` is truthy a {Monadic}
    #   publisher is built, otherwise an instance of the receiver class
    # @return [Publisher]
    def self.[](name, op, options = {})
      type = options[:monadic] ? Monadic : self
      type.new(name, op)
    end

    # Publisher for operations that return a Kleisli Either
    # (`Left`/`Right`) instead of raising on failure.
    class Monadic < Publisher
      # Calls the operation and broadcasts according to the returned Either.
      #
      # @param args [Array] arguments forwarded to the operation
      # @return [Object] the value returned by the `or` / `>` chain; the
      #   handlers below return `Left(result)` or `Right(value)`
      def call(*args)
        # Broadcasting and building the return value are separate statements
        # so the returned Either never depends on what `broadcast` returns.
        op.(*args)
          .or { |result|
            broadcast_failure(*args, result)
            Left(result)
          }
          .>(->(value) {
            broadcast_success(value)
            Right(value)
          })
      end
    end

    # Accumulates arguments across calls and invokes the wrapped publisher
    # once enough of them were collected.
    #
    # NOTE: `Publisher#initialize` is not invoked here, so `name` and `op`
    # are nil on a curried publisher; everything is delegated to `publisher`.
    class Curried < Publisher
      attr_reader :publisher
      attr_reader :arity
      attr_reader :curry_args

      # @param publisher [#call, #arity, #subscribe] publisher being curried
      # @param curry_args [Array] arguments collected so far
      def initialize(publisher, curry_args = [])
        @publisher = publisher
        @arity = publisher.arity
        @curry_args = curry_args
      end

      # Adds `args` to the collected arguments. When at least `arity`
      # arguments are available the wrapped publisher is called, otherwise a
      # new curried publisher holding the accumulated arguments is returned.
      #
      # @param args [Array] additional arguments
      # @return [Object, Curried] the publisher's result or a new Curried
      def call(*args)
        all_args = curry_args + args

        # `>=` rather than `==`: with too many arguments the call is still
        # forwarded so the operation itself can reject it, instead of
        # returning a curried object that could never be satisfied.
        if all_args.size >= arity
          publisher.call(*all_args)
        else
          self.class.new(publisher, all_args)
        end
      end

      # Forwards the subscription to the wrapped publisher.
      def subscribe(*args)
        publisher.subscribe(*args)
      end
    end

    # @param name [Symbol, String] prefix used for the broadcast event names
    # @param op [#call] the wrapped operation
    def initialize(name, op)
      @name = name
      @op = op
    end

    # Returns a curried version of this publisher.
    #
    # @return [Curried]
    # @raise [RuntimeError] when the operation's arity is negative
    def curry
      raise "can't curry publisher where operation arity is < 0" if arity < 0
      Curried.new(self)
    end

    # Arity of the wrapped operation: the proc's own arity, or the arity of
    # the object's `call` method.
    #
    # @return [Integer]
    def arity
      op.is_a?(Proc) ? op.arity : op.method(:call).arity
    end

    # Calls the operation, broadcasts success and returns the result. When a
    # StepError is raised, a failure is broadcast and the error is re-raised.
    #
    # @param args [Array] arguments forwarded to the operation
    # @return [Object] the operation's result
    # @raise [StepError] re-raised after the failure broadcast
    def call(*args)
      result = op.call(*args)
      broadcast_success(result)
      result
    rescue StepError => err
      # Always re-raise, regardless of what the broadcast returns.
      broadcast_failure(*args, err)
      raise err
    end

    # Shorthand for {#call}. Defined as a method instead of `alias_method`
    # because an alias stays bound to `Publisher#call`, which would skip the
    # `call` overrides in {Monadic} and {Curried}.
    def [](*args)
      call(*args)
    end

    # Subscribes one or many listeners; extra arguments are passed along
    # to Wisper's `subscribe` for every listener.
    #
    # @param listeners [Object, Array<Object>] a listener or a list of them
    # @param args [Array] extra arguments for Wisper's `subscribe`
    def subscribe(listeners, *args)
      Array(listeners).each { |listener| super(listener, *args) }
    end

    private

    # Broadcasts `:"#{name}_success"` with the operation result.
    def broadcast_success(result)
      broadcast(:"#{name}_success", result)
    end

    # Broadcasts `:"#{name}_failure"` with the original arguments and error.
    def broadcast_failure(*args, err)
      broadcast(:"#{name}_failure", *args, err)
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
transflow-0.3.0 | lib/transflow/publisher.rb |