lib/xcflushd/runner.rb in xcflushd-1.0.0 vs lib/xcflushd/runner.rb in xcflushd-1.1.0

- old
+ new

@@ -1,9 +1,10 @@ require 'xcflushd' require 'redis' require '3scale_client' require 'xcflushd/3scale_client_ext' +require 'xcflushd/threading' module Xcflushd class Runner class << self # Amount of time to wait before retrying the subscription to the @@ -25,11 +26,13 @@ @max_term_wait = opts[:max_term_wait] || DEFAULT_MAX_TERM_WAIT @logger = Logger.new(STDOUT) redis_host = opts[:redis].host redis_port = opts[:redis].port - redis = Redis.new(host: redis_host, port: redis_port, driver: :hiredis) + redis_driver = RUBY_ENGINE == 'jruby' ? :ruby : :hiredis + + redis = Redis.new(host: redis_host, port: redis_port, driver: redis_driver) storage = Storage.new(redis, @logger, StorageKeys) threescale = ThreeScale::Client.new(provider_key: opts[:provider_key], host: opts[:backend].host, port: opts[:backend].port || @@ -37,29 +40,38 @@ secure: opts[:secure], persistent: true) reporter = Reporter.new(threescale) authorizer = Authorizer.new(threescale) - redis_pub = Redis.new(host: redis_host, port: redis_port, driver: :hiredis) - redis_sub = Redis.new(host: redis_host, port: redis_port, driver: :hiredis) + redis_pub = Redis.new(host: redis_host, port: redis_port, driver: redis_driver) + redis_sub = Redis.new(host: redis_host, port: redis_port, driver: redis_driver) auth_ttl = opts[:auth_ttl] error_handler = FlusherErrorHandler.new(@logger, storage) @flusher = Flusher.new(reporter, authorizer, storage, - auth_ttl, error_handler, opts[:threads]) + auth_ttl, error_handler, @logger, + flusher_threads(opts[:threads])) @prio_auth_renewer = PriorityAuthRenewer.new(authorizer, storage, redis_pub, redis_sub, auth_ttl, @logger, - opts[:prio_threads]) + prio_threads(opts[:prio_threads])) @prio_auth_renewer_thread = start_priority_auth_renewer flush_periodically(opts[:frequency]) end private + + def flusher_threads(opts_threads) + opts_threads ? opts_threads.max : Threading.default_threads + end + + def prio_threads(opts_prio_threads) + opts_prio_threads ? opts_prio_threads.max : Threading.default_threads + end def start_priority_auth_renewer Thread.new do loop do break if @exit