lib/xcflushd/runner.rb in xcflushd-1.0.0 vs lib/xcflushd/runner.rb in xcflushd-1.1.0
- old
+ new
@@ -1,9 +1,10 @@
require 'xcflushd'
require 'redis'
require '3scale_client'
require 'xcflushd/3scale_client_ext'
+require 'xcflushd/threading'
module Xcflushd
class Runner
class << self
# Amount of time to wait before retrying the subscription to the
@@ -25,11 +26,13 @@
@max_term_wait = opts[:max_term_wait] || DEFAULT_MAX_TERM_WAIT
@logger = Logger.new(STDOUT)
redis_host = opts[:redis].host
redis_port = opts[:redis].port
- redis = Redis.new(host: redis_host, port: redis_port, driver: :hiredis)
+ redis_driver = RUBY_ENGINE == 'jruby' ? :ruby : :hiredis
+
+ redis = Redis.new(host: redis_host, port: redis_port, driver: redis_driver)
storage = Storage.new(redis, @logger, StorageKeys)
threescale = ThreeScale::Client.new(provider_key: opts[:provider_key],
host: opts[:backend].host,
port: opts[:backend].port ||
@@ -37,29 +40,38 @@
secure: opts[:secure],
persistent: true)
reporter = Reporter.new(threescale)
authorizer = Authorizer.new(threescale)
- redis_pub = Redis.new(host: redis_host, port: redis_port, driver: :hiredis)
- redis_sub = Redis.new(host: redis_host, port: redis_port, driver: :hiredis)
+ redis_pub = Redis.new(host: redis_host, port: redis_port, driver: redis_driver)
+ redis_sub = Redis.new(host: redis_host, port: redis_port, driver: redis_driver)
auth_ttl = opts[:auth_ttl]
error_handler = FlusherErrorHandler.new(@logger, storage)
@flusher = Flusher.new(reporter, authorizer, storage,
- auth_ttl, error_handler, opts[:threads])
+ auth_ttl, error_handler, @logger,
+ flusher_threads(opts[:threads]))
@prio_auth_renewer = PriorityAuthRenewer.new(authorizer, storage,
redis_pub, redis_sub,
auth_ttl, @logger,
- opts[:prio_threads])
+ prio_threads(opts[:prio_threads]))
@prio_auth_renewer_thread = start_priority_auth_renewer
flush_periodically(opts[:frequency])
end
private
+
+ def flusher_threads(opts_threads)
+ opts_threads ? opts_threads.max : Threading.default_threads
+ end
+
+ def prio_threads(opts_prio_threads)
+ opts_prio_threads ? opts_prio_threads.max : Threading.default_threads
+ end
def start_priority_auth_renewer
Thread.new do
loop do
break if @exit