Sha256: 40c520abe4036252333537ae154a2dfd034474b2cd873390a8743cbb3d15a152
Contents?: true
Size: 732 Bytes
Versions: 1
Compression:
Stored size: 732 Bytes
Contents
require 'set' module Embulk module Filter class Unique < FilterPlugin Plugin.register_filter("unique", self) def self.transaction(config, in_schema, &control) task = { "columns" => config.param("columns", :array), } yield(task, in_schema) end def init @cols = task["columns"] @exists = Set.new end def close end def add(page) page.each do |record| part = {} @cols.each do |c| cs = page.schema.select{|s| s.name == c} idx = cs[0].index part[c] = record[idx] end unless @exists.include?(part) page_builder.add(record) @exists.add(part) end end end def finish page_builder.finish end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
embulk-filter-unique-0.1.0 | lib/embulk/filter/unique.rb |