lib/mongo/collection/view/map_reduce.rb in mongo-2.1.0.beta vs lib/mongo/collection/view/map_reduce.rb in mongo-2.1.0.rc0
- old
+ new
@@ -23,10 +23,11 @@
class MapReduce
extend Forwardable
include Enumerable
include Immutable
include Iterable
+ include Loggable
# @return [ View ] view The collection view.
attr_reader :view
# @return [ String ] map The map function.
@@ -156,46 +157,50 @@
:read => read,
:selector => {
:mapreduce => collection.name,
:map => map,
:reduce => reduce,
- :query => view.selector[:$query] || view.selector,
+ :query => view.modifiers[:$query] || view.selector,
:out => { inline: 1 }
- }.merge(options).merge(view.options)
+ }.merge(options).merge(view_options)
}
end
+ def view_options
+ view.sort ? view.options.merge(:sort => view.sort) : view.options
+ end
+
def new(options)
MapReduce.new(view, map, reduce, options)
end
def initial_query_op
Operation::MapReduce.new(map_reduce_spec)
end
+ def valid_server?(server)
+ server.standalone? || server.mongos? || server.primary? || secondary_ok?
+ end
+
+ def secondary_ok?
+ out.respond_to?(:keys) &&
+ out.keys.first.to_s.downcase == 'inline'
+ end
+
def send_initial_query(server)
- result =
- begin
- initial_query_op.execute(server.context)
- rescue Mongo::Error::NeedPrimaryServer
- log_warn([
- 'Rerouting the MapReduce operation to the primary server.'
- ])
- server = ServerSelector.get(mode: :primary).select_server(cluster)
- initial_query_op.execute(server.context)
- end
- if inline?
- result
- else
- send_fetch_query(server)
+ unless valid_server?(server)
+ log_warn('Rerouting the MapReduce operation to the primary server.')
+ server = cluster.next_primary
end
+ result = initial_query_op.execute(server.context)
+ inline? ? result : send_fetch_query(server)
end
def fetch_query_spec
{ :selector => {},
:options => {},
:db_name => database.name,
- :coll_name => out.values.first }
+ :coll_name => out.respond_to?(:keys) ? out.values.first : out }
end
def fetch_query_op
Operation::Read::Query.new(fetch_query_spec)
end