require "enumerator" require "stringio" require "set" require "alf/version" require "alf/loader" # # Classy data-manipulation dressed in a DSL (+ commandline) # module Alf # # Provides tooling methods that are used here and there in Alf. # module Tools # # Returns the unqualified name of a ruby class or module # # Example # # class_name(Alf::Tools) -> :Tools # def class_name(clazz) clazz.name.to_s =~ /([A-Za-z0-9_]+)$/ $1.to_sym end # # Converts an unqualified class or module name to a ruby case method name. # # Example # # ruby_case(:Alf) -> "alf" # ruby_case(:HelloWorld) -> "hello_world" # def ruby_case(s) s.to_s.gsub(/[A-Z]/){|x| "_#{x.downcase}"}[1..-1] end # # Returns the first non nil values from arguments # # Example # # coalesce(nil, 1, "abc") -> 1 # def coalesce(*args) args.find{|x| !x.nil?} end # # Iterates over enum and yields the block on each element. # Collect block results as key/value pairs returns them as # a Hash. # def tuple_collect(enum) tuple = {} enum.each do |elm| k, v = yield(elm) tuple[k] = v end tuple end # # Provides a handle, implementing a flyweight design pattern on tuples. # class TupleHandle # Creates an handle instance def initialize @tuple = nil end # # Sets the next tuple to use. # # This method installs the handle as a side effect # on first call. # def set(tuple) build(tuple) if @tuple.nil? @tuple = tuple self end # # Compiles a tuple expression and returns a lambda # instance that can be passed to evaluate later. # def self.compile(expr) case expr when Proc expr when NilClass compile('true') when Hash if expr.empty? compile(nil) else # TODO: replace inspect by to_ruby compile expr.each_pair.collect{|k,v| "(#{k} == #{v.inspect})" }.join(" && ") end when Array compile(Hash[*expr]) when String, Symbol eval("lambda{ #{expr} }") else raise ArgumentError, "Unable to compile #{expr} to a TupleHandle" end end # # Evaluates an expression on the current tuple. 
Expression # can be a lambda or a string (immediately compiled in the # later case). # def evaluate(expr) if RUBY_VERSION < "1.9" instance_eval(&TupleHandle.compile(expr)) else instance_exec(&TupleHandle.compile(expr)) end end private # # Builds this handle with a tuple. # # This method should be called only once and installs # instance methods on the handle with keys of _tuple_. # def build(tuple) tuple.keys.each do |k| (class << self; self; end).send(:define_method, k) do @tuple[k] end end end end # class TupleHandle # # Defines a projection key # class ProjectionKey include Tools # Projection attributes attr_accessor :attributes # Allbut projection? attr_accessor :allbut def initialize(attributes, allbut = false) @attributes = attributes @allbut = allbut end def self.coerce(arg) case arg when Array ProjectionKey.new(arg, false) when OrderingKey ProjectionKey.new(arg.attributes, false) when ProjectionKey arg else raise ArgumentError, "Unable to coerce #{arg} to a projection key" end end def to_ordering_key OrderingKey.new attributes.collect{|arg| [arg, :asc] } end def project(tuple) split(tuple).first end def split(tuple) projection, rest = {}, tuple.dup attributes.each do |a| projection[a] = tuple[a] rest.delete(a) end @allbut ? 
[rest, projection] : [projection, rest] end end # class ProjectionKey # # Encapsulates tools for computing orders on tuples # class OrderingKey attr_reader :ordering def initialize(ordering = []) @ordering = ordering @sorter = nil end def self.coerce(arg) case arg when Array if arg.all?{|a| a.is_a?(Symbol)} arg = arg.collect{|a| [a, :asc]} end OrderingKey.new(arg) when ProjectionKey arg.to_ordering_key when OrderingKey arg else raise ArgumentError, "Unable to coerce #{arg} to an ordering key" end end def attributes @ordering.collect{|arg| arg.first} end def order_by(attr, order = :asc) @ordering << [attr, order] @sorter = nil self end def order_of(attr) @ordering.find{|arg| arg.first == attr}.last end def compare(t1,t2) @ordering.each do |attr,order| x, y = t1[attr], t2[attr] comp = x.respond_to?(:<=>) ? (x <=> y) : (x.to_s <=> y.to_s) comp *= -1 if order == :desc return comp unless comp == 0 end return 0 end def sorter @sorter ||= lambda{|t1,t2| compare(t1, t2)} end def +(other) other = OrderingKey.coerce(other) OrderingKey.new(@ordering + other.ordering) end end # class OrderingKey extend Tools end # module Tools # # Builds and returns a lispy engine on a specific environment. # # Example(s): # # # Returns a lispy instance on the default environment # lispy = Alf.lispy # # # Returns a lispy instance on the examples' environment # lispy = Alf.lispy(Alf::Environment.examples) # # # Returns a lispy instance on a folder environment of your choice # lispy = Alf.lispy(Alf::Environment.folder('path/to/a/folder')) # # @see Alf::Environment about available environments and their contract # def self.lispy(env = Alf::Environment.default) Command::Main.new(env) end # # Encapsulates the interface with the outside world, providing base iterators # for named datasets, among others. 
# # An environment is typically obtained through the factory defined by this # class: # # # Returns the default environment (examples, for now) # Alf::Environment.default # # # Returns an environment on Alf's examples # Alf::Environment.examples # # # Returns an environment on a specific folder, automatically # # resolving datasources via Readers' recognized file extensions # Alf::Environment.folder('path/to/a/folder') # # You can implement your own environment by subclassing this class and # implementing the {#dataset} method. As additional support is implemented # in the base class, Environment should never be mimiced. # class Environment # # Returns a dataset whose name is provided. # # This method resolves named datasets to tuple enumerables. When the # dataset exists, this method must return an Iterator, typically a # Reader instance. Otherwise, it must throw a NoSuchDatasetError. # # @param [Symbol] name the name of a dataset # @return [Iterator] an iterator, typically a Reader instance # @raise [NoSuchDatasetError] when the dataset does not exists # def dataset(name) end undef :dataset # # Branches this environment and puts some additional explicit # definitions. # # This method is provided for (with ...) expressions and should not # be overriden by subclasses. # # @param [Hash] a set of (name, Iterator) pairs. # @return [Environment] an environment instance with new definitions set # def branch(defs) Explicit.new(defs, self) end # # Specialization of Environment that works with explicitely defined # datasources and allow branching and unbranching. # class Explicit < Environment # # Creates a new environment instance with initial definitions # and optional child environment. 
# def initialize(defs = {}, child = nil) @defs = defs @child = child end # # Unbranches this environment and returns its child # def unbranch @child end # (see Environment#dataset) def dataset(name) if @defs.has_key?(name) @defs[name] elsif @child @child.dataset(name) else raise "No such dataset #{name}" end end end # class Explicit # # Specialization of Environment to work on files of a given folder. # # This kind of environment resolves datasets by simply looking at # recognized files in a specific folder. "Recognized" files are simply # those for which a Reader subclass has been previously registered. # This environment then serves reader instances. # class Folder < Environment # # Creates an environment instance, wired to the specified folder. # # @param [String] folder path to the folder to use as dataset source # def initialize(folder) @folder = folder end # (see Environment#dataset) def dataset(name) if file = find_file(name) Reader.reader(file, self) else raise "No such dataset #{name} (#{@folder})" end end protected def find_file(name) # TODO: refactor this, because it allows getting out of the folder if File.exists?(name.to_s) name.to_s elsif File.exists?(explicit = File.join(@folder, name.to_s)) && File.file?(explicit) explicit else Dir[File.join(@folder, "#{name}.*")].find do |f| File.file?(f) end end end end # class Folder # # Factors a Folder environment on a specific path # def self.folder(path) Folder.new(path) end # # Returns the default environment # def self.default examples end # # Returns the examples environment # def self.examples folder File.expand_path('../../examples', __FILE__) end end # class Environment # # Marker module for all elements implementing tuple iterators. # # At first glance, an iterator is nothing else than an Enumerable that serves # tuples (represented by ruby hashes). However, this module helps Alf's internal # classes to recognize enumerables that may safely be considered as tuple # iterators from other enumerables. 
For this reason, all elements that would # like to participate to an iteration chain (that is, an logical operator # implementation) should be marked with this module. This is the case for # all Readers and Operators defined in Alf. # # Moreover, an Iterator should always define a {#pipe} method, which is the # natural way to define the input and execution environment of operators and # readers. # module Iterator include Enumerable # # Wire the iterator input and an optional execution environment. # # Iterators (typically Reader and Operator instances) work from input data # that come from files, or other operators, and so on. This method wires # this input data to the iterator. Wiring is required before any attempt # to call each, unless autowiring occurs at construction. The exact kind of # input object is left at discretion of Iterator implementations. # # @param [Object] input the iterator input, at discretion of the Iterator # implementation. # @param [Environment] environment an optional environment for resolving # named datasets if needed. # @return [Object] self # def pipe(input, environment = nil) self end undef :pipe # # Coerces something to an iterator # def self.coerce(arg, environment = nil) case arg when Iterator, Array arg else Reader.coerce(arg, environment) end end # # Converts this iterator to an in-memory Relation. # # @return [Relation] a relation instance, as the set of tuples # that would be yield by this iterator. # def to_rel Relation::coerce(self) end end # module Iterator # # Implements an Iterator at the interface with the outside world. # # The contrat of a Reader is simply to be an Iterator. Unlike operators, # however, readers are not expected to take other iterators as input, but IO # objects, database tables, or something similar instead. This base class # provides a default behavior for readers that works with IO objects. 
It can # be safely extended, overriden, or even mimiced (provided that you include # and implement the Iterator contract). # # This class also provides a registration mechanism to help getting Reader # instances for specific file extensions. A typical scenario for using this # registration mechanism is as follows: # # # Registers a reader kind named :foo, associated with ".foo" file # # extensions and the FooFileDecoder class (typically a subclass of # # Reader) # Reader.register(:foo, [".foo"], FooFileDecoder) # # # Later on, you can request a reader instance for a .foo file, as # # illustrated below. # r = Reader.reader('/a/path/to/a/file.foo') # # # Also, a factory method is automatically installed on the Reader class # # itself. This factory method can be used with a String, or an IO object. # r = Reader.foo([a path or a IO object]) # class Reader include Iterator # Registered readers @@readers = [] # # Registers a reader class associated with specific file extensions # # Registered class must provide a constructor with the following signature # new(path_or_io, environment = nil). The name must be a symbol # which can safely be used as a ruby method name. A factory class method of # that name and same signature is automatically installed on the Reader # class. # # @param [Symbol] name a name for the kind of data decoded # @param [Array] extensions file extensions mapped to the registered reader # class (should include the '.', e.g. '.foo') # @param [Class] class Reader subclass used to decode this kind of files # def self.register(name, extensions, clazz) @@readers << [name, extensions, clazz] (class << self; self; end). send(:define_method, name) do |*args| clazz.new(*args) end end # # When filepath is a String, returns a reader instance for a specific file # whose path is given as argument. 
Otherwise, delegate the call to # coerce(filepath) # # @param [String] filepath path to a file for which extension is recognized # @param [Array] args optional additional arguments that must be passed at # reader's class new method. # @return [Reader] a reader instance # def self.reader(filepath, *args) if filepath.is_a?(String) ext = File.extname(filepath) if registered = @@readers.find{|r| r[1].include?(ext)} registered[2].new(filepath, *args) else raise "No registered reader for #{ext} (#{filepath})" end else coerce(filepath) end end # # Coerces an argument to a reader, using an optional environment to convert # named datasets. # # This method automatically provides readers for Strings and Symbols through # passed environment (**not** through the reader factory) and for IO objects # (through Rash reader). It is part if Alf's internals and should be used # with care. # def self.coerce(arg, environment = nil) case arg when Reader arg when IO rash(arg, environment) when String, Symbol if environment environment.dataset(arg.to_sym) else raise "No environment set" end else raise ArgumentError, "Unable to coerce #{arg.inspect} to a reader" end end # @return [Environment] Wired environment attr_accessor :environment # @return [String or IO] Input IO, or file name attr_accessor :input # # Creates a reader instance, with an optional input and environment wiring. # # @param [String or IO] path to a file or IO object for input # @param [Environment] environment wired environment, serving this reader # def initialize(input = nil, environment = nil) @input = input @environment = environment end # # (see Iterator#pipe) # def pipe(input, env = environment) @input = input self end # # (see Iterator#each) # # @private the default implementation reads lines of the input stream and # yields the block with line2tuple(line) on each of them. This # method may be overriden if this behavior does not fit reader's needs. 
# def each each_input_line do |line| tuple = line2tuple(line) yield tuple unless tuple.nil? end end protected # # Returns the input file path, or nil if this Reader is bound to an IO # directly. # def input_path input.is_a?(String) ? input : nil end # # Coerces the input object to an IO and yields the block with it. # # StringIO and IO input are yield directly while file paths are first # opened in read mode and then yield. # def with_input_io case input when IO, StringIO yield input when String File.open(input, 'r'){|io| yield io} else raise "Unable to convert #{input} to an IO object" end end # # Returns the whole input text. # # This feature should only be used by subclasses on inputs that are # small enough to fit in memory. Consider implementing readers without this # feature on files that could be larger. # def input_text with_input_io{|io| io.readlines.join} end # # Yields the block with each line of the input text in turn. # # This method is an helper for files that capture one tuple on each input # line. It should be used in those cases, as the resulting reader will not # load all input in memory but serve tuples on demand. # def each_input_line with_input_io{|io| io.each_line(&Proc.new)} end # # Converts a line previously read from the input stream to a tuple. # # The line is simply ignored is this method return nil. Errors should be # properly handled by raising exceptions. This method MUST be implemented # by subclasses unless each is overriden. # def line2tuple(line) end undef :line2tuple # # Specialization of the Reader contract for .rash files. # # A .rash file/stream contains one ruby hash literal on each line. This # reader simply decodes each of them in turn with Kernel.eval, providing a # state-less reader (that is, tuples are not all loaded in memory at once). 
# class Rash < Reader # (see Reader#line2tuple) def line2tuple(line) begin h = Kernel.eval(line) raise "hash expected, got #{h}" unless h.is_a?(Hash) rescue Exception => ex $stderr << "Skipping #{line.strip}: #{ex.message}\n" nil else return h end end Reader.register(:rash, [".rash"], self) end # class Rash # # Specialization of the Reader contrat for .alf files. # # A .alf file simply contains a query expression in the Lispy DSL. This # reader decodes and compiles the expression and delegates the enumeration # to the obtained operator. # # Note that an Environment must be wired at creation or piping time. # NoSuchDatasetError will certainly occur otherwise. # class AlfFile < Reader # (see Reader#each) def each op = Alf.lispy(environment).compile(input_text, input_path) op.each(&Proc.new) end Reader.register(:alf, [".alf"], self) end # module AlfFile end # module Reader # # Renders a relation (given by any Iterator) in a specific format. # # A renderer takes an Iterator instance as input and renders it on an output # stream. Renderers are **not** iterators themselves, even if they mimic the # {#pipe} method. Their usage is made via the {#execute} method. # # Similarly to the {Reader} class, this one provides a registration mechanism # for specific output formats. The common scenario is as follows: # # # Register a new renderer for :foo format (automatically provides the # # '--foo Render output as a foo stream' option of 'alf show') and with # # the FooRenderer class for handling rendering. # Renderer.register(:foo, "as a foo stream", FooRenderer) # # # Later on, you can request a renderer instance for a specific format # # as follows (wiring input is optional) # r = Renderer.renderer(:foo, [an Iterator]) # # # Also, a factory method is automatically installed on the Renderer class # # itself. # r = Renderer.foo([an Iterator]) # class Renderer # Registered renderers @@renderers = [] # # Register a renderering class with a given name and description. 
# # Registered class must at least provide a constructor with an empty # signature. The name must be a symbol which can safely be used as a ruby # method name. A factory class method of that name and degelation signature # is automatically installed on the Renderer class. # # @param [Symbol] name a name for the output format # @param [String] description an output format description (for 'alf show') # @param [Class] clazz Renderer subclass used to render in this format # def self.register(name, description, clazz) @@renderers << [name, description, clazz] (class << self; self; end). send(:define_method, name) do |*args| clazz.new(*args) end end # # Returns a Renderer instance for the given output format name. # # @param [Symbol] name name of an output format previously registered # @param [...] args other arguments to pass to the renderer constructor # @return [Renderer] a Renderer instance, already wired if args are # provided # def self.renderer(name, *args) if r = @@renderers.find{|triple| triple[0] == name} r[2].new(*args) else raise "No renderer registered for #{name}" end end # # Yields each (name,description,clazz) previously registered in turn # def self.each_renderer @@renderers.each(&Proc.new) end # Renderer input (typically an Iterator) attr_accessor :input # @return [Environment] Optional wired environment attr_accessor :environment # # Creates a renderer instance, optionally wired to an input # def initialize(input = nil) @input = input end # # Sets the renderer input. # # This method mimics {Iterator#pipe} and have the same contract. # def pipe(input, env = environment) self.environment = env self.input = input self end # # Executes the rendering, outputting the resulting tuples on the provided # output buffer. # # The default implementation simply coerces the input as an Iterator and # delegates the call to {#render}. 
# def execute(output = $stdout) render(Iterator.coerce(input, environment), output) end protected # # Renders tuples served by the iterator to the output buffer provided and # returns the latter. # # This method must be implemented by subclasses unless {#execute} is # overriden. # def render(iterator, output) end undef :render # # Implements the Renderer contract through inspect # class Rash < Renderer # (see Renderer#render) def render(input, output) input.each do |tuple| output << tuple.inspect << "\n" end output end Renderer.register(:rash, "as ruby hashes", self) end # class Rash require "alf/renderer/text" require "alf/renderer/yaml" end # module Renderer # # Provides a factory over Alf operators and handles the interface with # Quickl for commandline support. # # This module is part of Alf's internal architecture and should not be used # at all by third-party projects. # module Factory # @see Quickl::Command def Command(file, line) Quickl::Command(file, line){|builder| builder.command_parent = Alf::Command::Main yield(builder) if block_given? } end # @see Operator def Operator(file, line) Command(file, line) do |b| b.instance_module Alf::Operator end end extend Factory end # module Factory # # Marker module and namespace for Alf main commands, those that are **not** # operators at all. # module Command # # alf - Classy data-manipulation dressed in a DSL (+ commandline) # # SYNOPSIS # alf [--version] [--help] # alf -e '(lispy command)' # alf [FILE.alf] # alf [alf opts] OPERATOR [operator opts] ARGS ... 
# alf help OPERATOR # # OPTIONS # #{summarized_options} # # RELATIONAL COMMANDS # #{summarized_subcommands subcommands.select{|cmd| # cmd.include?(Alf::Operator::Relational) # }} # # NON-RELATIONAL COMMANDS # #{summarized_subcommands subcommands.select{|cmd| # cmd.include?(Alf::Operator::NonRelational) # }} # # OTHER NON-RELATIONAL COMMANDS # #{summarized_subcommands subcommands.select{|cmd| # cmd.include?(Alf::Command) # }} # # See '#{program_name} help COMMAND' for details about a specific command. # class Main < Quickl::Delegator(__FILE__, __LINE__) include Command # Environment instance to use to get base iterators attr_accessor :environment # Output renderer attr_accessor :renderer # Creates a command instance def initialize(env = Environment.default) @environment = env extend(Lispy) end # Install options options do |opt| @execute = false opt.on("-e", "--execute", "Execute one line of script (Lispy API)") do @execute = true end @renderer = Renderer::Rash.new Renderer.each_renderer do |name,descr,clazz| opt.on("--#{name}", "Render output #{descr}"){ @renderer = clazz.new } end opt.on('--env=FOLDER', "Set the environment folder to use") do |value| @environment = Environment.folder(value) end opt.on_tail('-h', "--help", "Show help") do raise Quickl::Help end opt.on_tail('-v', "--version", "Show version") do raise Quickl::Exit, "#{program_name} #{Alf::VERSION}"\ " (c) 2011, Bernard Lambeau" end end # Alf's options # # Overrided because Quickl only keep --options but modifying it there # should probably be considered a broken API. # def _run(argv = []) # 1) Extract my options and parse them my_argv = [] while argv.first =~ /^-/ my_argv << argv.shift end parse_options(my_argv) # 2) build the operator according to -e option operator = if @execute instance_eval(argv.first) else super end # 3) if there is a requester, then we do the job (assuming bin/alf) # with the renderer to use. 
Otherwise, we simply return built operator if operator && requester renderer.pipe(operator, environment).execute($stdout) else operator end end end # # Output input tuples through a specific renderer (text, yaml, ...) # # SYNOPSIS # #{program_name} #{command_name} [DATASET...] # # OPTIONS # #{summarized_options} # # DESCRIPTION # # When dataset names are specified as commandline args, request the environment # to provide those datasets and print them. Otherwise, take what comes on standard # input. # # Note that this command is not an operator and should not be piped anymore. # class Show < Factory::Command(__FILE__, __LINE__) include Command options do |opt| @renderer = Renderer::Text.new Renderer.each_renderer do |name,descr,clazz| opt.on("--#{name}", "Render output #{descr}"){ @renderer = clazz.new } end end def execute(args) requester.renderer = @renderer args = [ $stdin ] if args.empty? requester.send(:chain,*args) end end # class Show # # Executes an .alf file on current environment # # SYNOPSIS # #{program_name} #{command_name} [FILE] # # OPTIONS # #{summarized_options} # # DESCRIPTION # # This command executes the .alf file passed as first argument (or what comes # on standard input) as a alf query to be executed on the current environment. # class Exec < Factory::Command(__FILE__, __LINE__) include Command def execute(args) Reader.alf(args.first || $stdin, requester.environment) end end # class Exec # # Show help about a specific command # # SYNOPSIS # #{program_name} #{command_name} COMMAND # class Help < Factory::Command(__FILE__, __LINE__) include Command # Let NoSuchCommandError be passed to higher stage no_react_to Quickl::NoSuchCommand # Command execution def execute(args) if args.size != 1 puts super_command.help else cmd = has_command!(args.first, super_command) puts cmd.help end nil end end # class Help end # # Marker for all operators, relational and non-relational ones. 
# module Operator include Iterator, Tools # # Yields non-relational then relational operators, in turn. # def self.each Operator::NonRelational.each{|x| yield(x)} Operator::Relational.each{|x| yield(x)} end # # Encapsulates method that allows making operator introspection, that is, # knowing operator cardinality and similar stuff. # module Introspection # # Returns true if this operator is an unary operator, false otherwise # def unary? ancestors.include?(Operator::Unary) end # # Returns true if this operator is a binary operator, false otherwise # def binary? ancestors.include?(Operator::Binary) end end # module Introspection # Ensures that the Introspection module is set on real operators def self.included(mod) mod.extend(Introspection) if mod.is_a?(Class) end # # Encapsulates method definitions that convert operators to Quickl # commands # module CommandMethods protected # # Configures the operator from arguments taken from command line. # # This method is intended to be overriden by subclasses and must return the # operator itself. # def set_args(args) self end # # Overrides Quickl::Command::Single#_run to handles the '--' separator # correctly. # # This is because parse_options tend to eat the '--' separator... This # could be handled in Quickl itself, but it should be considered a broken # API and will only be available in quickl >= 0.3.0 (probably) # def _run(argv = []) operands, args = split_command_args(argv).collect do |arr| parse_options(arr) end self.set_args(args) if operands = command_line_operands(operands) env = environment || (requester ? 
requester.environment : nil) self.pipe(operands, env) end self end def split_command_args(args) case (i = args.index("--")) when NilClass [args, []] when 0 [[ $stdin ], args[1..-1]] else [args[0...i], args[i+1..-1]] end end def command_line_operands(operands) operands end end # module CommandMethods include CommandMethods # Operators input datasets attr_accessor :datasets # Optional environment attr_reader :environment # Sets the environment on this operator and propagate on # datasets def environment=(env) # this is to avoid infinite loop (TODO: why is there infinite loops??) return if @environment == env # set and propagate on children @environment = env datasets.each do |dataset| if dataset.respond_to?(:environment) dataset.environment = env end end if datasets env end # # Sets the operator input # def pipe(input, env = environment) raise NotImplementedError, "Operator#pipe should be overriden" end # # Yields each tuple in turn # # This method is implemented in a way that ensures that all operators are # thread safe. It is not intended to be overriden, use _each instead. # def each op = self.dup op._prepare op._each(&Proc.new) end protected # # Prepares the iterator before subsequent call to _each. # # This method is intended to be overriden by suclasses to install what's # need for successful iteration. The default implementation does nothing. # def _prepare end # Internal implementation of the iterator. # # This method must be implemented by subclasses. It is safe to use instance # variables (typically initialized in _prepare) here. 
# def _each end # # Specialization of Operator for operators that work on a unary input # module Unary include Operator # # Sets the operator input # def pipe(input, env = environment) self.environment = env self.datasets = [ input ] self end protected def command_line_operands(operands) operands.first || $stdin end # # Simply returns the first dataset # def input Iterator.coerce(datasets.first, environment) end # # Yields the block with each input tuple. # # This method should be preferred to input.each when possible. # def each_input_tuple input.each(&Proc.new) end end # module Unary # # Specialization of Operator for operators that work on a binary input # module Binary include Operator # # Sets the operator input # def pipe(input, env = environment) self.environment = env self.datasets = input self end protected def command_line_operands(operands) (operands.size < 2) ? ([$stdin] + operands) : operands end # Returns the left operand def left Iterator.coerce(datasets.first, environment) end # Returns the right operand def right Iterator.coerce(datasets.last, environment) end end # module Binary # # Specialization of Operator for operators that simply convert single tuples # to single tuples. # module Transform include Unary protected # (see Operator#_each) def _each each_input_tuple do |tuple| yield _tuple2tuple(tuple) end end # # Transforms an input tuple to an output tuple # def _tuple2tuple(tuple) end end # module Transform # # Specialization of Operator for implementing operators that rely on a # cesure algorithm. # module Cesure include Unary protected # (see Operator#_each) def _each receiver, proj_key, prev_key = Proc.new, cesure_key, nil each_input_tuple do |tuple| cur_key = proj_key.project(tuple) if cur_key != prev_key flush_cesure(prev_key, receiver) unless prev_key.nil? start_cesure(cur_key, receiver) prev_key = cur_key end accumulate_cesure(tuple, receiver) end flush_cesure(prev_key, receiver) unless prev_key.nil? 
end def cesure_key end def start_cesure(key, receiver) end def accumulate_cesure(tuple, receiver) end def flush_cesure(key, receiver) end end # module Cesure # # Specialization of Operator for operators that are shortcuts for longer # expressions. # module Shortcut include Operator # # Sets the operator input # def pipe(input, env = environment) self.environment = env self.datasets = input self end protected # (see Operator#_each) def _each longexpr.each(&Proc.new) end # # Compiles the longer expression and returns it. # # @return (Iterator) the compiled longer expression, typically another # Operator instance # def longexpr end undef :longexpr # # This is an helper ala Lispy#chain for implementing (#longexpr). # # @param [Array] elements a list of Iterator-able # @return [Operator] the first element of the list, but piped with the # next one, and so on. # def chain(*elements) elements = elements.reverse elements[1..-1].inject(elements.first) do |c, elm| elm.pipe(c, environment) elm end end end # module Shortcut end # module Operator # # Marker module and namespace for non relational operators # module Operator::NonRelational # # Yields the block with each operator module in turn # def self.each constants.each do |c| val = const_get(c) yield(val) if val.ancestors.include?(Operator::NonRelational) end end # # Extend its operand with an unique autonumber attribute # # SYNOPSIS # # #{program_name} #{command_name} [OPERAND] -- [ATTRNAME] # # DESCRIPTION # # This non-relational operator guarantees uniqueness of output tuples by # adding an attribute called 'ATTRNAME' whose value is an Integer. No # guarantee is given about ordering of output tuples, nor to the fact # that this autonumber is sequential. Only that all values are different. # If the presence of duplicates was the only "non-relational" aspect of # input tuples, the result may be considered a valid relation representation. 
#
# IN RUBY
#
#   (autonum OPERAND, ATTRNAME = :autonum)
#
#   (autonum :suppliers)
#   (autonum :suppliers, :unique_id)
#
# IN SHELL
#
#   #{program_name} #{command_name} [OPERAND] -- [ATTRNAME]
#
#   alf autonum suppliers
#   alf autonum suppliers -- unique_id
#
class Autonum < Factory::Operator(__FILE__, __LINE__)
  include Operator::NonRelational, Operator::Transform

  # Name of the new attribute to add
  attr_accessor :attrname

  # Builds an Autonum operator instance
  def initialize(attrname = :autonum)
    @attrname = attrname
  end

  protected

  #
  # (see Operator::CommandMethods#set_args)
  #
  # The last commandline argument, when present, gives the attribute name.
  # Returns self, like the set_args of every other operator.
  #
  def set_args(args)
    @attrname = args.last.to_sym unless args.empty?
    self
  end

  # (see Operator#_prepare)
  #
  # Resets the counter so that the first output tuple gets 0.
  def _prepare
    @autonum = -1
  end

  # (see Operator::Transform#_tuple2tuple)
  def _tuple2tuple(tuple)
    tuple.merge(@attrname => (@autonum += 1))
  end

