Sha256: c496d5a0173e5d048680aca2c2c6fd8e494b653116268189eb1e813595fdc9bb

Contents?: true

Size: 1.95 KB

Versions: 5

Compression:

Stored size: 1.95 KB

Contents

require 'avro'

# Path of the Cassandra Avro protocol file; AvroWriter#initialize reads
# this setting to locate the schema it parses.
Settings.define :cassandra_avro_schema, :default => '/usr/local/share/cassandra/interface/avro/cassandra.avpr'
module Wukong::Store::CassandraModel

  #
  # Saves this record through the streaming (avro) path by passing its
  # id and the record itself to the class-level streaming_insert.
  # Returns whatever streaming_insert returns.
  #
  def streaming_save
    model_class = self.class
    model_class.streaming_insert(id, self)
  end
  module ClassMethods

    #
    # The AvroWriter used for streaming inserts; created on first use
    # and cached in @streaming_writer afterwards.
    #
    def streaming_writer
      @streaming_writer ||= AvroWriter.new
    end

    #
    # Hands one record to the streaming writer: the id is stringified to
    # form the row key and the object's to_db_hash supplies the columns.
    #
    def streaming_insert id, hsh
      writer  = streaming_writer
      row_key = id.to_s
      columns = hsh.to_db_hash
      writer.put(row_key, columns)
    end
  end
  #
  # Emits Cassandra column mutations on $stdout through an Avro binary
  # encoder, either as schema-checked datums (#write) or as raw encoded
  # fields (#write_directly / #put).
  #
  class AvroWriter
    #
    # Reads the protocol file named by Settings.cassandra_avro_schema,
    # looks up its 'StreamingMutation' type, and creates the binary
    # encoder (on $stdout) and the datum writer for that schema.
    #
    def initialize
      schema_file = Settings.cassandra_avro_schema
      @proto  = Avro::Protocol.parse(File.read(schema_file))
      @schema = @proto.types.detect{|schema| schema.name == 'StreamingMutation'}
      @enc    = Avro::IO::BinaryEncoder.new($stdout)
      # @enc    = DummyEncoder.new($stdout)
      @writer = Avro::IO::DatumWriter.new(@schema)
      # warn [@schema, @enc].inspect
    end

    #
    # Writes one column as a datum built by #smutation, using the
    # schema-aware datum writer.
    #
    def write(key, col_name, value)
      @writer.write(smutation(key, col_name, value), @enc)
    end

    #
    # Writes one column field-by-field straight to the encoder, bypassing
    # the datum writer: key, column name and value as bytes, then the
    # timestamp as a long and the ttl as an int.
    #
    # key, col_name and value are converted with to_s first, mirroring
    # what #smutation does for name and value, so symbol attribute names
    # and non-string values reach the bytes encoder as strings.
    #
    def write_directly(key, col_name, value, timestamp, ttl)
      # Log.info "Insert(row_key => #{key}, col_name => #{col_name}, value => #{value}"
      @enc.write_bytes(key.to_s)
      @enc.write_bytes(col_name.to_s)
      @enc.write_bytes(value.to_s)
      @enc.write_long(timestamp)
      @enc.write_int(ttl)
    end

    #
    # Iterates through each key/value pair of +hsh+ and writes it
    # directly, one column at a time, under row key +id+. All columns of
    # one call share the same timestamp and ttl. Returns +hsh+.
    #
    # NOTE(review): the default timestamp is in whole seconds
    # (Time.now.to_i) whereas #smutation stamps with
    # Time.epoch_microseconds -- confirm which unit the consumer expects.
    #
    def put(id, hsh, timestamp=nil, ttl=0)
      timestamp ||= Time.now.to_i
      hsh.each do |attr, val|
        write_directly(id, attr, val, timestamp, ttl)
      end
    end

    #
    # Builds the hash handed to the datum writer for a single column.
    # Name and value are stringified; ttl is always 0.
    # Time.epoch_microseconds is not core Ruby -- presumably supplied by
    # an extension loaded elsewhere; verify before using #write.
    #
    def smutation(key, name, value)
      {
        'key'       => key,
        'name'      => name.to_s,
        'value'     => value.to_s,
        'timestamp' => Time.epoch_microseconds,
        'ttl'       => 0
      }
    end
  end

end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
wukong-3.0.0.pre old/wukong/store/cassandra/streaming.rb
wukong-2.0.2 lib/wukong/store/cassandra/streaming.rb
wukong-2.0.1 lib/wukong/store/cassandra/streaming.rb
wukong-2.0.0 lib/wukong/store/cassandra/streaming.rb
wukong-1.5.4 lib/wukong/store/cassandra/streaming.rb