Sha256: 03d1ed641daddd49083979d060e711f66e701bc675a4fca8141b7901f76c1c1b

Contents?: true

Size: 1.91 KB

Versions: 1

Compression:

Stored size: 1.91 KB

Contents

require 'wisper'
require 'kleisli'

require 'transflow/errors'

module Transflow
  # Wraps an operation (+op+) and announces its outcome to subscribed
  # listeners as "<name>_success" / "<name>_failure" events.
  #
  # Event delivery goes through +broadcast+ and the +subscribe+ reached via
  # +super+; both are expected to come from the included Wisper::Publisher.
  class Publisher
    include Wisper::Publisher

    # @return [Object] name used as the prefix of the broadcast events
    attr_reader :name

    # @return [#call] the wrapped operation
    attr_reader :op

    # Builds a publisher for +op+.
    #
    # @param name [Object] event-name prefix
    # @param op [#call] operation to wrap
    # @param options [Hash] when +:monadic+ is truthy a {Monadic} publisher
    #   is built, otherwise the receiver class itself is instantiated
    # @return [Publisher]
    def self.[](name, op, options = {})
      type = options[:monadic] ? Monadic : self
      type.new(name, op)
    end

    # Publisher for operations that return an Either-like value, i.e. one
    # responding to +or+ (failure branch) and +>+ (success branch).
    class Monadic < Publisher
      # Runs the operation and broadcasts according to the branch taken.
      #
      # The broadcast and the re-wrapping are separate statements so the
      # returned value never depends on what +broadcast+ itself returns.
      #
      # @param args [Array] arguments forwarded to the operation
      # @return [Object] +Left(failure)+ or +Right(value)+
      def call(*args)
        on_success = lambda do |value|
          broadcast_success(value)
          Right(value)
        end

        handled = op.call(*args).or do |failure|
          broadcast_failure(*args, failure)
          Left(failure)
        end

        handled.>(on_success)
      end
    end

    # Accumulates arguments across calls and invokes the wrapped publisher
    # once enough of them have been collected.
    class Curried < Publisher
      # @return [Publisher] publisher that is eventually invoked
      attr_reader :publisher

      # @return [Integer] arity copied from the wrapped publisher
      attr_reader :arity

      # @return [Array] arguments collected so far
      attr_reader :curry_args

      # @param publisher [Publisher] publisher to curry
      # @param curry_args [Array] arguments already supplied
      def initialize(publisher, curry_args = [])
        @publisher = publisher
        @arity = publisher.arity
        @curry_args = curry_args
      end

      # Adds +args+ to the collected arguments.
      #
      # The publisher is invoked as soon as at least +arity+ arguments are
      # available. Surplus arguments are forwarded untouched so the operation
      # can reject them itself, instead of yielding a curried object that
      # could never reach exactly +arity+ arguments again.
      #
      # @param args [Array] additional arguments
      # @return [Object, Curried] the publisher's result, or a new curried
      #   publisher still waiting for arguments
      def call(*args)
        all_args = curry_args + args

        return publisher.call(*all_args) if all_args.size >= arity

        self.class.new(publisher, all_args)
      end

      # Registers listeners on the wrapped publisher.
      def subscribe(*args)
        publisher.subscribe(*args)
      end
    end

    # @param name [Object] event-name prefix
    # @param op [#call] operation to wrap
    def initialize(name, op)
      @name = name
      @op = op
    end

    # Returns a curried variant of this publisher.
    #
    # @raise [RuntimeError] when the operation's arity is negative
    # @return [Curried]
    def curry
      raise "can't curry publisher where operation arity is < 0" if arity < 0
      Curried.new(self)
    end

    # @return [Integer] arity of the operation: the Proc's own arity, or the
    #   arity of the operation's +call+ method for any other object
    def arity
      op.is_a?(Proc) ? op.arity : op.method(:call).arity
    end

    # Runs the operation and broadcasts the success event with its result.
    #
    # On StepError the failure event is broadcast (with the original
    # arguments followed by the error) and the error is re-raised. The
    # re-raise is unconditional, so it does not depend on the value returned
    # by +broadcast+.
    #
    # @param args [Array] arguments forwarded to the operation
    # @return [Object] the operation's result
    # @raise [StepError] re-raised after the failure broadcast
    def call(*args)
      result = op.call(*args)
      broadcast_success(result)
      result
    rescue StepError => err
      broadcast_failure(*args, err)
      raise
    end

    # Shorthand for +call+. This is a real method rather than an alias:
    # an alias would pin +[]+ to this class's +call+ body, so subclasses
    # overriding +call+ would be bypassed when invoked through +[]+.
    #
    # @param args [Array] arguments forwarded to +call+
    # @return [Object] whatever +call+ returns
    def [](*args)
      call(*args)
    end

    # Subscribes one listener or an array of listeners.
    #
    # @param listeners [Object, Array] listener(s), coerced with +Array()+
    # @param args [Array] extra arguments passed along with each listener
    def subscribe(listeners, *args)
      Array(listeners).each { |listener| super(listener, *args) }
    end

    private

    # Broadcasts "<name>_success" with the operation result.
    def broadcast_success(result)
      broadcast(:"#{name}_success", result)
    end

    # Broadcasts "<name>_failure" with the call arguments followed by the
    # error (or failure value).
    def broadcast_failure(*args, err)
      broadcast(:"#{name}_failure", *args, err)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
transflow-0.3.0 lib/transflow/publisher.rb