end # class Autonum

#
# Force default values on missing/nil attributes
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND] -- ATTR1 VAL1 ...
#
# OPTIONS
# #{summarized_options}
#
# API & EXAMPLE
#
#   # Non strict mode
#   (defaults :suppliers, :country => 'Belgium')
#
#   # Strict mode (--strict)
#   (defaults :suppliers, {:country => 'Belgium'}, true)
#
# DESCRIPTION
#
# This operator rewrites tuples so as to ensure that all values for specified
# attributes ATTRx are defined and not nil. Missing or nil attributes are
# replaced by the associated default value VALx.
#
# When used in shell, the hash of default values is built from commandline
# arguments ala Hash[...]. However, to keep type safety VALx are interpreted
# as ruby literals and built with Kernel.eval. This means that strings must
# be doubly quoted. For the example of the API section:
#
#   alf defaults suppliers -- country "'Belgium'"
#
# When used in --strict mode, the operator simply projects resulting tuples on
# attributes for which a default value has been specified. Using the strict
# mode guarantees that the heading of all tuples is the same, and that no nil
# value ever remains. However, this operator never removes duplicates.
#
class Defaults < Factory::Operator(__FILE__, __LINE__)
  include Operator::NonRelational, Operator::Transform

  # Default values as a ATTR -> VAL hash
  attr_accessor :defaults

  # Strict mode?
  attr_accessor :strict

  # Builds a Defaults operator instance
  def initialize(defaults = {}, strict = false)
    @defaults, @strict = defaults, strict
  end

  options do |opt|
    opt.on('-s', '--strict', 'Strictly restrict to default attributes') do
      self.strict = true
    end
  end

  protected

  #
  # (see Operator::CommandMethods#set_args)
  #
  # Arguments come as ATTR VAL pairs; each VAL is a ruby literal evaluated
  # with Kernel.eval (NOTE: this runs arbitrary ruby code given on the
  # command line).
  #
  def set_args(args)
    pairs = args.each_slice(2)
    @defaults = tuple_collect(pairs) do |name, literal|
      [name.to_sym, Kernel.eval(literal)]
    end
    self
  end

  #
  # (see Operator::Transform#_tuple2tuple)
  #
  # Strict mode: the output only has the attributes of @defaults, each valued
  # by the tuple when not nil and by its default otherwise.
  # Non strict mode: every tuple attribute is kept (nil ones being replaced by
  # their default when one exists) and missing default attributes are added.
  #
  def _tuple2tuple(tuple)
    return tuple_collect(@defaults){|name, default| [name, coalesce(tuple[name], default)] } if strict
    completed = tuple_collect(tuple) do |name, value|
      [name, coalesce(value, @defaults[name])]
    end
    @defaults.merge(completed)
  end

