lib/logstash/outputs/clickhouse.rb in logstash-output-clickhouse-0.1.4 vs lib/logstash/outputs/clickhouse.rb in logstash-output-clickhouse-0.1.5

- old
+ new

@@ -56,11 +56,10 @@ @logger.info("Initialized clickhouse with settings", :flush_size => @flush_size, :idle_flush_time => @idle_flush_time, :request_tokens => @pool_max, :http_hosts => @http_hosts, - :http_query => @http_query, :headers => request_headers) end def register # Handle this deprecated option. TODO: remove the option @@ -72,12 +71,11 @@ # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times { |t| @request_tokens << true } @requests = Array.new - params = { "query" => "INSERT INTO #{table} FORMAT JSONEachRow" }.merge(@extra_params) - @http_query = "?#{URI.encode_www_form(params)}" + @table_name = table buffer_initialize( :max_items => @flush_size, :max_interval => @idle_flush_time, :logger => @logger, @@ -89,10 +87,19 @@ # This module currently does not support parallel requests as that would circumvent the batching def receive(event) buffer_receive(event) end + def generate_table_name(event) + event.sprintf(@table_name) + end + + def generate_http_query(table_name) + params = { "query" => "INSERT INTO #{table_name} FORMAT JSONEachRow" }.merge(@extra_params) + return "?#{URI.encode_www_form(params)}" + end + def mutate(src) return src if @mutations.empty? res = {} @mutations.each_pair do |dstkey, source| case source @@ -113,18 +120,22 @@ end public def flush(events, close = false) - documents = "" #this is the string of hashes that we push to Fusion as documents + documents = {} events.each do |event| - documents << LogStash::Json.dump(mutate(event.to_hash())) << "\n" + table_name = generate_table_name(event) + documents[table_name] ||= "" + documents[table_name] << LogStash::Json.dump(mutate(event.to_hash())) << "\n" end hosts = @http_hosts.clone - make_request(documents, hosts, @http_query, 1, 1, hosts.sample) + documents.each do |table_name, document| + make_request(document, hosts, generate_http_query(table_name), 1, 1, hosts.sample) + end end private def save_to_disk(documents)