lib/em-dextras/chains.rb in em-dextras-0.2.0 vs lib/em-dextras/chains.rb in em-dextras-0.3.0
- old
+ new
@@ -1,78 +1,190 @@
module EMDextras
module Chains
+
+ class JoinedDeferrable
+ include EventMachine::Deferrable
+
+ def initialize(deferrables)
+ result_pairs = deferrables.map do |deferrable|
+ [deferrable, :unset]
+ end
+ @results = Hash[result_pairs]
+ @callback_values = []
+ @errback_values = []
+
+ initialize_deferrables!
+ end
+
+ def one_callback(*vs)
+ deferrable, *values = vs
+ @results[deferrable] = :ok
+ @callback_values.push *values
+
+ check_if_complete
+ end
+
+ def one_errback(*vs)
+ deferrable, *values = vs
+ @results[deferrable] = :error
+ @errback_values.push *values
+
+ check_if_complete
+ end
+
+ private
+
+ def check_if_complete
+ complete! unless any_was?(:unset)
+ end
+
+ def complete!
+ (self.fail(@errback_values); return) if any_was?(:error)
+ self.succeed(@callback_values)
+ end
+
+ def any_was?(state)
+ @results.any? {|k, v| v == state }
+ end
+
+ def initialize_deferrables!
+ ds = @results.keys
+
+ ds.each do |deferrable|
+ deferrable.callback do |*values|
+ self.one_callback deferrable, *values
+ end
+ deferrable.errback do |*values|
+ self.one_errback deferrable, *values
+ end
+ end
+
+ ds.each do |d|
+ d.timeout(5, "Expired timeout of #{5} for #{d.inspect}")
+ end
+ end
+ end
+
module Deferrables
def self.succeeded(*args)
deferrable = EventMachine::DefaultDeferrable.new
deferrable.succeed(*args)
- deferrable
- end
+ deferrable end
def self.failed(*args)
deferrable = EventMachine::DefaultDeferrable.new
deferrable.fail(*args)
deferrable
end
end
- PipeSetup = Struct.new(:monitoring, :options) do
+ PipeSetup = Struct.new(:monitoring, :options, :result) do
def inform_exception!(error_value, stage)
- self.monitoring.inform_exception! error_value, stage
+ if options[:context]
+ self.monitoring.inform_exception! error_value, stage, options[:context]
+ else
+ self.monitoring.inform_exception! error_value, stage
+ end
end
end
def self.pipe(zero, monitoring, stages, options = {})
- run_chain zero, stages, PipeSetup.new(monitoring, options)
+ result = create_chain_result(monitoring, options)
+ run_chain zero, stages, PipeSetup.new(monitoring, options, result)
end
+ def self.create_chain_result(monitoring, options)
+ EventMachine::DefaultDeferrable.new.
+ tap {|d| d.callback { |value| notify_end_of_chain!(value, monitoring, options) }}.
+ tap {|d| d.errback { |value| notify_end_of_chain!(value, monitoring, options) }}
+ end
+
def self.run_chain input, stages, pipe_setup
- return if stages.empty?
+ return pipe_setup.result.succeed(input) if stages.empty? || input.nil?
stage, *rest = *stages
- puts "Running #{stage}(#{input})" if pipe_setup.options[:debug]
-
if stage == :split
split_chain(input, rest, pipe_setup)
- return
+ return pipe_setup.result
end
-
deferrable = call(stage, input, pipe_setup)
+ check_stage_is_well_behaved!(deferrable, stage, input, deferrable)
deferrable.callback do |value|
run_chain value, rest, pipe_setup
end
deferrable.errback do |error_value|
pipe_setup.inform_exception! error_value, stage
+ pipe_setup.result.fail(error_value)
end
+
+ pipe_setup.result
end
private
+
+ def self.check_stage_is_well_behaved!(deferrable, stage, input, value)
+ unless deferrable.respond_to?(:callback) && deferrable.respond_to?(:errback)
+ raise InvalidStage, "Stage '#{stage.class.name}' did not return a deferrable object when given input '#{input.to_s[0..10]}', instead it returned '#{value}'!"
+ end
+ end
+
def self.split_chain input, rest, pipe_setup
new_options = pipe_setup.options.clone
context = new_options[:context]
if context && context.respond_to?(:split)
- new_options[:context] = context.split
+ new_options[:context] = context.split
end
- new_pipe_setup = PipeSetup.new(pipe_setup.monitoring, new_options)
+ rest_of_chain = rest
unless input.respond_to? :each
pipe_setup.inform_exception! ArgumentError.new(":split stage expects enumerable input. \"#{input}\" is not enumerable."), :split
return
end
- input.each do |value|
- run_chain value, rest, new_pipe_setup
+
+ splits_deferrables = input.map do |value|
+ split_result = EventMachine::DefaultDeferrable.new
+ new_pipe_setup = PipeSetup.new(pipe_setup.monitoring, new_options, split_result)
+ run_chain value, rest_of_chain, new_pipe_setup
+
+ split_result
end
+
+ join = JoinedDeferrable.new(splits_deferrables)
+ join.callback do |*values|
+ pipe_setup.result.succeed(*values)
+ end
+ join.errback do |*values|
+ pipe_setup.result.fail(*values)
+ end
end
def self.call(stage, input, pipe_setup)
todo_method = stage.method(:todo)
- case todo_method.arity
- when 1
+ arity = todo_method.arity
+ if arity < 0 && pipe_setup.options[:context]
+ stage.todo(input, pipe_setup.options[:context])
+ elsif arity < 0 || arity == 1
stage.todo(input)
- when 2
+ elsif arity == 2
stage.todo(input, pipe_setup.options[:context])
end
+ end
+
+ def self.notify_end_of_chain!(value, monitoring, options)
+ context = options[:context]
+
+ if monitoring.respond_to? :end_of_chain!
+ if context
+ monitoring.end_of_chain!(value, context)
+ else
+ monitoring.end_of_chain!(value)
+ end
+ end
+ end
+
+ class InvalidStage < Exception
end
end
end