end # class Defaults

#
# Remove tuple duplicates
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND]
#
# API & EXAMPLE
#
#   # clip, unlike project, typically leaves duplicates
#   (compact (clip :suppliers, [ :city ]))
#
# DESCRIPTION
#
# This operator removes duplicates from input tuples. Like defaults, it is a
# non relational operator that helps normalizing input for implementing
# relational operators. This one is centric in converting bags of tuples to
# sets of tuples, as required by true relations.
#
#   alf compact ...
# class Compact < Factory::Operator(__FILE__, __LINE__) include Operator::NonRelational, Operator::Shortcut, Operator::Unary # Removes duplicates according to a complete order class SortBased include Operator::Cesure def cesure_key @cesure_key ||= ProjectionKey.new([],true) end def accumulate_cesure(tuple, receiver) @tuple = tuple end def flush_cesure(key, receiver) receiver.call(@tuple) end end # class SortBased # Removes duplicates by loading all in memory and filtering # them there class BufferBased include Operator::Unary def _prepare @tuples = input.to_a.uniq end def _each @tuples.each(&Proc.new) end end # class BufferBased protected def longexpr chain BufferBased.new, datasets end end # class Compact # # Sort input tuples according to an order relation # # SYNOPSIS # #{program_name} #{command_name} [OPERAND] -- ATTR1 ORDER1 ATTR2 ORDER2... # # API & EXAMPLE # # # sort on supplier name in ascending order # (sort :suppliers, [:name]) # # # sort on city then on name # (sort :suppliers, [:city, :name]) # # # sort on city DESC then on name ASC # (sort :suppliers, [[:city, :desc], [:name, :asc]]) # # => See OrderingKey about specifying orderings # # DESCRIPTION # # This operator sorts input tuples on ATTR1 then ATTR2, etc. and outputs # them sorted after that. This is, of course, a non relational operator as # relations are unordered sets. It is provided to implement operators that # need tuples to be sorted to work correctly. When used in shell, the key # ordering must be specified in its longest form: # # alf sort suppliers -- name asc # alf sort suppliers -- city desc name asc # # LIMITATIONS # # The fact that the ordering must be completely specified with commandline # arguments is a limitation, shortcuts could be provided in the future. # class Sort < Factory::Operator(__FILE__, __LINE__) include Operator::NonRelational, Operator::Unary def initialize(ordering_key = []) @ordering_key = OrderingKey.coerce(ordering_key) yield self if block_given? 
end def ordering=(ordering) @ordering_key = OrderingKey.coerce(ordering) end protected def set_args(args) self.ordering = args.collect{|c| c.to_sym}.each_slice(2).to_a self end def _prepare @buffer = Buffer::Sorted.new(@ordering_key) @buffer.add_all(input) end def _each @buffer.each(&Proc.new) end end # class Sort # # Clip input tuples to a subset of attributes # # SYNOPSIS # #{program_name} #{command_name} [OPERAND] -- ATTR1 ATTR2 ... # # OPTIONS # #{summarized_options} # # API & EXAMPLE # # # Keep only name and city attributes # (clip :suppliers, [:name, :city]) # # # Keep all but name and city attributes # (clip :suppliers, [:name, :city], true) # # DESCRIPTION # # This operator clips tuples on attributes whose names are specified as # arguments. This is similar to the relational PROJECT operator, expect # that this one does not removed duplicates that can occur from clipping. # In other words, clipping may lead to bags of tuples instead of sets. # # When used in shell, the clipping/projection key is simply taken from # commandline arguments: # # alf clip suppliers -- name city # alf clip suppliers --allbut -- name city # class Clip < Factory::Operator(__FILE__, __LINE__) include Operator::NonRelational, Operator::Transform # Builds a Clip operator instance def initialize(attributes = [], allbut = false) @projection_key = ProjectionKey.new(attributes, allbut) yield self if block_given? 
end def attributes=(attrs) @projection_key.attributes = attrs end def allbut=(allbut) @projection_key.allbut = allbut end # Installs the options options do |opt| opt.on('-a', '--allbut', 'Apply a ALLBUT clipping') do self.allbut = true end end protected # (see Operator::CommandMethods#set_args) def set_args(args) self.attributes = args.collect{|a| a.to_sym} self end # (see Operator::Transform#_tuple2tuple) def _tuple2tuple(tuple) @projection_key.project(tuple) end end # class Clip end # Operator::NonRelational # # Marker module and namespace for relational operators # module Operator::Relational # # Yields the block with each operator module in turn # def self.each constants.each do |c| val = const_get(c) yield(val) if val.ancestors.include?(Operator::Relational) end end # Relational projection (clip + compact) # # SYNOPSIS # #{program_name} #{command_name} [OPERAND] -- ATTR1 ATTR2 ... # # OPTIONS # #{summarized_options} # # API & EXAMPLE # # # Project on name and city attributes # (project :suppliers, [:name, :city]) # # # Project on all but name and city attributes # (allbut :suppliers, [:name, :city]) # # DESCRIPTION # # This operator projects tuples on attributes whose names are specified as # arguments. This is similar to clip, except that this ones is a truly # relational one, that is, it also removes duplicates tuples. # # When used in shell, the clipping/projection key is simply taken from # commandline arguments: # # alf project suppliers -- name city # alf project --allbut suppliers -- name city # class Project < Factory::Operator(__FILE__, __LINE__) include Operator::Relational, Operator::Shortcut, Operator::Unary # Builds a Project operator instance def initialize(attributes = [], allbut = false) @projection_key = ProjectionKey.new(attributes, allbut) yield self if block_given? 
end def attributes=(attrs) @projection_key.attributes = attrs end def allbut=(allbut) @projection_key.allbut = allbut end # Installs the options options do |opt| opt.on('-a', '--allbut', 'Apply a ALLBUT projection') do self.allbut = true end end protected # (see Operator::CommandMethods#set_args) def set_args(args) self.attributes = args.collect{|a| a.to_sym} self end # (see Operator::Shortcut#longexpr) def longexpr chain Operator::NonRelational::Compact.new, Operator::NonRelational::Clip.new(@projection_key.attributes, @projection_key.allbut), datasets end end # class Project # # Relational extension (additional, computed attributes) # # SYNOPSIS # #{program_name} #{command_name} [OPERAND] -- ATTR1 EXPR1 ATTR2 EXPR2... # # API & EXAMPLE # # (extend :supplies, :sp => lambda{ sid + "/" + pid }, # :big => lambda{ qty > 100 ? true : false }) # # DESCRIPTION # # This command extend input tuples with new attributes (named ATTR1, ...) # whose value is the result of evaluating tuple expressions (i.e. EXPR1, ...). # See main documentation about the semantics of tuple expressions. When used # in shell, the hash of extensions is built from commandline arguments ala # Hash[...]. Tuple expressions must be specified as code literals there: # # alf extend supplies -- sp 'sid + "/" + pid' big "qty > 100 ? true : false" # # Attributes ATTRx should not already exist, no behavior is guaranteed if # this precondition is not respected. 
#
class Extend < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Transform

  # Extensions as a Hash attr => lambda{...}
  attr_accessor :extensions

  # Builds an Extend operator instance
  def initialize(extensions = {})
    @extensions = extensions
  end

  protected

  #
  # (see Operator::CommandMethods#set_args)
  #
  # Arguments come as ATTR EXPR pairs; each EXPR is compiled as a tuple
  # expression with TupleHandle.compile.
  #
  def set_args(args)
    pairs = args.each_slice(2)
    @extensions = tuple_collect(pairs) do |name, source|
      [name.to_sym, TupleHandle.compile(source)]
    end
    self
  end

  # (see Operator#_prepare)
  def _prepare
    @handle = TupleHandle.new
  end

  #
  # (see Operator::Transform#_tuple2tuple)
  #
  # Evaluates every extension on _tuple_ and merges the computed values into
  # it (computed values win on name clashes, as per Hash#merge).
  #
  def _tuple2tuple(tuple)
    computed = tuple_collect(@extensions) do |name, expr|
      [name, @handle.set(tuple).evaluate(expr)]
    end
    tuple.merge(computed)
  end

