Sha256: 52e31f40ee598a32f65ab243fe2a0394640387c25dcda9717e6f0936b1169e13

Contents?: true

Size: 1.41 KB

Versions: 22

Compression:

Stored size: 1.41 KB

Contents

require File.join(File.dirname(__FILE__), "service")

module Charrington
  # Reshapes an event hash into a single-level hash.
  #
  # On construction the event is converted with +to_hash+, keys that start
  # with "@" or appear in KEY_FILTER_BLACKLIST are dropped, and the event is
  # rejected if it still carries a key listed in KEY_RAISE_BLACKLIST.
  # +call+ then collapses nested hashes into "parent_child" keys.
  #
  # NOTE(review): +Service+ is defined in the sibling "service" file and is not
  # visible here; presumably it supplies a class-level entry point — confirm.
  class TransformPostgres
    include Service
    attr_accessor :event
    attr_reader :top_level_keys

    Error = Class.new(StandardError)
    EventNil = Class.new(Error)
    # Declared for callers; nothing in this class raises it.
    TableNameNil = Class.new(Error)
    ColumnBlacklist = Class.new(Error)

    # Top-level keys that are silently removed from the event.
    KEY_FILTER_BLACKLIST = %w[host path jwt sequence].freeze
    # Top-level keys whose presence makes the event invalid.
    KEY_RAISE_BLACKLIST = %w[id inserted_at].freeze

    # @param event [#to_hash] the incoming event; must not be nil
    # @raise [EventNil] when +event+ is nil
    # @raise [ColumnBlacklist] when the filtered event still contains a key
    #   from KEY_RAISE_BLACKLIST
    def initialize(event)
      raise EventNil, "Event is nil" if event.nil?

      # drop_keys returns a fresh hash, so the object handed in by the caller
      # is never modified, even when +to_hash+ returns the receiver itself.
      @event = drop_keys(event.to_hash)
      # Snapshot of the keys before flattening; +call+ uses it to remove the
      # original (possibly nested) entries.
      @top_level_keys = @event.keys
      check_blacklist
    end

    # Replaces the contents of +event+ with its flattened form.
    #
    # @return [Hash] the same hash object held in +event+, now single-level
    def call
      flattened = flatten_hash(event)
      top_level_keys.each { |key| event.delete(key) }
      # merge! returns its receiver, i.e. the updated event hash.
      event.merge!(flattened)
    end

    private

    # Raises if the event contains any key from KEY_RAISE_BLACKLIST.
    #
    # @raise [ColumnBlacklist] listing every offending key
    def check_blacklist
      offending = KEY_RAISE_BLACKLIST.select { |key| event.key?(key) }
      return if offending.empty?

      raise ColumnBlacklist, "Event contains these blacklisted keys: #{offending.join(",")}"
    end

    # Returns a copy of +event+ without "@"-prefixed keys and without the keys
    # named in KEY_FILTER_BLACKLIST. The argument itself is left untouched.
    #
    # @param event [Hash]
    # @return [Hash]
    def drop_keys(event)
      event.reject { |key, _value| key.start_with?("@") || KEY_FILTER_BLACKLIST.include?(key) }
    end

    # Recursively collapses nested hashes, joining parent and child keys with
    # an underscore. Only Hash values are expanded; every other value
    # (including arrays) is copied through unchanged.
    #
    # @param hash [Hash]
    # @return [Hash] a new single-level hash
    def flatten_hash(hash)
      hash.each_with_object({}) do |(key, value), acc|
        if value.is_a?(Hash)
          flatten_hash(value).each do |nested_key, nested_value|
            acc["#{key}_#{nested_key}"] = nested_value
          end
        else
          acc[key] = value
        end
      end
    end
  end
end

Version data entries

22 entries across 22 versions & 1 rubygems

Version Path
logstash-output-charrington-0.3.1 lib/logstash/outputs/charrington/transform_postgres.rb
logstash-output-charrington-0.3.0 lib/logstash/outputs/charrington/transform_postgres.rb