Sha256: f66170ef739b704d3351fb4ec90b2c5641013b64bc871f47585c8ae628a7352f

Contents?: true

Size: 1.87 KB

Versions: 1

Compression:

Stored size: 1.87 KB

Contents

module Druid
  module Writer
    # Keeps one tranquilizer per datasource and forwards datapoints to it.
    #
    # A tranquilizer is built the first time a datasource is written to and is
    # replaced (the old one is stopped) whenever an incoming datapoint's
    # dimension or metric keys no longer match what the tranquilizer's rollup
    # reports.
    class Base
      attr_accessor :tranquilizers
      attr_reader :config

      # @param config [Object] stored as-is and handed to every tranquilizer
      #   this writer builds
      def initialize(config)
        @config = config
        @tranquilizers = []
      end

      # Wraps the raw datapoint in a Tranquilizer::Datapoint and sends it
      # through the tranquilizer registered for +datasource+, building or
      # replacing that tranquilizer when needed.
      #
      # @param datasource [Object] value compared against each tranquilizer's
      #   +datasource+
      # @param datapoint [Object] raw payload passed to
      #   Tranquilizer::Datapoint.new
      # @return whatever the tranquilizer's +send+ returns
      def write_point(datasource, datapoint)
        wrapped = Druid::Writer::Tranquilizer::Datapoint.new(datapoint)
        get_tranquilizer(datasource, wrapped).send(wrapped)
      end

      # Stops and forgets the tranquilizer registered for +datasource+.
      #
      # @return [nil] when no tranquilizer is registered for the datasource;
      #   otherwise the result of the tranquilizer's +stop+
      def remove_tranquilizer_for_datasource(datasource)
        existing = tranquilizer_for_datasource(datasource)
        return unless existing

        remove_tranquilizer(existing)
      end

      private

      # Instantiates a tranquilizer from a single options hash.
      def build_tranquilizer(datasource, datapoint)
        # Passed as one positional hash, exactly as the constructor is called
        # everywhere else in this class.
        options = { config: config, datasource: datasource, datapoint: datapoint }
        Druid::Writer::Tranquilizer::Base.new(options)
      end

      # Returns the tranquilizer to use for +datasource+: the registered one
      # when its schema matches +datapoint+, otherwise a freshly built one that
      # takes the place of any stale predecessor.
      def get_tranquilizer(datasource, datapoint)
        current = tranquilizer_for_datasource(datasource)
        return current if has_current_schema?(current, datapoint)

        # The stale tranquilizer is stopped before its replacement is built.
        remove_tranquilizer(current) if current
        replacement = build_tranquilizer(datasource, datapoint)
        tranquilizers << replacement
        replacement
      end

      # True when +tranquilizer+ exists and its rollup lists the same
      # dimension names and metric field names, in the same order, as the
      # keys of +datapoint+.
      def has_current_schema?(tranquilizer, datapoint)
        return false unless tranquilizer

        declared_dimensions = tranquilizer.rollup.dimensions.specMap["dimensions"].to_a
        # The aggregators are run through the Scala-to-Java list converter so
        # they can be enumerated as a Ruby array.
        aggregator_list = Java::ScalaCollection::JavaConverters
                          .seqAsJavaListConverter(tranquilizer.rollup.aggregators)
                          .asJava
                          .to_a
        declared_metrics = aggregator_list.map(&:getFieldName)

        declared_dimensions == datapoint.dimensions.keys &&
          declared_metrics == datapoint.metrics.keys
      end

      # Drops +tranquilizer+ from the registry first, then stops it.
      def remove_tranquilizer(tranquilizer)
        tranquilizers.delete(tranquilizer)
        tranquilizer.stop
      end

      # First registered tranquilizer whose +datasource+ equals the argument,
      # or nil when there is none.
      def tranquilizer_for_datasource(datasource)
        tranquilizers.find { |candidate| candidate.datasource == datasource }
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
jruby-druid-1.0.0.pre.rc2 lib/druid/writer/base.rb