end # class Extend

#
# Relational renaming (rename some attributes)
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND] -- OLD1 NEW1 ...
#
# OPTIONS
# #{summarized_options}
#
# API & EXAMPLE
#
#   (rename :suppliers, :name => :supplier_name, :city => :supplier_city)
#
# DESCRIPTION
#
# This command renames OLD attributes as NEW as specified by arguments.
# Attributes OLD should exist in source tuples while attributes NEW should
# not. When used in shell, renaming attributes are built ala Hash[...] from
# commandline arguments:
#
#   alf rename suppliers -- name supplier_name city supplier_city
#
class Rename < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Transform

  # Hash of source -> target attribute renamings
  attr_accessor :renaming

  # Builds a Rename operator instance
  def initialize(renaming = {})
    @renaming = renaming
  end

  protected

  #
  # (see Operator::CommandMethods#set_args)
  #
  # Arguments come as OLD NEW pairs, all converted to symbols.
  #
  def set_args(args)
    names = args.collect{|c| c.to_sym}
    @renaming = Hash[*names]
    self
  end

  #
  # (see Operator::Transform#_tuple2tuple)
  #
  # Attributes without a (truthy) renaming entry keep their name.
  #
  def _tuple2tuple(tuple)
    tuple_collect(tuple) do |name, value|
      target = @renaming[name] || name
      [target, value]
    end
  end

