lib/tail.rb in streamdal-0.0.1 vs lib/tail.rb in streamdal-0.0.2

- old
+ new

@@ -1,7 +1,6 @@ -# TODO: implement token bucket limiter -require "bozos_buckets" +require 'bozos_buckets' NUM_TAIL_WORKERS = 2 MIN_TAIL_RESPONSE_INTERVAL_MS = 100 module Streamdal @@ -13,22 +12,23 @@ @streamdal_url = streamdal_url @auth_token = auth_token @logger = log @metrics = metrics @active = active - @last_msg = Time::at(0) + @last_msg = Time.at(0) @queue = Queue.new @workers = [] # Only use rate limiting if sample_options is set - unless request.sample_options.nil? - @limiter = BozosBuckets::Bucket.new( - initial_token_count: request.sample_options.sample_rate, - refill_rate: request.sample_options.sample_interval_seconds, - max_token_count: request.sample_options.sample_rate - ) - end + return if request.sample_options.nil? + + @limiter = BozosBuckets::Bucket.new( + initial_token_count: request.sample_options.sample_rate, + refill_rate: request.sample_options.sample_interval_seconds, + max_token_count: request.sample_options.sample_rate + ) + end def start_tail_workers NUM_TAIL_WORKERS.times do |worker_id| @workers << Thread.new { start_tail_worker(worker_id + 1) } @@ -41,15 +41,12 @@ @active = false sleep(1) @workers.each do |worker| - if worker.alive? - worker.exit - end + worker.exit if worker.alive? end - end def start_tail_worker(worker_id) @logger.debug("Starting tail worker #{worker_id}") @@ -61,37 +58,34 @@ if @queue.empty? sleep(0.1) next end - if Time::now - @last_msg < MIN_TAIL_RESPONSE_INTERVAL_MS + if Time.now - @last_msg < MIN_TAIL_RESPONSE_INTERVAL_MS sleep(MIN_TAIL_RESPONSE_INTERVAL_MS) @metrics.incr(Metrics::CounterEntry.new(COUNTER_DROPPED_TAIL_MESSAGES, nil, {}, 1)) @logger.debug("Dropped tail message for '#{@request.id}' due to rate limiting") next end - unless stub.nil? - tail_response = @queue.pop(non_block = false) - @logger.debug("Sending tail request for '#{tail_response.tail_request_id}'") + next if stub.nil? - begin - stub.send_tail([tail_response], metadata: { "auth-token" => @auth_token }) - rescue => e - @logger.error("Error sending tail request: #{e}") - end + tail_response = @queue.pop(false) + @logger.debug("Sending tail request for '#{tail_response.tail_request_id}'") + + begin + stub.send_tail([tail_response], metadata: { 'auth-token' => @auth_token }) + rescue Error => e + @logger.error("Error sending tail request: #{e}") end end @logger.debug "Tail worker #{worker_id} exited" - end def should_send - if @limiter.nil? - true - end + true if @limiter.nil? @limiter.use_tokens(1) end end -end \ No newline at end of file +end