lib/piglet/relation/relation.rb in piglet-0.2.5 vs lib/piglet/relation/relation.rb in piglet-0.3.0

- old
+ new

@@ -6,89 +6,105 @@ attr_reader :sources # The name this relation will get in Pig Latin. Then name is generated when # the relation is outputed by the interpreter, and will be unique. def alias - @alias ||= Relation.next_alias + @alias ||= @interpreter.next_relation_alias end + + def next_field_alias + @field_counter ||= 0 + @field_counter += 1 + "#{self.alias}_field_#{@field_counter}" + end # GROUP # # x.group(:a) # => GROUP x By a # x.group(:a, :b, :c) # => GROUP x BY (a, b, c) # x.group([:a, :b, :c], :parallel => 3) # => GROUP x BY (a, b, c) PARALLEL 3 def group(*args) grouping, options = split_at_options(args) - Group.new(self, [grouping].flatten, options) + Group.new(self, @interpreter, [grouping].flatten, options) end # DISTINCT # # x.distinct # => DISTINCT x # x.distinct(:parallel => 5) # => DISTINCT x PARALLEL 5 def distinct(options={}) - Distinct.new(self, options) + Distinct.new(self, @interpreter, options) end # COGROUP # # x.cogroup(x => :a, y => :b) # => COGROUP x BY a, y BY b # x.cogroup(x => :a, y => :b, z => :c) # => COGROUP x BY a, y BY b, z BY c # x.cogroup(x => [:a, :b], y => [:c, :d]) # => COGROUP x BY (a, b), y BY (c, d) # x.cogroup(x => :a, y => [:b, :inner]) # => COGROUP x BY a, y BY b INNER # x.cogroup(x => :a, y => :b, :parallel => 5) # => COGROUP x BY a, y BY b PARALLEL 5 def cogroup(description) - Cogroup.new(self, description) + Cogroup.new(self, @interpreter, description) end # CROSS # # x.cross(y) # => CROSS x, y # x.cross(y, z, w) # => CROSS x, y, z, w # x.cross([y, z], :parallel => 5) # => CROSS x, y, z, w PARALLEL 5 def cross(*args) relations, options = split_at_options(args) - Cross.new(([self] + relations).flatten, options) + Cross.new(([self] + relations).flatten, @interpreter, options) end # FILTER # - # x.filter { |r| r.a == r.b } # => FILTER x BY a == b - # x.filter { |r| r.a > r.b && r.c != 3 } # => FILTER x BY a > b AND c != 3 - def filter - Filter.new(self, yield(self)) + # x.filter { a == b } # => FILTER x BY a == b + # x.filter { a > b && c == 3 } # => FILTER x BY a > b AND c == 3 + def filter(&block) + context = BlockContext.new(self, @interpreter) + Filter.new(self, @interpreter, context.instance_eval(&block)) end # FOREACH ... GENERATE # - # x.foreach { |r| r.a } # => FOREACH x GENERATE a - # x.foreach { |r| [r.a, r.b] } # => FOREACH x GENERATE a, b - # x.foreach { |r| r.a.max } # => FOREACH x GENERATE MAX(a) - # x.foreach { |r| r.a.avg.as(:b) } # => FOREACH x GENERATE AVG(a) AS b + # x.foreach { a } # => FOREACH x GENERATE a + # x.foreach { [a, b] } # => FOREACH x GENERATE a, b + # x.foreach { a.max } # => FOREACH x GENERATE MAX(a) + # x.foreach { a.avg.as(:b) } # => FOREACH x GENERATE AVG(a) AS b # - #-- + # See #nested_foreach for FOREACH ... { ... GENERATE } + def foreach(&block) + context = BlockContext.new(self, @interpreter) + Foreach.new(self, @interpreter, context.instance_eval(&block)) + end + + # FOREACH ... { ... GENERATE } # - # TODO: FOREACH a { b GENERATE c } - def foreach - Foreach.new(self, yield(self)) + # x.nested_foreach { [a.distinct] } # => FOREACH x { a1 = DISTINCT a; GENERATE a1 } + # + # See #foreach for FOREACH ... GENERATE + def nested_foreach(&block) + context = BlockContext.new(self, @interpreter) + NestedForeach.new(self, @interpreter, context.instance_eval(&block)) end # JOIN # # x.join(x => :a, y => :b) # => JOIN x BY a, y BY b # x.join(x => :a, y => :b, z => :c) # => JOIN x BY a, y BY b, z BY c # x.join(x => :a, y => :b, :using => :replicated) # => JOIN x BY a, y BY b USING "replicated" # x.join(x => :a, y => :b, :parallel => 5) # => JOIN x BY a, y BY b PARALLEL 5 def join(description) - Join.new(self, description) + Join.new(self, @interpreter, description) end # LIMIT # # x.limit(10) # => LIMIT x 10 def limit(n) - Limit.new(self, n) + Limit.new(self, @interpreter, n) end # ORDER # # x.order(:a) # => ORDER x BY a @@ -101,44 +117,45 @@ # NOTE: the syntax x.order(:a => :asc, :b => :desc) would be nice, but in # Ruby 1.8 the order of the keys cannot be guaranteed. def order(*args) fields, options = split_at_options(args) fields = *fields - Order.new(self, fields, options) + Order.new(self, @interpreter, fields, options) end # SAMPLE # # x.sample(5) # => SAMPLE x 5; def sample(n) - Sample.new(self, n) + Sample.new(self, @interpreter, n) end # SPLIT # - # y, z = x.split { |r| [r.a <= 3, r.b > 4]} # => SPLIT x INTO y IF a <= 3, z IF a > 4 - def split - Split.new(self, yield(self)).shards + # y, z = x.split { [a <= 3, b > 4] } # => SPLIT x INTO y IF a <= 3, z IF a > 4 + def split(&block) + context = BlockContext.new(self, @interpreter) + Split.new(self, @interpreter, context.instance_eval(&block)).shards end # STREAM # # x.stream(:command => 'cut -f 3') # => STREAM x THROUGH `cut -f 3` # x.stream(:cmd) # => STREAM x THROUGH cmd # x.stream(y, :command => 'cut -f 3') # => STREAM x, y THROUGH `cut -f 3` # x.stream(:cmd, :schema => [%w(a int)]) # => STREAM x THROUGH cmd AS (a:int) def stream(*args) fields, options = split_at_options(args) - Stream.new(self, fields, options) + Stream.new(self, @interpreter, fields, options) end # UNION # # x.union(y) # => UNION x, y # x.union(y, z) # => UNION x, y, z def union(*relations) - Union.new(*([self] + relations)) + Union.new(([self] + relations).flatten, @interpreter) end def field(name) type = schema.field_type(name) rescue nil Field::Reference.new(name, self, :type => type) @@ -180,15 +197,9 @@ if parameters.last.is_a? Hash [parameters[0..-2], parameters.last] else [parameters, nil] end - end - - def self.next_alias - @counter ||= 0 - @counter += 1 - "relation_#{@counter}" end end end end \ No newline at end of file