end # class Rename

#
# Relational restriction (aka where, predicate filtering)
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND] -- EXPR
#   #{program_name} #{command_name} [OPERAND] -- ATTR1 VAL1 ...
#
# API & EXAMPLE
#
#   # Restrict to suppliers with status greater than 20
#   (restrict :suppliers, lambda{ status > 20 })
#
#   # Restrict to suppliers that live in London
#   (restrict :suppliers, lambda{ city == 'London' })
#
# DESCRIPTION
#
# This command restricts tuples to those for which EXPR evaluates to true.
# EXPR must be a valid tuple expression that should return a truth-value.
# When used in shell, the predicate is taken as a string and compiled with
# TupleHandle.compile. We also provide a shortcut for equality expressions.
# Note that, in that case, values are expected to be ruby code literals,
# evaluated with Kernel.eval. Therefore, strings must be doubly quoted.
#
#   alf restrict suppliers -- "status > 20"
#   alf restrict suppliers -- city "'London'"
#
class Restrict < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Unary

  # Restriction predicate
  attr_accessor :predicate

  # Builds a Restrict operator instance (predicate compiled immediately)
  def initialize(predicate = "true")
    @predicate = TupleHandle.compile(predicate)
    yield self if block_given?
  end

  protected

  #
  # (see Operator::CommandMethods#set_args)
  #
  # More than one argument means ATTR VAL pairs (VAL evaluated with
  # Kernel.eval) turned into an equality predicate; otherwise the single
  # argument is the predicate source itself.
  #
  def set_args(args)
    source = if args.size > 1
      tuple_collect(args.each_slice(2)){|attr, literal| [attr, Kernel.eval(literal)] }
    else
      args.first
    end
    @predicate = TupleHandle.compile(source)
    self
  end

  # (see Operator#_each)
  #
  # Yields only the input tuples on which the predicate is truthy.
  def _each
    handle = TupleHandle.new
    each_input_tuple do |tuple|
      next unless handle.set(tuple).evaluate(@predicate)
      yield(tuple)
    end
  end

