Sha256: 9bd87caa9a62d54982a88bcaca04a7f1ad965b8b571589c7ac4796cffb698952
Contents?: true
Size: 1.53 KB
Versions: 4
Compression:
Stored size: 1.53 KB
Contents
module Piglet
  module Relation
    # Represents a COGROUP of several source relations: renders the COGROUP
    # statement (#to_s) and derives the schema of the result (#schema).
    class Cogroup # :nodoc:
      include Relation

      # relation    - accepted for interface compatibility; this constructor
      #               does not read it.
      # description - Hash whose Relation keys map a source relation to its
      #               grouping key(s): a single field, or a list of fields
      #               optionally terminated by :inner or :outer. The optional
      #               :parallel entry is emitted as a PARALLEL clause by #to_s.
      def initialize(relation, description)
        # Hash#reject (rather than #select) is kept on purpose: it returns a
        # Hash on every Ruby version, and @join_fields is indexed by relation.
        @join_fields = description.reject { |key, _| !key.is_a?(Relation) }
        @sources = @join_fields.keys
        @parallel = description[:parallel]
      end

      # Builds the schema description of the cogrouped relation: a leading
      # :group entry derived from the first source's grouping keys, followed
      # by one bag per source named after that source's alias. The description
      # is handed to Piglet::Schema::Tuple.parse and its result is returned.
      def schema
        first_source = @sources.first
        first_schema = first_source.schema
        # Drop a trailing :inner/:outer marker so it is never mistaken for a
        # grouping key (to_s strips it the same way).
        keys = split_join_type(@join_fields[first_source]).first
        if keys.is_a?(Enumerable) && keys.size > 1
          group_type = keys.map { |f| [f, first_schema.field_type[f]] }
          group = [:group, :tuple, group_type]
        else
          # NOTE(review): a lone key is passed through as-is as the second
          # element -- confirm this is what Tuple.parse expects.
          group = [:group, *keys]
        end
        bags = @sources.map do |source|
          [source.alias.to_sym, Piglet::Schema::Bag.new(source.schema)]
        end
        Piglet::Schema::Tuple.parse([group] + bags)
      end

      # Renders the statement, e.g.
      #   COGROUP a BY x INNER, b BY (y, z) PARALLEL 5
      def to_s
        joins = @sources.map do |source|
          keys, join_type = split_join_type(@join_fields[source])
          clause = "#{source.alias} BY #{render_join_keys(keys)}"
          join_type ? "#{clause} #{join_type.to_s.upcase}" : clause
        end
        statement = "COGROUP #{joins.join(', ')}"
        @parallel ? "#{statement} PARALLEL #{@parallel}" : statement
      end

      private

      # Splits a grouping spec into [keys, join_type]. join_type is :inner or
      # :outer when the spec is a list of more than one element ending in that
      # marker; otherwise the spec is returned untouched with a nil join_type.
      def split_join_type(fields)
        if fields.is_a?(Enumerable) && fields.size > 1 && [:inner, :outer].include?(fields.last)
          [fields[0..-2], fields.last]
        else
          [fields, nil]
        end
      end

      # Formats grouping keys for a BY clause: several keys are parenthesised
      # and comma separated, a single key is rendered bare.
      def render_join_keys(keys)
        return keys.to_s unless keys.is_a?(Enumerable)
        return "(#{keys.join(', ')})" if keys.size > 1
        # A one-element list must be joined explicitly: on Ruby >= 1.9
        # interpolating the array itself would yield "[:x]" instead of "x".
        keys.to_a.join
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
piglet-0.2.4 | lib/piglet/relation/cogroup.rb |
piglet-0.2.3 | lib/piglet/relation/cogroup.rb |
piglet-0.2.2 | lib/piglet/relation/cogroup.rb |
piglet-0.2.0 | lib/piglet/relation/cogroup.rb |