Sha256: f4ebffa5be413ea843dddf3c215e6f99e8147d76659172c1814f725ce98cab57

Contents?: true

Size: 1.25 KB

Versions: 10

Compression:

Stored size: 1.25 KB

Contents

require "keen/async/storage/redis_handler"


module Keen

  module Async

    # How many events should we send over the wire at a time?
    BATCH_SIZE = 100

    # Absolute path to conf/cacert.pem, resolved three directories above the
    # directory containing this file.
    #
    # File.dirname returns a path with no trailing separator, so the relative
    # part must be resolved against it (not string-appended); plain
    # concatenation would yield ".../async../../../conf/cacert.pem".
    SSL_CA_FILE = File.expand_path('../../../conf/cacert.pem', File.dirname(__FILE__))
     
    # Pulls queued jobs from the client's storage handler and forwards them
    # to the client in batches.
    class Worker

      # @param client [Object] must respond to +storage_handler+ and
      #   +send_batch+; the storage handler must respond to
      #   +count_active_queue+ and +get_authorized_jobs+.
      def initialize(client)
        @client = client
        @storage_handler = client.storage_handler
      end

      # Builds the batch-events URL for a project.
      #
      # @param project_id [#to_s] value interpolated into the URL
      # @return [String] the events endpoint URL for +project_id+
      # @raise [RuntimeError] when +project_id+ is nil or false
      def batch_url(project_id)
        raise "Missing project_id." unless project_id

        "https://api.keen.io/2.0/projects/#{project_id}/_events"
      end

      # Processes the active queue in batches.
      #
      # Reads the queue length once, then repeatedly requests jobs from the
      # storage handler (passing Keen::Async::BATCH_SIZE and the client),
      # converts each job definition into a Keen::Event via Keen::Async::Job,
      # and hands each batch to +@client.send_batch+.
      #
      # @return [Array] the return values of +send_batch+, one per non-empty
      #   batch, in the order the batches were sent
      def process_queue
        queue_length = @storage_handler.count_active_queue
        batch_size = Keen::Async::BATCH_SIZE

        # Ceiling division: an exact multiple of batch_size (including an
        # empty queue) must not schedule an extra, empty batch.
        num_batches = (queue_length + batch_size - 1) / batch_size

        responses = []

        num_batches.times do
          job_definitions = @storage_handler.get_authorized_jobs(batch_size, @client)

          # Build a fresh list for every batch; sharing one list across
          # iterations would re-send the events of all previous batches.
          events = job_definitions.map do |job_definition|
            job = Keen::Async::Job.new(@client, job_definition)
            Keen::Event.new(job.timestamp, job.collection_name, job.event_body)
          end

          # Nothing was returned for this round, so there is nothing to send.
          next if events.empty?

          responses.push @client.send_batch(events)
        end

        responses
      end

    end
  end

end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
keen-0.1.11 lib/keen/async/worker.rb
keen-0.1.10 lib/keen/async/worker.rb
keen-0.1.9 lib/keen/async/worker.rb
keen-0.1.8 lib/keen/async/worker.rb
keen-0.1.7 lib/keen/async/worker.rb
keen-0.1.6 lib/keen/async/worker.rb
keen-0.1.5 lib/keen/async/worker.rb
keen-0.1.4 lib/keen/async/worker.rb
keen-0.1.3 lib/keen/async/worker.rb
keen-0.1.0 lib/keen/async/worker.rb