end # class Restrict

#
# Relational join (and cross-join)
#
# SYNOPSIS
#   #{program_name} #{command_name} [LEFT] RIGHT
#
# API & EXAMPLE
#
#   (join :suppliers, :parts)
#
# DESCRIPTION
#
# This operator computes the (natural) join of two input iterators. Natural
# join means that, unlike what is commonly used in SQL, the default behavior
# is to join on common attributes. You can use the rename operator if this
# behavior does not fit your needs.
#
#   alf join suppliers supplies
#
class Join < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Shortcut, Operator::Binary

  #
  # Hash-based join: the right operand is loaded in memory, indexed on the
  # join key, then each left tuple is matched against that index.
  #
  class HashBased
    include Operator::Binary

    #
    # In-memory index of the tuples of an enumerable, grouped by their
    # projection on the join key. The index is built lazily, on the first
    # call to split, because the join key (attributes common to both
    # operands) is only known once a tuple of the other operand is seen.
    #
    class JoinBuffer

      # Creates a buffer on _enum_ (not loaded until the first split)
      def initialize(enum)
        @buffer = nil
        @key = nil
        @enum = enum
      end

      #
      # Splits _tuple_ on the join key, returning [key_part, rest]. Loads the
      # buffer on first call.
      #
      def split(tuple)
        _init(tuple) unless @key
        @key.split(tuple)
      end

      # Yields each buffered tuple whose projection on the join key is _key_
      def each(key, &block)
        @buffer[key].each(&block) if @buffer.has_key?(key)
      end

      private

      #
      # Loads the whole enumerable into @buffer. The join key is made of the
      # attributes shared by the first buffered tuple and _probe_.
      #
      def _init(probe)
        @buffer = Hash.new{|h,k| h[k] = []}
        @enum.each do |buffered|
          @key ||= Tools::ProjectionKey.coerce(buffered.keys & probe.keys)
          @buffer[@key.project(buffered)] << buffered
        end
        # An empty enumerable never sets the key above. Fall back to an empty
        # key so that split keeps working (instead of failing on nil); each
        # then finds no match and the join is empty, as expected.
        @key ||= Tools::ProjectionKey.coerce([])
      end

    end # class JoinBuffer

    protected

    # (see Operator#_each)
    def _each
      buffer = JoinBuffer.new(right)
      left.each do |left_tuple|
        key, _rest = buffer.split(left_tuple)
        buffer.each(key) do |right_tuple|
          yield(left_tuple.merge(right_tuple))
        end
      end
    end

  end # class HashBased

  protected

  # (see Shortcut#longexpr)
  def longexpr
    chain HashBased.new, datasets
  end

end # class Join

#
# Relational intersection (aka a logical and)
#
# SYNOPSIS
#   #{program_name} #{command_name} [LEFT] RIGHT
#
# API & EXAMPLE
#
#   # Give suppliers that live in Paris and have status >= 20
#   (intersect \\
#     (restrict :suppliers, lambda{ status >= 20 }),
#     (restrict :suppliers, lambda{ city == 'Paris' }))
#
# DESCRIPTION
#
# This operator computes the intersection between its two operands. The
# intersection is simply the set of common tuples between them. Both operands
# must have the same heading.
#
#   alf intersect ... ...
#
class Intersect < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Shortcut, Operator::Binary

  # Indexes the right operand in memory and filters the left one
  class HashBased
    include Operator::Binary

    protected

    # Collects every right tuple for constant-time membership tests
    def _prepare
      @index = Set.new
      right.each{|t| @index << t }
    end

    # Yields the left tuples that also belong to the right operand
    def _each
      left.each do |candidate|
        next unless @index.include?(candidate)
        yield(candidate)
      end
    end

  end # class HashBased

  protected

  # (see Shortcut#longexpr)
  def longexpr
    chain HashBased.new, datasets
  end

end # class Intersect

#
# Relational minus (aka difference)
#
# SYNOPSIS
#   #{program_name} #{command_name} [LEFT] RIGHT
#
# API & EXAMPLE
#
#   # Give all suppliers but those living in Paris
#   (minus :suppliers,
#          (restrict :suppliers, lambda{ city == 'Paris' }))
#
# DESCRIPTION
#
# This operator computes the difference between its two operands. The
# difference is simply the set of tuples in the left operand not shared by
# the right one.
#
#   alf minus ... ...
#
class Minus < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Shortcut, Operator::Binary

  # Indexes the right operand in memory and filters the left one
  class HashBased
    include Operator::Binary

    protected

    # Collects every right tuple for constant-time membership tests
    def _prepare
      @index = Set.new
      right.each{|t| @index << t }
    end

    # Yields the left tuples that do not belong to the right operand
    def _each
      left.each do |candidate|
        next if @index.include?(candidate)
        yield(candidate)
      end
    end

  end # class HashBased

  protected

  # (see Shortcut#longexpr)
  def longexpr
    chain HashBased.new, datasets
  end

end # class Minus

#
# Relational union
#
# SYNOPSIS
#   #{program_name} #{command_name} [LEFT] RIGHT
#
# API & EXAMPLE
#
#   (union (project :suppliers, [:city]),
#          (project :parts,     [:city]))
#
# DESCRIPTION
#
# This operator computes the union of two input iterators. Input iterators
# should have the same heading. The result never contains duplicates.
#
#   alf union ... ...
#
class Union < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Shortcut, Operator::Binary

  # Concatenates both operands, assuming (or not caring) they are disjoint
  class DisjointBased
    include Operator::Binary

    protected

    # (see Operator#_each)
    #
    # The block is captured explicitly; the former block-less Proc.new
    # raises an ArgumentError since Ruby 3.0.
    def _each(&block)
      left.each(&block)
      right.each(&block)
    end

  end # class DisjointBased

  protected

  # (see Shortcut#longexpr)
  #
  # Concatenates both operands, then removes duplicates.
  def longexpr
    chain Operator::NonRelational::Compact.new,
          DisjointBased.new,
          datasets
  end

end # class Union

#
# Relational wraping (tuple-valued attributes)
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND] -- ATTR1 ATTR2 ... NEWNAME
#
# API & EXAMPLE
#
#   (wrap :suppliers, [:city, :status], :loc_and_status)
#
# DESCRIPTION
#
# This operator wraps attributes ATTR1 to ATTRN as a new, tuple-based
# attribute whose name is NEWNAME. When used in shell, names of wrapped
# attributes are taken from commandline arguments, except the last one
# which defines the new name to use:
#
#   alf wrap suppliers -- city status loc_and_status
#
class Wrap < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Transform

  # Array of wraping attributes
  attr_accessor :attributes

  # New name for the wrapped attribute
  attr_accessor :as

  # Builds a Wrap operator instance
  def initialize(attributes = [], as = :wrapped)
    @attributes = attributes
    @as = as
  end

  protected

  # (see Operator::CommandMethods#set_args)
  #
  # The last argument is the new name, the other ones the wrapped attributes.
  # NOTE: _args_ is mutated (its last element is popped).
  def set_args(args)
    @as = args.pop.to_sym
    @attributes = args.collect{|a| a.to_sym}
    self
  end

  # (see Operator::Transform#_tuple2tuple)
  #
  # Keeps non-wrapped attributes as-is and gathers the wrapped ones in a
  # sub-hash stored under _as_.
  def _tuple2tuple(tuple)
    others = tuple_collect(tuple.keys - @attributes){|k| [k,tuple[k]] }
    others[as] = tuple_collect(attributes){|k| [k, tuple[k]] }
    others
  end

end # class Wrap

