lib/fluent/plugin/out_azureeventhubs_buffered.rb in sk-fluent-plugin-azureeventhubs-0.0.18 vs lib/fluent/plugin/out_azureeventhubs_buffered.rb in sk-fluent-plugin-azureeventhubs-0.0.19
- old
+ new
@@ -43,18 +43,12 @@
end
def configure(conf)
compat_parameters_convert(conf, :buffer, :inject)
super
- case @type
- when 'amqps'
- raise NotImplementedError
- else
require_relative 'azureeventhubs/http'
@sender = AzureEventHubsHttpSender.new(@connection_string, @hub_name, @expiry_interval,@proxy_addr,@proxy_port,@open_timeout,@read_timeout)
- end
- # raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
end
def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
[tag, time, record].to_msgpack
@@ -62,19 +56,18 @@
def formatted_to_msgpack_binary?
true
end
+ def multi_workers_ready?
+ true
+ end
+
def write(chunk)
chunk.msgpack_each { |tag, time, record|
-
- if @include_tag
- record['tag'] = tag
- end
- if @include_time
- record[@tag_time_name] = time
- end
- @sender.send_w_properties(record, @message_properties)
+ records = []
+ records.push(record)
+ @sender.send_w_properties(records, @message_properties)
}
end
end
end
\ No newline at end of file