lib/piglet/relation/relation.rb in piglet-0.2.5 vs lib/piglet/relation/relation.rb in piglet-0.3.0
- old
+ new
@@ -6,89 +6,105 @@
attr_reader :sources
# The name this relation will get in Pig Latin. The name is generated when
# the relation is output by the interpreter, and will be unique.
def alias
- @alias ||= Relation.next_alias
+ @alias ||= @interpreter.next_relation_alias
end
+
+ def next_field_alias
+ @field_counter ||= 0
+ @field_counter += 1
+ "#{self.alias}_field_#{@field_counter}"
+ end
# GROUP
#
# x.group(:a) # => GROUP x BY a
# x.group(:a, :b, :c) # => GROUP x BY (a, b, c)
# x.group([:a, :b, :c], :parallel => 3) # => GROUP x BY (a, b, c) PARALLEL 3
def group(*args)
grouping, options = split_at_options(args)
- Group.new(self, [grouping].flatten, options)
+ Group.new(self, @interpreter, [grouping].flatten, options)
end
# DISTINCT
#
# x.distinct # => DISTINCT x
# x.distinct(:parallel => 5) # => DISTINCT x PARALLEL 5
def distinct(options={})
- Distinct.new(self, options)
+ Distinct.new(self, @interpreter, options)
end
# COGROUP
#
# x.cogroup(x => :a, y => :b) # => COGROUP x BY a, y BY b
# x.cogroup(x => :a, y => :b, z => :c) # => COGROUP x BY a, y BY b, z BY c
# x.cogroup(x => [:a, :b], y => [:c, :d]) # => COGROUP x BY (a, b), y BY (c, d)
# x.cogroup(x => :a, y => [:b, :inner]) # => COGROUP x BY a, y BY b INNER
# x.cogroup(x => :a, y => :b, :parallel => 5) # => COGROUP x BY a, y BY b PARALLEL 5
def cogroup(description)
- Cogroup.new(self, description)
+ Cogroup.new(self, @interpreter, description)
end
# CROSS
#
# x.cross(y) # => CROSS x, y
# x.cross(y, z, w) # => CROSS x, y, z, w
# x.cross([y, z], :parallel => 5) # => CROSS x, y, z PARALLEL 5
def cross(*args)
relations, options = split_at_options(args)
- Cross.new(([self] + relations).flatten, options)
+ Cross.new(([self] + relations).flatten, @interpreter, options)
end
# FILTER
#
- # x.filter { |r| r.a == r.b } # => FILTER x BY a == b
- # x.filter { |r| r.a > r.b && r.c != 3 } # => FILTER x BY a > b AND c != 3
- def filter
- Filter.new(self, yield(self))
+ # x.filter { a == b } # => FILTER x BY a == b
+ # x.filter { a > b && c == 3 } # => FILTER x BY a > b AND c == 3
+ def filter(&block)
+ context = BlockContext.new(self, @interpreter)
+ Filter.new(self, @interpreter, context.instance_eval(&block))
end
# FOREACH ... GENERATE
#
- # x.foreach { |r| r.a } # => FOREACH x GENERATE a
- # x.foreach { |r| [r.a, r.b] } # => FOREACH x GENERATE a, b
- # x.foreach { |r| r.a.max } # => FOREACH x GENERATE MAX(a)
- # x.foreach { |r| r.a.avg.as(:b) } # => FOREACH x GENERATE AVG(a) AS b
+ # x.foreach { a } # => FOREACH x GENERATE a
+ # x.foreach { [a, b] } # => FOREACH x GENERATE a, b
+ # x.foreach { a.max } # => FOREACH x GENERATE MAX(a)
+ # x.foreach { a.avg.as(:b) } # => FOREACH x GENERATE AVG(a) AS b
#
- #--
+ # See #nested_foreach for FOREACH ... { ... GENERATE }
+ def foreach(&block)
+ context = BlockContext.new(self, @interpreter)
+ Foreach.new(self, @interpreter, context.instance_eval(&block))
+ end
+
+ # FOREACH ... { ... GENERATE }
#
- # TODO: FOREACH a { b GENERATE c }
- def foreach
- Foreach.new(self, yield(self))
+ # x.nested_foreach { [a.distinct] } # => FOREACH x { a1 = DISTINCT a; GENERATE a1 }
+ #
+ # See #foreach for FOREACH ... GENERATE
+ def nested_foreach(&block)
+ context = BlockContext.new(self, @interpreter)
+ NestedForeach.new(self, @interpreter, context.instance_eval(&block))
end
# JOIN
#
# x.join(x => :a, y => :b) # => JOIN x BY a, y BY b
# x.join(x => :a, y => :b, z => :c) # => JOIN x BY a, y BY b, z BY c
# x.join(x => :a, y => :b, :using => :replicated) # => JOIN x BY a, y BY b USING "replicated"
# x.join(x => :a, y => :b, :parallel => 5) # => JOIN x BY a, y BY b PARALLEL 5
def join(description)
- Join.new(self, description)
+ Join.new(self, @interpreter, description)
end
# LIMIT
#
# x.limit(10) # => LIMIT x 10
def limit(n)
- Limit.new(self, n)
+ Limit.new(self, @interpreter, n)
end
# ORDER
#
# x.order(:a) # => ORDER x BY a
@@ -101,44 +117,45 @@
# NOTE: the syntax x.order(:a => :asc, :b => :desc) would be nice, but in
# Ruby 1.8 the order of the keys cannot be guaranteed.
def order(*args)
fields, options = split_at_options(args)
fields = *fields
- Order.new(self, fields, options)
+ Order.new(self, @interpreter, fields, options)
end
# SAMPLE
#
# x.sample(5) # => SAMPLE x 5
def sample(n)
- Sample.new(self, n)
+ Sample.new(self, @interpreter, n)
end
# SPLIT
#
- # y, z = x.split { |r| [r.a <= 3, r.b > 4]} # => SPLIT x INTO y IF a <= 3, z IF a > 4
- def split
- Split.new(self, yield(self)).shards
+ # y, z = x.split { [a <= 3, b > 4] } # => SPLIT x INTO y IF a <= 3, z IF b > 4
+ def split(&block)
+ context = BlockContext.new(self, @interpreter)
+ Split.new(self, @interpreter, context.instance_eval(&block)).shards
end
# STREAM
#
# x.stream(:command => 'cut -f 3') # => STREAM x THROUGH `cut -f 3`
# x.stream(:cmd) # => STREAM x THROUGH cmd
# x.stream(y, :command => 'cut -f 3') # => STREAM x, y THROUGH `cut -f 3`
# x.stream(:cmd, :schema => [%w(a int)]) # => STREAM x THROUGH cmd AS (a:int)
def stream(*args)
fields, options = split_at_options(args)
- Stream.new(self, fields, options)
+ Stream.new(self, @interpreter, fields, options)
end
# UNION
#
# x.union(y) # => UNION x, y
# x.union(y, z) # => UNION x, y, z
def union(*relations)
- Union.new(*([self] + relations))
+ Union.new(([self] + relations).flatten, @interpreter)
end
def field(name)
type = schema.field_type(name) rescue nil
Field::Reference.new(name, self, :type => type)
@@ -180,15 +197,9 @@
if parameters.last.is_a? Hash
[parameters[0..-2], parameters.last]
else
[parameters, nil]
end
- end
-
- def self.next_alias
- @counter ||= 0
- @counter += 1
- "relation_#{@counter}"
end
end
end
end
\ No newline at end of file