#
# Relational un-wraping (inverse of wrap)
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND] -- ATTR
#
# API & EXAMPLE
#
#   # Assuming wrapped = (wrap :suppliers, [:city, :status], :loc_and_status)
#   (unwrap wrapped, :loc_and_status)
#
# DESCRIPTION
#
# This operator unwraps the tuple-valued attribute named ATTR so as to
# flatten its pairs with 'upstream' tuple.
# The latter should be such so that no name collision occurs. When used in
# shell, the name of the attribute to unwrap is taken as the first
# commandline argument:
#
#   alf unwrap wrap -- loc_and_status
#
class Unwrap < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Transform

  # Name of the attribute to unwrap
  attr_accessor :attribute

  # Builds an Unwrap operator instance
  def initialize(attribute = :wrapped)
    @attribute = attribute
  end

  protected

  # (see Operator::CommandMethods#set_args)
  def set_args(args)
    @attribute = args.first.to_sym
    self
  end

  # (see Operator::Transform#_tuple2tuple)
  #
  # Removes the wrapped attribute (treated as an empty hash when missing or
  # nil) and merges its pairs into the remaining tuple.
  def _tuple2tuple(tuple)
    rest = tuple.dup
    inner = rest.delete(@attribute) || {}
    rest.merge(inner)
  end

end # class Unwrap

#
# Relational grouping (relation-valued attributes)
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND] -- ATTR1 ATTR2 ... NEWNAME
#
# API & EXAMPLE
#
#   (group :supplies, [:pid, :qty], :supplying)
#   (group :supplies, [:sid], :supplying, true)
#
# DESCRIPTION
#
# This operator groups attributes ATTR1 to ATTRN as a new, relation-valued
# attribute whose name is NEWNAME. When used in shell, names of grouped
# attributes are taken from commandline arguments, except the last one
# which defines the new name to use:
#
#   alf group supplies -- pid qty supplying
#   alf group supplies --allbut -- sid supplying
#
class Group < Factory::Operator(__FILE__, __LINE__)
  include Operator::Relational, Operator::Unary

  # Attributes on which grouping applies
  attr_accessor :attributes

  # Attribute name for grouping tuple
  attr_accessor :as

  # Group all but attributes?
  attr_accessor :allbut

  # Creates a Group instance
  def initialize(attributes = [], as = :group, allbut = false)
    @attributes = attributes
    @as = as
    @allbut = allbut
  end

  options do |opt|
    opt.on('--allbut', "Group all but specified attributes"){ @allbut = true }
  end

  protected

  # (see Operator::CommandMethods#set_args)
  #
  # The last argument is the new name, the other ones the grouped attributes.
  def set_args(args)
    @as = args.pop.to_sym
    @attributes = args.collect{|a| a.to_sym}
    self
  end

  # See Operator#_prepare
  #
  # Indexes input tuples by their non-grouped part; each entry collects the
  # grouped parts in a Set. The key is built with the inverted allbut flag,
  # so that the key part is what is NOT grouped.
  def _prepare
    grouping_key = ProjectionKey.new(attributes, !allbut)
    @index = Hash.new{|h,k| h[k] = Set.new}
    each_input_tuple do |tuple|
      kept, grouped = grouping_key.split(tuple)
      @index[kept] << grouped
    end
  end

  # See Operator#_each
  def _each
    @index.each_pair do |kept, grouped|
      relation = Relation.coerce(grouped)
      yield(kept.merge(@as => relation))
    end
  end

end # class Group

