Sha256: 0b9580a07a2287531b4ba4f26838875e2fa9b7d5d4ae905bbfb3a5b666c1c51f
Contents?: true
Size: 1.92 KB
Versions: 2
Compression:
Stored size: 1.92 KB
Contents
require "keen/async/storage/redis_handler" require "net/http" require "net/https" module Keen module Async # How many events should we send over the wire at a time? BATCH_SIZE = 100 SSL_CA_FILE = File.dirname(__FILE__) + '../../../conf/cacert.pem' class Worker def initialize(handler) @handler = handler end def batch_url(project_id) if not project_id raise "Missing project_id." end "https://api.keen.io/1.0/projects/#{project_id}/_events" end def process_queue queue_length = @handler.count_active_queue batch_size = Keen::Async::BATCH_SIZE num_batches = queue_length / batch_size num_batches.times do collated = @handler.get_collated_jobs(batch_size) collated.each do |project_id, batch| send_batch(project_id, batch) end end end def send_batch(project_id, batch) if not batch return end first_key = batch.keys[0] job_list = batch[first_key] auth_token = job_list[0].auth_token uri = URI.parse(batch_url(project_id)) http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = true http.ca_file = Keen::Async::SSL_CA_FILE http.verify_mode = OpenSSL::SSL::VERIFY_PEER http.verify_depth = 5 request = Net::HTTP::Post.new(uri.path) request.body = batch.to_json request["Content-Type"] = "application/json" request["Authorization"] = auth_token response = http.request(request) #response = Net::HTTP.start(uri.host, uri.port) {|http| #http.request(request) #} puts response # TODO: If something fails, we should move the job to the # prior_failures queue by calling, perhaps: # # @handler.log_failed_job(job) end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
keen-0.0.5 | lib/keen/async/worker.rb |
keen-0.0.4 | lib/keen/async/worker.rb |