Sha256: ce443c9f9266687936856ad4751cbe4047d93ac533cc503b5270d1b39a7771c0

Contents?: true

Size: 1.31 KB

Versions: 2

Compression:

Stored size: 1.31 KB

Contents

require "keen/async/storage/redis_handler"


module Keen

  module Async

    # How many events should we send over the wire at a time?
    BATCH_SIZE = 100

    # Path to conf/cacert.pem, located three directories above the directory
    # that holds this file. File.expand_path is used so a separator is placed
    # between the directory and the relative part; plain string concatenation
    # yielded ".../async../../../conf/cacert.pem", which names a nonexistent
    # "async.." directory.
    SSL_CA_FILE = File.expand_path('../../../conf/cacert.pem', File.dirname(__FILE__))

    # Reads queued job definitions from the client's storage handler and
    # hands the resulting events to the client in batches.
    class Worker

      # client - object that responds to #storage_handler, #send_batch and
      #          #logging (the three methods this class calls on it).
      def initialize(client)
        @client = client
        @storage_handler = client.storage_handler
      end

      # Builds the events URL on api.keen.io for the given project.
      #
      # project_id - value interpolated into the URL path.
      #
      # Returns the URL as a String.
      # Raises RuntimeError ("Missing project_id.") when project_id is nil
      # or false.
      def batch_url(project_id)
        raise "Missing project_id." unless project_id

        "https://api.keen.io/2.0/projects/#{project_id}/_events"
      end

      # Fetches job definitions from the storage handler, at most BATCH_SIZE
      # per pass, converts each one to a Keen::Event and sends every
      # non-empty batch through the client.
      #
      # When client.logging is truthy the collected responses are printed
      # with puts.
      #
      # Returns an Array holding the return value of client.send_batch for
      # each batch that was sent; empty when no pass produced any events.
      def process_queue
        batch_size = Keen::Async::BATCH_SIZE
        queue_length = @storage_handler.count_active_queue

        # One pass more than the number of full batches, so that a trailing
        # partial batch is fetched as well. When the queue length is zero or
        # an exact multiple of batch_size the extra pass may come back empty;
        # such passes are skipped below instead of sending an empty batch.
        passes = queue_length / batch_size + 1

        responses = []

        passes.times do
          job_definitions = @storage_handler.get_authorized_jobs(batch_size, @client)

          # A fresh array per pass: reusing one array across passes made
          # every later batch re-send the events of all earlier batches.
          events = job_definitions.map do |job_definition|
            job = Keen::Async::Job.new(@client, job_definition)
            Keen::Event.new(job.timestamp, job.collection_name, job.event_body)
          end

          next if events.empty?

          responses.push @client.send_batch(events)
        end

        puts responses if @client.logging

        responses
      end

    end
  end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
keen-0.2.0 lib/keen/async/worker.rb
keen-0.1.12 lib/keen/async/worker.rb