#
# Relational un-grouping (inverse of group)
#
# SYNOPSIS
#   #{program_name} #{command_name} [OPERAND] -- ATTR
#
# API & EXAMPLE
#
#   # Assuming grouped = (group enum, [:pid, :qty], :supplying)
#   (ungroup grouped, :supplying)
#
# DESCRIPTION
#
# This operator ungroups the relation-valued attribute named ATTR and outputs
# tuples as the flattening of each of its tuples merged with the upstream
# one. Sub relation should be such so that no name collision occurs.
When # used in shell, the name of the attribute to ungroup is taken as the first # commandline argument: # # alf ungroup group -- supplying # class Ungroup < Factory::Operator(__FILE__, __LINE__) include Operator::Relational, Operator::Unary # Relation-value attribute to ungroup attr_accessor :attribute # Creates a Group instance def initialize(attribute = :grouped) @attribute = attribute end protected # (see Operator::CommandMethods#set_args) def set_args(args) @attribute = args.pop.to_sym self end # See Operator#_each def _each each_input_tuple do |tuple| tuple = tuple.dup subrel = tuple.delete(@attribute) subrel.each do |subtuple| yield(tuple.merge(subtuple)) end end end end # class Ungroup # # Relational summarization (group-by + aggregate ops) # # SYNOPSIS # #{program_name} #{command_name} [OPERAND] [--allbut] --by=KEY1,KEY2... -- AGG1 EXPR1... # # OPTIONS # #{summarized_options} # # API & EXAMPLE # # (summarize :supplies, [:sid], # :total_qty => Aggregator.sum(:qty)) # # # Or, to specify an allbut projection # (summarize :supplies, [:qty, :pid], # :total_qty => Aggregator.sum(:qty), true) # # DESCRIPTION # # This operator summarizes input tuples on the projection on KEY1,KEY2,... # attributes and applies aggregate operators on sets of matching tuples. # Introduced names AGG should be disjoint from KEY attributes. # # When used in shell, the aggregations are taken from commandline arguments # AGG and EXPR, where AGG is the name of a new attribute and EXPR is an # aggregation expression evaluated on Aggregator: # # alf summarize supplies --by=sid -- total_qty "sum(:qty)" # alf summarize supplies --allbut --by=pid,qty -- total_qty "sum(:qty)" # class Summarize < Factory::Operator(__FILE__, __LINE__) include Operator::Relational, Operator::Shortcut, Operator::Unary # By attributes attr_accessor :by # Allbut on by? 
attr_accessor :allbut # Aggregations as a AGG => Aggregator(EXPR) hash attr_accessor :aggregators def initialize(by = [], aggregators = {}, allbut = false) @by = by @allbut = allbut @aggregators = aggregators end # Installs the options options do |opt| opt.on('--by=x,y,z', 'Specify by attributes', Array) do |args| @by = args.collect{|a| a.to_sym} end opt.on('--allbut', 'Make an allbut projection/summarization') do @allbut = true end end # Summarizes according to a complete order class SortBased include Alf::Operator::Cesure attr_reader :cesure_key attr_reader :aggregators def initialize(by_key, aggregators) @cesure_key, @aggregators = by_key, aggregators end protected def start_cesure(key, receiver) @aggs = tuple_collect(@aggregators) do |a,agg| [a, agg.least] end end def accumulate_cesure(tuple, receiver) @aggs = tuple_collect(@aggregators) do |a,agg| [a, agg.happens(@aggs[a], tuple)] end end def flush_cesure(key, receiver) @aggs = tuple_collect(@aggregators) do |a,agg| [a, agg.finalize(@aggs[a])] end receiver.call key.merge(@aggs) end end # class SortBased # Summarizes in-memory with a hash class HashBased include Operator::Relational, Operator::Unary attr_reader :by_key attr_reader :aggregators def initialize(by_key, aggregators) @by_key, @aggregators = by_key, aggregators end protected def _each index = Hash.new do |h,k| h[k] = tuple_collect(@aggregators) do |a,agg| [a, agg.least] end end each_input_tuple do |tuple| key, rest = by_key.split(tuple) index[key] = tuple_collect(@aggregators) do |a,agg| [a, agg.happens(index[key][a], tuple)] end end index.each_pair do |key,aggs| aggs = tuple_collect(@aggregators) do |a,agg| [a, agg.finalize(aggs[a])] end yield key.merge(aggs) end end end protected # (see Operator::CommandMethods#set_args) def set_args(args) @aggregators = tuple_collect(args.each_slice(2)) do |a,expr| [a.to_sym, Aggregator.compile(expr)] end self end def longexpr if @allbut by_key = Tools::ProjectionKey.new(@by, @allbut) chain HashBased.new(by_key, 
@aggregators), datasets else by_key = Tools::ProjectionKey.new(@by, @allbut) chain SortBased.new(by_key, @aggregators), Operator::NonRelational::Sort.new(by_key.to_ordering_key), datasets end end end # class Summarize # # Relational quota-queries (position, sum progression, etc.) # # SYNOPSIS # #{program_name} #{command_name} [OPERAND] --by=KEY1,... --order=OR1... AGG1 EXPR1... # # OPTIONS # #{summarized_options} # # API & EXAMPLE # # (quota :supplies, [:sid], [:qty], # :position => Aggregator.count, # :sum_qty => Aggregator.sum(:qty)) # # DESCRIPTION # # This operator computes quota values on input tuples. # # alf quota supplies --by=sid --order=qty -- position count sum_qty "sum(:qty)" # class Quota < Factory::Operator(__FILE__, __LINE__) include Operator::Relational, Operator::Shortcut, Operator::Unary # Quota by attr_accessor :by # Quota order attr_accessor :order # Quota aggregations attr_accessor :aggregators def initialize(by = [], order = [], aggregators = {}) @by, @order, @aggregators = by, order, aggregators end options do |opt| opt.on('--by=x,y,z', 'Specify by attributes', Array) do |args| @by = args.collect{|a| a.to_sym} end opt.on('--order=x,y,z', 'Specify order attributes', Array) do |args| @order = args.collect{|a| a.to_sym} end end class SortBased include Operator::Cesure def initialize(by, order, aggregators) @by, @order, @aggregators = by, order, aggregators end def cesure_key ProjectionKey.coerce @by end def ordering_key OrderingKey.coerce @order end def start_cesure(key, receiver) @aggs = tuple_collect(@aggregators) do |a,agg| [a, agg.least] end end def accumulate_cesure(tuple, receiver) @aggs = tuple_collect(@aggregators) do |a,agg| [a, agg.happens(@aggs[a], tuple)] end thisone = tuple_collect(@aggregators) do |a,agg| [a, agg.finalize(@aggs[a])] end receiver.call tuple.merge(thisone) end end # class SortBased protected # (see Operator::CommandMethods#set_args) def set_args(args) @aggregators = tuple_collect(args.each_slice(2)) do |a,expr| 
[a.to_sym, Aggregator.compile(expr)] end self end def cesure_key ProjectionKey.coerce @by end def ordering_key OrderingKey.coerce @order end def longexpr sort_key = cesure_key.to_ordering_key + ordering_key chain SortBased.new(@by, @order, @aggregators), Operator::NonRelational::Sort.new(sort_key), datasets end end # class Quota end # # Aggregation operator. # class Aggregator # Aggregate options attr_reader :options # # Automatically installs factory methods for inherited classes. # # Example: # class Sum < Aggregate # will give a method Aggregator.sum # ... # end # Aggregator.sum(:size) # factor an Sum aggregator on tuple[:size] # Aggregator.sum{ size } # idem but works on any tuple expression # def self.inherited(clazz) basename = Tools.ruby_case(Tools.class_name(clazz)) instance_eval <<-EOF def #{basename}(*args, &block) #{clazz}.new(*args, &block) end EOF end def self.compile(expr, &block) instance_eval(expr, &block) end # # Creates an Aggregator instance. # # This constructor can be used either by passing an attribute # argument or a block that will be evaluated on a TupleHandle # instance set on each aggregated tuple. # # Aggregator.new(:size) # will aggregate on tuple[:size] # Aggregator.new{ size * price } # ... on tuple[:size] * tuple[:price] # def initialize(attribute = nil, options = {}, &block) attribute, options = nil, attribute if attribute.is_a?(Hash) @handle = Tools::TupleHandle.new @options = default_options.merge(options) @functor = Tools::TupleHandle.compile(attribute || block) end # # Returns the default options to use # def default_options {} end # # Returns the least value, which is the one to use on an empty # set. # # This method is intended to be overriden by subclasses; default # implementation returns nil. # def least nil end # # This method is called on each aggregated tuple and must return # an updated _memo_ value. It can be seen as the block typically # given to Enumerable.inject. 
# # The default implementation collects the pre-value on the tuple # and delegates to _happens. # def happens(memo, tuple) _happens(memo, @handle.set(tuple).evaluate(@functor)) end # # This method finalizes a computation. # # Argument _memo_ is either _least_ or the result of aggregating # through _happens_. The default implementation simply returns # _memo_. The method is intended to be overriden for complex # aggregations that need statefull information. See Avg for an # example # def finalize(memo) memo end # # Aggregates over an enumeration of tuples. # def aggregate(enum) finalize( enum.inject(least){|memo,tuple| happens(memo, tuple) }) end protected # # @see happens. # # This method is intended to be overriden and returns _value_ # by default, making this aggregator a "Last" one... # def _happens(memo, value) value end # # Defines a COUNT aggregation operator # class Count < Aggregator def least(); 0; end def happens(memo, tuple) memo + 1; end end # class Count # # Defines a SUM aggregation operator # class Sum < Aggregator def least(); 0; end def _happens(memo, val) memo + val; end end # class Sum # # Defines an AVG aggregation operator # class Avg < Aggregator def least(); [0.0, 0.0]; end def _happens(memo, val) [memo.first + val, memo.last + 1]; end def finalize(memo) memo.first / memo.last end end # class Sum # # Defines a MIN aggregation operator # class Min < Aggregator def least(); nil; end def _happens(memo, val) memo.nil? ? val : (memo < val ? memo : val) end end # class Min # # Defines a MAX aggregation operator # class Max < Aggregator def least(); nil; end def _happens(memo, val) memo.nil? ? val : (memo > val ? 
memo : val) end end # class Max # # Defines a COLLECT aggregation operator # class Group < Aggregator def initialize(*attrs) super(nil, {}){ Tools.tuple_collect(attrs){|k| [k, self.send(k)] } } end def least(); Set.new; end def _happens(memo, val) memo << val end def finalize(memo) Relation.coerce memo end end # # Defines a COLLECT aggregation operator # class Collect < Aggregator def least(); []; end def _happens(memo, val) memo << val end end # # Defines a CONCAT aggregation operator # class Concat < Aggregator def least(); ""; end def default_options {:before => "", :after => "", :between => ""} end def _happens(memo, val) memo << options[:between].to_s unless memo.empty? memo << val.to_s end def finalize(memo) options[:before].to_s + memo + options[:after].to_s end end end # class Aggregator # # Base class for implementing buffers. # class Buffer # # Keeps tuples ordered on a specific key # class Sorted < Buffer def initialize(ordering_key) @ordering_key = ordering_key @buffer = [] end def add_all(enum) sorter = @ordering_key.sorter @buffer = merge_sort(@buffer, enum.to_a.sort(&sorter), sorter) end def each @buffer.each(&Proc.new) end private def merge_sort(s1, s2, sorter) (s1 + s2).sort(&sorter) end end # class Buffer::Sorted end # class Buffer # # Implements a small LISP-like DSL on top of Alf. # # The lispy dialect is the functional one used in .alf files and in compiled # expressions as below: # # Alf.lispy.compile do # (restrict :suppliers, lambda{ city == 'London' }) # end # # The DSL this module provides is part of Alf's public API and won't be broken # without a major version change. The module itself and its inclusion pre- # conditions are not part of the DSL itself, thus not considered as part of # the API, and may therefore evolve at any time. In other words, this module # is not intended to be directly included by third-party classes. 
# module Lispy alias :ruby_extend :extend # The environment attr_accessor :environment # # Compiles a query expression given by a String or a block and returns # the result (typically a tuple iterator) # # Example # # # with a string # op = compile "(restrict :suppliers, lambda{ city == 'London' })" # # # or with a block # op = compile { # (restrict :suppliers, lambda{ city == 'London' }) # } # # @param [String] expr a Lispy expression to compile # @return [Iterator] the iterator resulting from compilation # def compile(expr = nil, path = nil, &block) if expr.nil? instance_eval(&block) else (path ? Kernel.eval(expr, binding, path) : Kernel.eval(expr, binding)) end end # # Evaluates a query expression given by a String or a block and returns # the result as an in-memory relation (Alf::Relation) # # Example: # # # with a string # rel = evaluate "(restrict :suppliers, lambda{ city == 'London' })" # # # or with a block # rel = evaluate { # (restrict :suppliers, lambda{ city == 'London' }) # } # def evaluate(expr = nil, path = nil, &block) compile(expr, path, &block).to_rel end # # Delegated to the current environment # # This method returns the dataset associated to a given name. The result # may depend on the current environment, but is generally an Iterator, # often a Reader instance. # # @param [Symbol] name name of the dataset to retrieve # @return [Iterator] the dataset as an iterator # @see Environment#dataset # def dataset(name) raise "Environment not set" unless @environment @environment.dataset(name) end # Functional equivalent to Alf::Relation[...] def relation(*tuples) Relation.coerce(tuples) end # # Install the DSL through iteration over defined operators # Operator::each do |op_class| meth_name = Tools.ruby_case(Tools.class_name(op_class)).to_sym if op_class.unary? define_method(meth_name) do |child, *args| child = Iterator.coerce(child, environment) op_class.new(*args).pipe(child, environment) end elsif op_class.binary? 
define_method(meth_name) do |left, right, *args| operands = [left, right].collect{|x| Iterator.coerce(x, environment)} op_class.new(*args).pipe(operands, environment) end else raise "Unexpected operator #{op_class}" end end # Operators::each def allbut(child, attributes) (project child, attributes, true) end Agg = Alf::Aggregator end # module Lispy end # module Alf require "alf/relation"