Sha256: a906340d83bbcfa898671c0df1c6bbb7f97756af27d9e06f8f222077f510e101

Contents?: true

Size: 1.59 KB

Versions: 8

Compression:

Stored size: 1.59 KB

Contents

# Defines CassandraStreamer, a base Wukong streamer that reads records from standard input and writes them to a Cassandra database in batches.
require 'cassandra' ; include Cassandra::Constants
module Wukong
  module Streamer

    #
    # Base streamer that reads lines from $stdin, converts each one with
    # +recordize+, and hands the resulting record to +process+ inside a
    # Cassandra batch holding at most +batch_size+ records.
    #
    # Subclasses must override +process+ to perform the actual inserts.
    # Any record(s) +process+ yields are passed to +emit+.
    #
    # Configuration (+column_space+, +batch_size+, +db_seeds+, +cassandra_db+)
    # is only defaulted when still nil after +super+. A subclass may therefore
    # assign its own values before calling +super+ in its initializer.
    #
    class CassandraStreamer < Wukong::Streamer::Base
      # Thrift port appended to each default seed host.
      DEFAULT_CASSANDRA_PORT = 9160

      # Seed nodes ("host:port") used when +db_seeds+ is not already set.
      # NOTE(review): these are fixed cluster addresses and are likely
      # environment-specific; override +db_seeds+ for any other cluster.
      DEFAULT_DB_SEEDS = %w[
        10.244.191.178 10.243.19.223 10.243.17.219 10.245.70.85 10.244.206.241
      ].map { |host| "#{host}:#{DEFAULT_CASSANDRA_PORT}" }.freeze

      # batch_count        - number of batches opened so far
      # batch_record_count - records processed in the current batch
      # batch_size         - maximum records per batch
      # column_space       - keyspace name passed to Cassandra.new
      # db_seeds           - seed server list passed to Cassandra.new
      # cassandra_db       - the Cassandra client used for batching
      attr_accessor :batch_count, :batch_record_count, :batch_size, :column_space, :db_seeds, :cassandra_db

      # Resets the counters, fills in any configuration still nil after
      # +super+, and builds the Cassandra client at construction time.
      def initialize(*args)
        super(*args)
        self.batch_count        = 0
        self.batch_record_count = 0
        self.column_space ||= 'Twitter'
        self.batch_size   ||= 100
        # dup so each instance owns its own (mutable) seed list.
        self.db_seeds     ||= DEFAULT_DB_SEEDS.dup
        self.cassandra_db ||= Cassandra.new(column_space, db_seeds)
      end

      # Consumes $stdin until EOF and groups records into Cassandra batches
      # of at most +batch_size+ records each.
      #
      # A line is skipped without counting toward the batch when +recordize+
      # returns nil/false for it or when the record is blank.
      #
      # @raise [ArgumentError] if +batch_size+ is not a positive number.
      #   Without this check the outer loop would open empty batches forever.
      def stream
        unless batch_size.is_a?(Numeric) && batch_size > 0
          raise ArgumentError, "batch_size must be a positive number, got #{batch_size.inspect}"
        end
        while still_lines?
          start_batch do
            while still_lines? && batch_not_full?
              line   = get_line
              record = recordize(line.chomp)
              next unless record
              next if record.blank?
              process(*record) { |output_record| emit output_record }
              self.batch_record_count += 1
            end
          end
        end
      end

      # Hook for subclasses: insert one record (splatted from +recordize+'s
      # result) into Cassandra, yielding any records that should be emitted.
      #
      # @raise [RuntimeError] always; subclasses must override this method.
      def process(*_args, &_blk)
        # A plain RuntimeError (a StandardError) is used rather than
        # NotImplementedError (a ScriptError), so `rescue => e` catches it.
        raise "Override this method to insert into cassandra db"
      end

      # Resets the per-batch record counter, increments +batch_count+, and
      # runs the given block inside +cassandra_db.batch+.
      # NOTE(review): presumably the cassandra gem buffers mutations made in
      # the block and sends them when it returns; confirm against the gem.
      def start_batch(&blk)
        self.batch_record_count = 0
        self.batch_count += 1
        cassandra_db.batch(&blk)
      end

      # Reads the next line from $stdin (including its newline).
      def get_line
        $stdin.gets
      end

      # True while $stdin has not reached end-of-file.
      def still_lines?
        !$stdin.eof?
      end

      # True while the current batch holds fewer than +batch_size+ records.
      def batch_not_full?
        batch_record_count < batch_size
      end
    end
  end

end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
wukong-1.5.4 lib/wukong/streamer/cassandra_streamer.rb
wukong-1.5.3 lib/wukong/streamer/cassandra_streamer.rb
wukong-1.5.2 lib/wukong/streamer/cassandra_streamer.rb
wukong-1.5.1 lib/wukong/streamer/cassandra_streamer.rb
wukong-1.5.0 lib/wukong/streamer/cassandra_streamer.rb
wukong-1.4.12 lib/wukong/streamer/cassandra_streamer.rb
wukong-1.4.11 lib/wukong/streamer/cassandra_streamer.rb
wukong-1.4.10 lib/wukong/streamer/cassandra_streamer.rb