lib/logstash/outputs/clickhouse.rb in logstash-output-clickhouse-0.1.4 vs lib/logstash/outputs/clickhouse.rb in logstash-output-clickhouse-0.1.5
- old
+ new
@@ -56,11 +56,10 @@
@logger.info("Initialized clickhouse with settings",
:flush_size => @flush_size,
:idle_flush_time => @idle_flush_time,
:request_tokens => @pool_max,
:http_hosts => @http_hosts,
- :http_query => @http_query,
:headers => request_headers)
end
def register
# Handle this deprecated option. TODO: remove the option
@@ -72,12 +71,11 @@
# tokens must be added back by the client on success
@request_tokens = SizedQueue.new(@pool_max)
@pool_max.times { |t| @request_tokens << true }
@requests = Array.new
- params = { "query" => "INSERT INTO #{table} FORMAT JSONEachRow" }.merge(@extra_params)
- @http_query = "?#{URI.encode_www_form(params)}"
+ @table_name = table
buffer_initialize(
:max_items => @flush_size,
:max_interval => @idle_flush_time,
:logger => @logger,
@@ -89,10 +87,19 @@
# This module currently does not support parallel requests as that would circumvent the batching
def receive(event)
buffer_receive(event)
end
+ def generate_table_name(event)
+ event.sprintf(@table_name)
+ end
+
+ def generate_http_query(table_name)
+ params = { "query" => "INSERT INTO #{table_name} FORMAT JSONEachRow" }.merge(@extra_params)
+ return "?#{URI.encode_www_form(params)}"
+ end
+
def mutate(src)
return src if @mutations.empty?
res = {}
@mutations.each_pair do |dstkey, source|
case source
@@ -113,18 +120,22 @@
end
public
def flush(events, close = false)
- documents = "" #this is the string of hashes that we push to Fusion as documents
+ documents = {}
events.each do |event|
- documents << LogStash::Json.dump(mutate(event.to_hash())) << "\n"
+ table_name = generate_table_name(event)
+ documents[table_name] ||= ""
+ documents[table_name] << LogStash::Json.dump(mutate(event.to_hash())) << "\n"
end
hosts = @http_hosts.clone
- make_request(documents, hosts, @http_query, 1, 1, hosts.sample)
+ documents.each do |table_name, document|
+ make_request(document, hosts, generate_http_query(table_name), 1, 1, hosts.sample)
+ end
end
private
def save_to_disk(documents)