Sha256: f66170ef739b704d3351fb4ec90b2c5641013b64bc871f47585c8ae628a7352f
Contents?: true
Size: 1.87 KB
Versions: 1
Compression:
Stored size: 1.87 KB
Contents
module Druid module Writer class Base attr_accessor :tranquilizers attr_reader :config def initialize(config) @config = config @tranquilizers = [] end def remove_tranquilizer_for_datasource(datasource) tranquilizer = tranquilizer_for_datasource(datasource) remove_tranquilizer(tranquilizer) if tranquilizer end def write_point(datasource, datapoint) datapoint = Druid::Writer::Tranquilizer::Datapoint.new(datapoint) sender = get_tranquilizer(datasource, datapoint) sender.send(datapoint) end private def build_tranquilizer(datasource, datapoint) Druid::Writer::Tranquilizer::Base.new({config: config, datasource: datasource, datapoint: datapoint}) end def get_tranquilizer(datasource, datapoint) tranquilizer = tranquilizer_for_datasource(datasource) unless has_current_schema?(tranquilizer, datapoint) remove_tranquilizer(tranquilizer) if tranquilizer tranquilizer = build_tranquilizer(datasource, datapoint) tranquilizers << tranquilizer end tranquilizer end def has_current_schema?(tranquilizer, datapoint) return false unless tranquilizer dimensions = tranquilizer.rollup.dimensions.specMap["dimensions"].to_a aggregators = tranquilizer.rollup.aggregators metrics = Java::ScalaCollection::JavaConverters.seqAsJavaListConverter(aggregators).asJava.to_a.map{ |metric| metric.getFieldName } dimensions == datapoint.dimensions.keys && metrics == datapoint.metrics.keys end def remove_tranquilizer(tranquilizer) tranquilizers.delete(tranquilizer) tranquilizer.stop end def tranquilizer_for_datasource(datasource) tranquilizers.detect{ |t| t.datasource == datasource } end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
jruby-druid-1.0.0.pre.rc2 | lib/druid/writer/base.rb |